diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index 9a9109afb..ed965dfb0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -179,11 +179,18 @@ public class SparkExecuteClusteringCommitActionExecutor baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); - HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(table.getMetaClient().getFs(), - table.getMetaClient().getBasePath(), clusteringOp.getDeltaFilePaths(), readerSchema, instantTime, - maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(), - config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(), - config.getSpillableMapBasePath()); + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(table.getMetaClient().getFs()) + .withBasePath(table.getMetaClient().getBasePath()) + .withLogFilePaths(clusteringOp.getDeltaFilePaths()) + .withReaderSchema(readerSchema) + .withLatestInstantTime(instantTime) + .withMaxMemorySizeInBytes(maxMemoryPerCompaction) + .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) + .withReverseReader(config.getCompactionReverseLogReadEnabled()) + .withBufferSize(config.getMaxDFSStreamBufferSize()) + .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .build(); recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, table.getMetaClient().getTableConfig().getPayloadClass())); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index 50a44ad0f..ec7714ec7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -88,6 +89,8 @@ public abstract class AbstractHoodieLogRecordScanner { private final boolean reverseReader; // Buffer Size for log file reader private final int bufferSize; + // optional instant range for incremental block filtering + private final Option instantRange; // FileSystem private final FileSystem fs; // Total log files read - for metrics @@ -105,8 +108,8 @@ public abstract class AbstractHoodieLogRecordScanner { // Progress private float progress = 0.0f; - public AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize) { + protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, Option instantRange) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); @@ -118,6 +121,7 @@ public abstract class AbstractHoodieLogRecordScanner { this.reverseReader = reverseReader; this.fs = fs; this.bufferSize = bufferSize; + this.instantRange = instantRange; } /** @@ -141,6 +145,7 @@ public abstract class AbstractHoodieLogRecordScanner { totalLogFiles.set(scannedLogFiles.size()); // Use the HoodieLogFileReader to iterate through the blocks in the log file HoodieLogBlock r = logFormatReaderWrapper.next(); + final String instantTime = r.getLogBlockHeader().get(INSTANT_TIME); totalLogBlocks.incrementAndGet(); if (r.getBlockType() != CORRUPT_BLOCK && !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime @@ -149,12 +154,15 @@ public abstract class AbstractHoodieLogRecordScanner { break; } if (r.getBlockType() != CORRUPT_BLOCK && r.getBlockType() != COMMAND_BLOCK) { - String instantTime = r.getLogBlockHeader().get(INSTANT_TIME); if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime) || inflightInstantsTimeline.containsInstant(instantTime)) { // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one continue; } + if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) { + // filter the log block by instant range + continue; + } } switch (r.getBlockType()) { case HFILE_DATA_BLOCK: @@ -392,6 +400,10 @@ public abstract class AbstractHoodieLogRecordScanner { public abstract Builder withBufferSize(int bufferSize); + public Builder withInstantRange(Option instantRange) { + throw new UnsupportedOperationException(); + } + public abstract AbstractHoodieLogRecordScanner build(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 177be8b91..238b0c6e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.exception.HoodieIOException; @@ -69,18 +70,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner public final HoodieTimer timer = new HoodieTimer(); @SuppressWarnings("unchecked") - public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, - boolean reverseReader, int bufferSize, String spillableMapBasePath) { - this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, - reverseReader, bufferSize, spillableMapBasePath, true); - } - - @SuppressWarnings("unchecked") - public HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, - boolean reverseReader, int bufferSize, String spillableMapBasePath, boolean autoScan) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize); + boolean reverseReader, int bufferSize, String spillableMapBasePath, + Option instantRange, boolean autoScan) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange); try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), @@ -163,17 +157,21 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner * Builder used to build {@code HoodieUnMergedLogRecordScanner}. */ public static class Builder extends AbstractHoodieLogRecordScanner.Builder { - private FileSystem fs; - private String basePath; - private List logFilePaths; - private Schema readerSchema; - private String latestInstantTime; - private boolean readBlocksLazily; - private boolean reverseReader; - private int bufferSize; + protected FileSystem fs; + protected String basePath; + protected List logFilePaths; + protected Schema readerSchema; + protected String latestInstantTime; + protected boolean readBlocksLazily; + protected boolean reverseReader; + protected int bufferSize; // specific configurations - private Long maxMemorySizeInBytes; - private String spillableMapBasePath; + protected Long maxMemorySizeInBytes; + protected String spillableMapBasePath; + // incremental filtering + private Option instantRange = Option.empty(); + // auto scan default true + private boolean autoScan = true; public Builder withFileSystem(FileSystem fs) { this.fs = fs; @@ -215,6 +213,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner return this; } + public Builder withInstantRange(Option instantRange) { + this.instantRange = instantRange; + return this; + } + public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) { this.maxMemorySizeInBytes = maxMemorySizeInBytes; return this; @@ -225,11 +228,16 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner return this; } + public Builder withAutoScan(boolean autoScan) { + this.autoScan = autoScan; + return this; + } + @Override public HoodieMergedLogRecordScanner build() { return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, - bufferSize, spillableMapBasePath); + bufferSize, spillableMapBasePath, instantRange, autoScan); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 1aac6330e..dd0edd99a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; @@ -34,10 +35,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann private final LogRecordScannerCallback callback; - public HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, - LogRecordScannerCallback callback) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize); + private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, LogRecordScannerCallback callback) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, Option.empty()); this.callback = callback; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/InstantRange.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java similarity index 97% rename from hudi-flink/src/main/java/org/apache/hudi/table/format/mor/InstantRange.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java index a90d4f430..7f7ea560e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/InstantRange.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/InstantRange.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.format.mor; +package org.apache.hudi.common.table.log; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -66,7 +66,7 @@ public abstract class InstantRange implements Serializable { * Represents a range type. */ public enum RangeType { - OPEN_CLOSE, CLOSE_CLOSE; + OPEN_CLOSE, CLOSE_CLOSE } private static class OpenCloseRange extends InstantRange { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index eeede6f41..538ccfbdc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -214,8 +214,16 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // Load the schema Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); - logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath, logFilePaths, - schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE, spillableMapDirectory, null); + logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder() + .withFileSystem(metaClient.getFs()) + .withBasePath(metadataBasePath) + .withLogFilePaths(logFilePaths) + .withReaderSchema(schema) + .withLatestInstantTime(latestMetaInstantTimestamp) + .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES) + .withBufferSize(BUFFER_SIZE) + .withSpillableMapBasePath(spillableMapDirectory) + .build(); logScannerOpenMs = timer.endTimer(); LOG.info(String.format("Opened metadata log files from %s at instant (dataset instant=%s, metadata instant=%s) in %d ms", diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java index ae471dc52..23549da3c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java @@ -39,12 +39,12 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS // Set of all record keys that are to be read in memory private Set mergeKeyFilter; - public HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, + private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize, String spillableMapBasePath, Set mergeKeyFilter) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize, - spillableMapBasePath, false); - this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet(); + spillableMapBasePath, Option.empty(), false); + this.mergeKeyFilter = mergeKeyFilter; performScan(); } @@ -63,6 +63,13 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS } } + /** + * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}. + */ + public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() { + return new HoodieMetadataMergedLogRecordScanner.Builder(); + } + /** * Retrieve a record given its key. * @@ -72,4 +79,70 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS public Option> getRecordByKey(String key) { return Option.ofNullable((HoodieRecord) records.get(key)); } + + /** + * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}. + */ + public static class Builder extends HoodieMergedLogRecordScanner.Builder { + private Set mergeKeyFilter = Collections.emptySet(); + + public Builder withFileSystem(FileSystem fs) { + this.fs = fs; + return this; + } + + public Builder withBasePath(String basePath) { + this.basePath = basePath; + return this; + } + + public Builder withLogFilePaths(List logFilePaths) { + this.logFilePaths = logFilePaths; + return this; + } + + public Builder withReaderSchema(Schema schema) { + this.readerSchema = schema; + return this; + } + + public Builder withLatestInstantTime(String latestInstantTime) { + this.latestInstantTime = latestInstantTime; + return this; + } + + public Builder withReadBlocksLazily(boolean readBlocksLazily) { + throw new UnsupportedOperationException(); + } + + public Builder withReverseReader(boolean reverseReader) { + throw new UnsupportedOperationException(); + } + + public Builder withBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) { + this.maxMemorySizeInBytes = maxMemorySizeInBytes; + return this; + } + + public Builder withSpillableMapBasePath(String spillableMapBasePath) { + this.spillableMapBasePath = spillableMapBasePath; + return this; + } + + public Builder withMergeKeyFilter(Set mergeKeyFilter) { + this.mergeKeyFilter = mergeKeyFilter; + return this; + } + + @Override + public HoodieMetadataMergedLogRecordScanner build() { + return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, + latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter); + } + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index de9b950f1..dd20f22e4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -285,6 +285,12 @@ public class FlinkOptions { .defaultValue(128) .withDescription("Max log block size in MB for log file, default 128MB"); + public static final ConfigOption WRITE_LOG_MAX_SIZE = ConfigOptions + .key("write.log.max.size") + .intType() + .defaultValue(1024) + .withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB"); + public static final ConfigOption WRITE_MERGE_MAX_MEMORY = ConfigOptions .key("write.merge.max_memory") .intType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index 6c96e8764..21966e0df 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -29,7 +30,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.table.format.mor.InstantRange; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.util.StreamerUtil; diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 702614b29..c075b093e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -79,6 +79,7 @@ public class FormatUtils { .withSpillableMapBasePath( config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) + .withInstantRange(split.getInstantRange()) .build(); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 142d8b0ec..1264ea941 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -64,7 +64,6 @@ import java.util.stream.IntStream; import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE; import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType; -import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema; @@ -334,17 +333,6 @@ public class MergeOnReadInputFormat // skipping if the condition is unsatisfied // continue; } else { - // should improve the code when log scanner supports - // seeking by log blocks with commit time which is more - // efficient. - if (split.getInstantRange().isPresent()) { - // based on the fact that commit time is always the first field - String commitTime = curAvroRecord.get().get(HOODIE_COMMIT_TIME_COL_POS).toString(); - if (!split.getInstantRange().get().isInRange(commitTime)) { - // filter out the records that are not in range - continue; - } - } GenericRecord requiredAvroRecord = buildAvroRecordBySchema( curAvroRecord.get(), requiredSchema, diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java index 7aaa0ad8e..9d479e26b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.format.mor; +import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.Option; import org.apache.flink.core.io.InputSplit; diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index a7295fb0f..b28b604f0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -213,6 +213,7 @@ public class StreamerUtil { .forTable(conf.getString(FlinkOptions.TABLE_NAME)) .withStorageConfig(HoodieStorageConfig.newBuilder() .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) + .logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) .build()) .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));