diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 983c19f6f..92c06e951 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -203,9 +203,10 @@ public class StreamReadMonitoringFunction
         instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(),
             InstantRange.RangeType.CLOSE_CLOSE);
       } else {
-        // first time consume and no start commit,
-        // would consume all the snapshot data PLUS incremental data set
-        instantRange = null;
+        // first time consume and no start commit, consumes the latest incremental data set.
+        HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+        instantRange = InstantRange.getInstance(latestCommitInstant.getTimestamp(), instantToIssue.getTimestamp(),
+            InstantRange.RangeType.CLOSE_CLOSE);
       }
     } else {
       LOG.info("No new instant found for the table under path " + path + ", skip reading");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 406da3228..f14574446 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -70,6 +70,36 @@ public class TestStreamReadMonitoringFunction {
 
   @Test
   public void testConsumeFromLatestCommit() throws Exception {
+    // write 2 commits first, and all the splits should come from the second commit.
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getLatestCommit().equals(latestCommit)),
+          "All the splits should be with latestCommit instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testConsumeFromLastCommit() throws Exception {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     try (AbstractStreamOperatorTestHarness harness = createHarness(function)) {
@@ -84,8 +114,8 @@ public class TestStreamReadMonitoringFunction {
       assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
       assertThat("Should produce the expected splits",
           sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
-      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
-          "No instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All instants should have range limit");
 
       Thread.sleep(1000L);
 
@@ -163,8 +193,8 @@ public class TestStreamReadMonitoringFunction {
       assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
       assertThat("Should produce the expected splits",
           sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
-      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
-          "No instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All instants should have range limit");
 
     }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 1ddcb740b..c1813dca4 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -93,6 +93,36 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @TempDir
   File tempFile;
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), firstCommit);
+    streamTableEnv.executeSql("drop table t1");
+    hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    List rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+
+    // insert another batch of data
+    execInsertSql(streamTableEnv, insertInto);
+    List rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
@@ -109,13 +139,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
 
+    // reading from latest commit instance.
     List rows = execSelectSql(streamTableEnv, "select * from t1", 10);
-    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
 
     // insert another batch of data
     execInsertSql(streamTableEnv, insertInto);
     List rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
-    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
+    assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
 
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index b85c35b92..5ddb99c5d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -160,6 +160,18 @@ public class TestData {
       TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
+  // data set of test_source.data latest commit.
+  public static List DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList(
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
   // merged data set of test_source.data and test_source_2.data
   public static List DATA_SET_SOURCE_MERGED = Arrays.asList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,