1
0

[HUDI-2731] Make clustering work regardless of whether there are base… (#3970)

This commit is contained in:
Sagar Sumit
2021-11-19 21:39:08 +05:30
committed by GitHub
parent bf008762df
commit eba354e922
6 changed files with 148 additions and 43 deletions

View File

@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
@@ -191,7 +192,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(table.getMetaClient().getFs())
.withBasePath(table.getMetaClient().getBasePath())
@@ -205,6 +205,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.build();
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
tableConfig.getPayloadClass(),

View File

@@ -45,6 +45,7 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.List;
@@ -52,6 +53,7 @@ import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness {
@@ -111,7 +113,8 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
/*
* Write 2 (more inserts to create new files)
@@ -119,7 +122,8 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
// we already set small file size to small number to force inserts to go into new file.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
if (doUpdates) {
/*
@@ -144,28 +148,101 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
// Do the clustering and validate
client.cluster(clusteringCommitTime, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient);
clusteredTable.getHoodieView().sync();
Stream<HoodieBaseFile> dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths())
.flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p));
// verify there should be only one base file per partition after clustering.
assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count());
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction());
if (cfg.populateMetaFields()) {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")),
"Must contain 200 records");
} else {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty()));
}
doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen);
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception {
// set low compaction small File Size to generate more file groups.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(10L)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024)
.parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true)
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
// set index type to INMEMORY so that log files can be indexed, and it is safe to send
// inserts straight to the log to produce file slices with only log files and no data files
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0)
.withInlineClustering(true)
.withInlineClusteringNumCommits(1).build())
.withRollbackUsingMarkers(false);
HoodieWriteConfig cfg = cfgBuilder.build();
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps());
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
// test 2 inserts
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
// run updates
if (doUpdates) {
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateRecords(metaClient, records, client, cfg, newCommitTime);
}
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
hoodieTable.getHoodieView().sync();
FileStatus[] allBaseFiles = listAllBaseFilesInPath(hoodieTable);
// expect 0 base files for each partition
assertEquals(0, allBaseFiles.length);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
// verify log files are included in clustering plan for each partition.
assertEquals(dataGen.getPartitionPaths().length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
// do the clustering and validate
doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen);
}
}
private void doClusteringAndValidate(SparkRDDWriteClient client,
String clusteringCommitTime,
HoodieTableMetaClient metaClient,
HoodieWriteConfig cfg,
HoodieTestDataGenerator dataGen) {
client.cluster(clusteringCommitTime, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient);
clusteredTable.getHoodieView().sync();
Stream<HoodieBaseFile> dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths())
.flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p));
assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count());
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction());
if (cfg.populateMetaFields()) {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")),
"Must contain 200 records");
} else {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty()));
}
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -57,6 +58,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -94,7 +96,8 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
client.startCommitWithTime(commitTime1);
List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
insertRecords(metaClient, records001, client, cfg, commitTime1);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
// verify only one base file shows up with commit time 001
FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
@@ -142,7 +145,8 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
String insertsTime = "006";
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
client.startCommitWithTime(insertsTime);
insertRecords(metaClient, records006, client, cfg, insertsTime);
dataFiles = insertRecords(metaClient, records006, client, cfg, insertsTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
// verify new write shows up in snapshot mode even though there is pending compaction
snapshotROFiles = getROSnapshotFiles(partitionPath);

View File

@@ -95,7 +95,8 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
insertRecords(metaClient, records, client, cfg, newCommitTime);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records, client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
/*
* Write 2 (updates)

View File

@@ -203,7 +203,8 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
}
protected void insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
@@ -228,8 +229,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
roView = getHoodieTableFileSystemView(reloadedMetaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the base files we wrote in the delta commit");
return dataFilesToRead;
}
protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {

View File

@@ -23,33 +23,53 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
import java.util.Iterator;
import java.util.stream.StreamSupport;
/**
* Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
*/
public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Iterator<HoodieRecord<T>> {
private Iterator<HoodieRecord<T>> recordsIterator;
private final Iterator<HoodieRecord<T>> recordsIterator;
public static <R extends IndexedRecord, T> HoodieFileSliceReader getFileSliceReader(
HoodieFileReader<R> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
String preCombineField, Option<Pair<String,String>> simpleKeyGenFieldsOpt) throws IOException {
Iterator<R> baseIterator = baseFileReader.getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
: SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField());
scanner.processNextRecord(hoodieRecord);
public static HoodieFileSliceReader getFileSliceReader(
Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
if (baseFileReader.isPresent()) {
Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
scanner.processNextRecord(hoodieRecord);
}
return new HoodieFileSliceReader(scanner.iterator());
} else {
Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
.map(e -> {
try {
GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema).get();
return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
} catch (IOException io) {
throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);
}
}).iterator());
}
return new HoodieFileSliceReader(scanner.iterator());
}
private static HoodieRecord<? extends HoodieRecordPayload> transform(
GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
return simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
: SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField());
}
private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {