1
0

[HUDI-1837] Add optional instant range to log record scanner for log (#2870)

This commit is contained in:
Danny Chan
2021-04-26 16:53:18 +08:00
committed by GitHub
parent 3e4fa170cf
commit d047e91d86
13 changed files with 159 additions and 54 deletions

View File

@@ -179,11 +179,18 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), clusteringOp.getDeltaFilePaths(), readerSchema, instantTime,
maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(),
config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
config.getSpillableMapBasePath());
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(table.getMetaClient().getFs())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
.withReaderSchema(readerSchema)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.build();
recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
table.getMetaClient().getTableConfig().getPayloadClass()));