diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index c7dcc0a38..ae6b3e1fe 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -52,11 +53,13 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
@@ -206,7 +209,7 @@ public class StreamReadMonitoringFunction
       return;
     }
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return;
@@ -228,8 +231,7 @@ public class StreamReadMonitoringFunction
             : InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
       } else {
         // first time consume and no start commit, consumes the latest incremental data set.
-        HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
-        instantRange = InstantRange.getInstance(latestCommitInstant.getTimestamp(), instantToIssue.getTimestamp(),
+        instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(), instantToIssue.getTimestamp(),
             InstantRange.RangeType.CLOSE_CLOSE);
       }
     } else {
@@ -243,8 +245,13 @@ public class StreamReadMonitoringFunction
     // 4. use the file paths from #step 3 as the back-up of the filesystem view
 
     String tableName = conf.getString(FlinkOptions.TABLE_NAME);
-    List<HoodieCommitMetadata> metadataList = instants.stream()
+    List<HoodieCommitMetadata> activeMetadataList = instants.stream()
         .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
+    List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
+    List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
+        ? mergeList(activeMetadataList, archivedMetadataList)
+        : activeMetadataList;
+
     Set<String> writePartitions = getWritePartitionPaths(metadataList);
     // apply partition push down
     if (this.requiredPartitionPaths.size() > 0) {
@@ -325,6 +332,38 @@ public class StreamReadMonitoringFunction
     }
   }
 
+  /**
+   * Returns the archived metadata in case the reader consumes untimely or it wants
+   * to read from the earliest.
+   *
+   * <p>Note: should improve it with metadata table when the metadata table is stable enough.
+   *
+   * @param instantRange   The instant range to filter the timeline instants
+   * @param commitTimeline The commit timeline
+   * @param tableName      The table name
+   * @return the list of archived metadata, or empty if there is no need to read the archived timeline
+   */
+  private List<HoodieCommitMetadata> getArchivedMetadata(
+      InstantRange instantRange,
+      HoodieTimeline commitTimeline,
+      String tableName) {
+    if (instantRange == null || commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
+      // read the archived metadata if:
+      // 1. the start commit is 'earliest';
+      // 2. the start instant is archived.
+      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
+      if (!metaClient.getArchivedTimeline().empty()) {
+        Stream<HoodieInstant> instantStream = archivedTimeline.getCommitsTimeline().filterCompletedInstants().getInstants();
+        if (instantRange != null) {
+          instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, instantRange.getStartInstant()));
+        }
+        return instantStream
+            .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
+      }
+    }
+    return Collections.emptyList();
+  }
+
   /**
    * Returns the instants with a given issuedInstant to start from.
    *
@@ -335,19 +374,19 @@ public class StreamReadMonitoringFunction
   private List<HoodieInstant> filterInstantsWithStart(
       HoodieTimeline commitTimeline,
      final String issuedInstant) {
+    HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
-      return commitTimeline.getInstants()
+      return completedTimeline.getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
         && !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
       String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
-      return commitTimeline.getInstants()
+      return completedTimeline.getInstants()
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
           .collect(Collectors.toList());
     } else {
-      return commitTimeline.getInstants()
-          .collect(Collectors.toList());
+      return completedTimeline.getInstants().collect(Collectors.toList());
     }
   }
 
@@ -363,4 +402,10 @@ public class StreamReadMonitoringFunction
         .flatMap(Collection::stream)
         .collect(Collectors.toSet());
   }
+
+  private static List<HoodieCommitMetadata> mergeList(List<HoodieCommitMetadata> list1, List<HoodieCommitMetadata> list2) {
+    List<HoodieCommitMetadata> merged = new ArrayList<>(list1);
+    merged.addAll(list2);
+    return merged;
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index fc4239442..52cb76588 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -458,7 +458,7 @@ public class HoodieTableSource implements
   }
 
   private Schema inferSchemaFromDdl() {
-    Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toSourceRowDataType().getLogicalType());
+    Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
     return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
   }
 
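[Reviewer note, not part of the patch] The core of the change above is a fallback rule: when the requested start instant has already been archived off the active timeline (or the reader consumes from 'earliest'), the monitor must also scan the archived timeline and merge its commit metadata with the active metadata. Below is a minimal, self-contained sketch of that rule; the names (planMetadata, firstActiveInstant) and the plain String instants standing in for HoodieInstant/HoodieCommitMetadata are hypothetical, and it relies only on Hudi instant times being sortable yyyyMMddHHmmss strings.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArchivedFallbackSketch {

  // Fall back to the archived timeline only when consuming from 'earliest'
  // (startInstant == null) or when the start instant predates the active timeline.
  static List<String> planMetadata(
      String startInstant,        // null means "consume from the earliest commit"
      String firstActiveInstant,  // earliest instant still on the active timeline
      List<String> activeMetadata,
      List<String> archivedMetadata) {
    boolean startIsArchived = startInstant == null
        || startInstant.compareTo(firstActiveInstant) < 0; // instant times sort lexicographically
    if (!startIsArchived || archivedMetadata.isEmpty()) {
      return activeMetadata;
    }
    // same shape as the patch's mergeList(activeMetadataList, archivedMetadataList)
    List<String> merged = new ArrayList<>(activeMetadata);
    merged.addAll(archivedMetadata);
    return merged;
  }

  public static void main(String[] args) {
    List<String> active = Arrays.asList("20211003120000", "20211004120000");
    List<String> archived = Arrays.asList("20211001120000", "20211002120000");
    // start instant already archived -> both timelines contribute metadata
    System.out.println(planMetadata("20211001120000", "20211003120000", active, archived));
    // start instant still on the active timeline -> archived timeline is skipped
    System.out.println(planMetadata("20211004120000", "20211003120000", active, archived));
  }
}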
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 26e0be6ee..f8fc42ac3 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -319,7 +319,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.TABLE_TYPE, tableType.name())
         .option(FlinkOptions.READ_AS_STREAMING, "true")
         .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
         .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
@@ -334,6 +334,40 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, expected, true);
   }
 
+  @Test
+  void testStreamReadMorTableWithCompactionPlan() throws Exception {
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.READ_AS_STREAMING, "true")
+        .option(FlinkOptions.READ_STREAMING_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, "2")
+        // close the async compaction
+        .option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
+        // generate compaction plan for each commit
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, "1")
+        .withPartition(false)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    streamTableEnv.executeSql("insert into t1 select * from source");
+
+    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
+    final String expected = "["
+        + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+        + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], "
+        + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
+        + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
+        + "+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], "
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]";
+    assertRowsEquals(result, expected);
+  }
+
   @ParameterizedTest
   @MethodSource("executionModeAndPartitioningParams")
   void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b6b738a95..3e0afc25a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -400,7 +400,7 @@ public class TestData {
     String rowsString = rows.stream()
         .sorted(Comparator.comparing(o -> toIdSafely(o.getField(0))))
         .collect(Collectors.toList()).toString();
-    assertThat(rowDataToString(expected), is(rowsString));
+    assertThat(rowsString, is(rowDataToString(expected)));
   }
 
   /**
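[Reviewer note, not part of the patch] The TestData.java one-liner matters because Hamcrest's assertThat(actual, matcher) takes the actual value as its first argument: the old argument order passed and failed in exactly the same cases, but on failure it printed the expected and actual halves of the message swapped. A small sketch of the corrected convention, using only standard Hamcrest calls:

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class AssertOrderSketch {
  public static void main(String[] args) {
    String rowsString = "[+I[id1, Danny, 23]]"; // computed result under test (actual)
    String expected = "[+I[id1, Danny, 23]]";   // reference value
    // actual first, expected inside the matcher: failure messages now read correctly
    assertThat(rowsString, is(expected));
  }
}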