diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index 503a5bf06..4bb703e10 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -22,20 +22,24 @@
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
@@ -54,11 +58,12 @@
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import static java.util.stream.Collectors.toList;
+
 /**
  * The function to load index from existing hoodieTable.
  *
@@ -78,6 +83,7 @@ public class BootstrapFunction
   private final Configuration conf;
   private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  private transient HoodieWriteConfig writeConfig;
 
   private GlobalAggregateManager aggregateManager;
 
   private ListState bootstrapState;
@@ -108,13 +114,14 @@ public class BootstrapFunction
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
     this.hoodieTable = getTable();
     this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
+  public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
     if (!alreadyBootstrap) {
       String basePath = hoodieTable.getMetaClient().getBasePath();
       int taskID = getRuntimeContext().getIndexOfThisSubtask();
@@ -155,11 +162,10 @@ public class BootstrapFunction
   }
 
   private HoodieFlinkTable getTable() {
-    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
         new SerializableConfiguration(this.hadoopConf),
         new FlinkTaskContextSupplier(getRuntimeContext()));
-    return HoodieFlinkTable.create(writeConfig, context);
+    return HoodieFlinkTable.create(this.writeConfig, context);
   }
 
   /**
@@ -168,32 +174,64 @@ public class BootstrapFunction
   * @param partitionPath The partition path
   */
  @SuppressWarnings("unchecked")
-  private void loadRecords(String partitionPath, Collector<O> out) {
+  private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
     long start = System.currentTimeMillis();
+
     BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
-    List<HoodieBaseFile> latestBaseFiles =
-        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
-    LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size());
+    Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
 
     final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
     final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
     final int taskID = getRuntimeContext().getIndexOfThisSubtask();
 
-    for (HoodieBaseFile baseFile : latestBaseFiles) {
-      boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
-          baseFile.getFileId(), maxParallelism, parallelism) == taskID;
-      if (shouldLoad) {
-        LOG.info("Load records from file {}.", baseFile);
-        final List<HoodieKey> hoodieKeys;
-        try {
-          hoodieKeys =
-              fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
-        } catch (Exception e) {
-          throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
+    Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant();
+
+    if (latestCommitTime.isPresent()) {
+      List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
+          .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
+          .collect(toList());
+
+      for (FileSlice fileSlice : fileSlices) {
+        if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
+          continue;
         }
+        LOG.info("Load records from {}.", fileSlice);
 
-        for (HoodieKey hoodieKey : hoodieKeys) {
-          out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, baseFile)));
+        // load parquet records
+        fileSlice.getBaseFile().ifPresent(baseFile -> {
+          // filter out crushed files
+          if (baseFile.getFileSize() <= 0) {
+            return;
+          }
+
+          final List<HoodieKey> hoodieKeys;
+          try {
+            hoodieKeys =
+                fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
+          } catch (Exception e) {
+            throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
+          }
+
+          for (HoodieKey hoodieKey : hoodieKeys) {
+            out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
+          }
+        });
+
+        // load avro log records
+        List<String> logPaths = fileSlice.getLogFiles()
+            // filter out crushed files
+            .filter(logFile -> logFile.getFileSize() > 0)
+            .map(logFile -> logFile.getPath().toString())
+            .collect(toList());
+        HoodieMergedLogRecordScanner scanner = scanLog(logPaths, schema, latestCommitTime.get().getTimestamp());
+
+        try {
+          for (String recordKey : scanner.getRecords().keySet()) {
+            out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
+          }
+        } catch (Exception e) {
+          throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
         }
       }
     }
@@ -203,17 +241,44 @@ public class BootstrapFunction
         this.getClass().getSimpleName(), taskID, partitionPath, cost);
   }
 
+  private HoodieMergedLogRecordScanner scanLog(
+      List<String> logPaths,
+      Schema logSchema,
+      String latestInstantTime) {
+    String basePath = this.hoodieTable.getMetaClient().getBasePath();
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
+        .withBasePath(basePath)
+        .withLogFilePaths(logPaths)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(latestInstantTime)
+        .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(false)
+        .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
+        .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
+        .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
+        .build();
+  }
+
   @SuppressWarnings("unchecked")
-  public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, HoodieBaseFile baseFile) {
+  public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
     HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
-    hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId()));
+    hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
     hoodieRecord.seal();
     return hoodieRecord;
   }
 
+  private static boolean shouldLoadFile(String fileId,
+                                        int maxParallelism,
+                                        int parallelism,
+                                        int taskID) {
+    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
+        fileId, maxParallelism, parallelism) == taskID;
+  }
+
   @Override
-  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+  public void notifyCheckpointComplete(long checkpointId) {
     // no operation
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 90a3b344b..b3338a908 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -571,7 +571,7 @@ public class TestWriteCopyOnWrite {
     checkWrittenData(tempFile, expected, 1);
   }
 
-  Map<String, String> getMiniBatchExpected() {
+  protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // the last 2 lines are merged
     expected.put("par1", "["
@@ -581,6 +581,10 @@ public class TestWriteCopyOnWrite {
     return expected;
   }
 
+  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+    return EXPECTED2;
+  }
+
   @Test
   public void testIndexStateBootstrap() throws Exception {
     // open the function and ingest data
@@ -637,7 +641,9 @@ public class TestWriteCopyOnWrite {
     nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
-    checkWrittenData(tempFile, EXPECTED2);
+
+    Map<String, String> expected = getExpectedBeforeCheckpointComplete();
+    checkWrittenData(tempFile, expected);
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 17c13a60b..07e23b56e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -37,8 +37,6 @@
 import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
 import java.io.File;
 import java.util.Comparator;
@@ -83,13 +81,12 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
     TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
   }
 
-  @Disabled
-  @Test
-  public void testIndexStateBootstrap() {
-    // Ignore the index bootstrap because we only support parquet load now.
+  @Override
+  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+    return EXPECTED1;
   }
 
-  Map<String, String> getMiniBatchExpected() {
+  protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // MOR mode merges the messages with the same key.
     expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 98d121180..13a71ecb8 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -45,7 +45,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     // Ignore the index bootstrap because we only support parquet load now.
   }
 
-  Map<String, String> getMiniBatchExpected() {
+  protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // MOR mode merges the messages with the same key.
     expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
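
Note (not part of the patch): the new shouldLoadFile helper relies on Flink's key-group routing so that every file slice is bootstrapped by exactly one subtask. A minimal standalone sketch of that assignment is shown below; the file IDs and parallelism values are made up for illustration, and it only assumes flink-runtime on the classpath.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.util.Arrays;
import java.util.List;

public class FileSliceAssignmentSketch {
  public static void main(String[] args) {
    // Hypothetical file IDs; in the operator these come from FileSlice#getFileId().
    List<String> fileIds = Arrays.asList("f1", "f2", "f3", "f4", "f5");
    int parallelism = 2;      // getRuntimeContext().getNumberOfParallelSubtasks()
    int maxParallelism = 128; // getRuntimeContext().getMaxNumberOfParallelSubtasks()

    for (int taskID = 0; taskID < parallelism; taskID++) {
      for (String fileId : fileIds) {
        // Same check as shouldLoadFile: the file id is hashed into a key group,
        // and only the owning subtask loads the index for that file slice.
        boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
            fileId, maxParallelism, parallelism) == taskID;
        if (shouldLoad) {
          System.out.printf("subtask %d loads index for file %s%n", taskID, fileId);
        }
      }
    }
  }
}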