diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 21966e0df..f50872672 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -111,6 +112,8 @@ public class StreamReadMonitoringFunction
 
   private final long maxCompactionMemoryInBytes;
 
+  private final boolean isDelta;
+
   public StreamReadMonitoringFunction(
       Configuration conf,
       Path path,
@@ -121,6 +124,7 @@ public class StreamReadMonitoringFunction
     this.metaClient = metaClient;
     this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+    this.isDelta = conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
   }
 
   @Override
@@ -185,7 +189,10 @@ public class StreamReadMonitoringFunction
   @VisibleForTesting
   public void monitorDirAndForwardSplits(SourceContext context) {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = isDelta
+        // MERGE_ON_READ: monitor delta commits only, excluding the parquet files written by compaction
+        ? metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+        : metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return;
@@ -238,8 +245,9 @@ public class StreamReadMonitoringFunction
                 .sorted(HoodieLogFile.getLogFileComparator())
                 .map(logFile -> logFile.getPath().toString())
                 .collect(Collectors.toList()));
+            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
             return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                null, logPaths, commitToIssue,
+                basePath, logPaths, commitToIssue,
                 metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
           }).collect(Collectors.toList()))
           .flatMap(Collection::stream)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index ada9e01cd..bc53c9223 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
@@ -156,11 +155,6 @@ public class HoodieTableSource implements
     this.hadoopConf = StreamerUtil.getHadoopConf();
     this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf));
-    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
-      ValidationUtils.checkArgument(
-          conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ),
-          "Streaming read is only supported for table type: " + FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-    }
   }
 
   @Override
@@ -377,6 +371,27 @@ public class HoodieTableSource implements
             .emitDelete(isStreaming)
             .build();
       case COPY_ON_WRITE:
+        if (isStreaming) {
+          // streaming read reuses MergeOnReadInputFormat: with READ_AS_STREAMING set, its
+          // base-file-only path filters the records by commit time
+          final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState(
+              rowType,
+              requiredRowType,
+              tableAvroSchema.toString(),
+              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+              Collections.emptyList(),
+              conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+          return MergeOnReadInputFormat.builder()
+              .config(this.conf)
+              .paths(FilePathUtils.toFlinkPaths(paths))
+              .tableState(hoodieTableState2)
+              // use the explicit fields data type because the AvroSchemaConverter
+              // is not very stable.
+              .fieldTypes(rowDataType.getChildren())
+              .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+              .limit(this.limit)
+              .build();
+        }
         FileInputFormat format = new CopyOnWriteInputFormat(
             FilePathUtils.toFlinkPaths(paths),
             this.schema.getFieldNames(),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 1264ea941..595bf5a0f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.mor;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
@@ -64,6 +65,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
 import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
 import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 
@@ -162,9 +164,17 @@ public class MergeOnReadInputFormat
   public void open(MergeOnReadInputSplit split) throws IOException {
     this.currentReadCount = 0L;
     this.hadoopConf = StreamerUtil.getHadoopConf();
-    if (!split.getLogPaths().isPresent()) {
-      // base file only
-      this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+    if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
+      if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+        // base file only with commit time filtering
+        this.iterator = new BaseFileOnlyFilteringIterator(
+            split.getInstantRange(),
+            this.tableState.getRequiredRowType(),
+            getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
+      } else {
+        // base file only
+        this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+      }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
       this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
@@ -390,6 +400,71 @@ public class MergeOnReadInputFormat
     }
   }
 
+  /**
+   * Similar to {@link BaseFileOnlyIterator} but only returns the records whose
+   * commit time is within the given instant range.
+   *
+   * <p>The reader is expected to emit the commit time as the first field of each
+   * row; it is used for filtering and projected away before a record is returned.
+   */
+  static class BaseFileOnlyFilteringIterator implements RecordIterator {
+    // index of the commit time field in the rows emitted by the reader
+    private static final int COMMIT_TIME_POS = 0;
+
+    // base file reader
+    private final ParquetColumnarRowSplitReader reader;
+    // instant range to filter with, null means keep every record
+    private final InstantRange instantRange;
+    // drops the leading commit time field
+    private final RowDataProjection projection;
+
+    // record located by reachedEnd() and not yet returned by nextRecord()
+    private RowData currentRecord;
+
+    BaseFileOnlyFilteringIterator(
+        Option<InstantRange> instantRange,
+        RowType requiredRowType,
+        ParquetColumnarRowSplitReader reader) {
+      this.reader = reader;
+      this.instantRange = instantRange.orElse(null);
+      // the required fields follow the commit time field in the reader rows
+      int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray();
+      projection = RowDataProjection.instance(requiredRowType, positions);
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      if (currentRecord != null) {
+        // a matching record is still pending, a repeated call must not skip it
+        return false;
+      }
+      while (!this.reader.reachedEnd()) {
+        RowData candidate = this.reader.nextRecord();
+        if (instantRange == null
+            || instantRange.isInRange(candidate.getString(COMMIT_TIME_POS).toString())) {
+          currentRecord = candidate;
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public RowData nextRecord() {
+      // possible optimization: skip the projection when there is no instant range
+      RowData projected = projection.project(currentRecord);
+      currentRecord = null;
+      return projected;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
+      }
+    }
+  }
+
   static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
     private final Iterator iterator;
@@ -625,6 +700,21 @@ public class MergeOnReadInputFormat
     }
   }
 
+  // -------------------------------------------------------------------------
+  // Utilities
+  // -------------------------------------------------------------------------
+
+  /**
+   * Returns a new array holding the commit time field position followed by
+   * the given required positions.
+   */
+  private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
+    int[] withCommitTime = new int[requiredPos.length + 1];
+    withCommitTime[0] = HOODIE_COMMIT_TIME_COL_POS;
+    System.arraycopy(requiredPos, 0, withCommitTime, 1, requiredPos.length);
+    return withCommitTime;
+  }
+
   @VisibleForTesting
   public void isEmitDelete(boolean emitDelete) {
     this.emitDelete = emitDelete;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index fe652c5ff..3d413a7fb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -82,8 +82,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @TempDir
   File tempFile;
 
-  @Test
-  void testStreamWriteAndRead() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -91,7 +92,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     Map options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
@@ -106,8 +107,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
   }
 
-  @Test
-  void testStreamReadAppendData() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     String createSource2 = TestConfigurations.getFileSourceDDL("source2", "test_source_2.data");
@@ -117,7 +119,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     Map options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
     String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(createHoodieTable);
     String insertInto = "insert into t1 select * from source";