1
0

[HUDI-2252] Default consumes from the latest instant for flink streaming reader (#3368)

This commit is contained in:
swuferhong
2021-07-30 14:25:05 +08:00
committed by GitHub
parent 7bdae69053
commit 8b19ec9ca0
4 changed files with 83 additions and 9 deletions

View File

@@ -203,9 +203,10 @@ public class StreamReadMonitoringFunction
instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(),
InstantRange.RangeType.CLOSE_CLOSE);
} else {
// first time consume and no start commit,
// would consume all the snapshot data PLUS incremental data set
instantRange = null;
// first time consume and no start commit, consumes the latest incremental data set.
HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
instantRange = InstantRange.getInstance(latestCommitInstant.getTimestamp(), instantToIssue.getTimestamp(),
InstantRange.RangeType.CLOSE_CLOSE);
}
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");

View File

@@ -70,6 +70,36 @@ public class TestStreamReadMonitoringFunction {
@Test
public void testConsumeFromLatestCommit() throws Exception {
    // Two commits are written up front; every generated split must come from the second one.
    TestData.writeData(TestData.DATA_SET_INSERT, conf);
    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
    StreamReadMonitoringFunction monitorFunction = TestUtils.getMonitorFunc(conf);
    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> testHarness = createHarness(monitorFunction)) {
        testHarness.setup();
        testHarness.open();

        CountDownLatch countDown = new CountDownLatch(4);
        CollectingSourceContext collectingContext = new CollectingSourceContext(countDown);
        runAsync(collectingContext, monitorFunction);

        assertTrue(countDown.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
        assertThat("Should produce the expected splits",
            collectingContext.getPartitionPaths(), is("par1,par2,par3,par4"));

        // Without an explicit start commit, every split must carry an instant range
        // bounded by the latest completed commit.
        boolean everySplitHasRange = collectingContext.splits.stream()
            .allMatch(split -> split.getInstantRange().isPresent());
        assertTrue(everySplitHasRange, "All the instants should have range limit");

        String latestCommit = TestUtils.getLatestCommit(tempFile.getAbsolutePath());
        boolean everySplitOnLatest = collectingContext.splits.stream()
            .allMatch(split -> split.getLatestCommit().equals(latestCommit));
        assertTrue(everySplitOnLatest, "All the splits should be with latestCommit instant time");

        // Stop the stream task.
        monitorFunction.close();
    }
}
@Test
public void testConsumeFromLastCommit() throws Exception {
TestData.writeData(TestData.DATA_SET_INSERT, conf);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
@@ -84,8 +114,8 @@ public class TestStreamReadMonitoringFunction {
assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
assertThat("Should produce the expected splits",
sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
"No instants should have range limit");
assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
"All instants should have range limit");
Thread.sleep(1000L);
@@ -163,8 +193,8 @@ public class TestStreamReadMonitoringFunction {
assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
assertThat("Should produce the expected splits",
sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
"No instants should have range limit");
assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
"All instants should have range limit");
}

View File

@@ -93,6 +93,36 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@TempDir
File tempFile;
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testStreamWriteAndReadFromSpecifiedCommit(HoodieTableType tableType) throws Exception {
// Verifies that a streaming read pinned to an explicit start commit
// (READ_STREAMING_START_COMMIT = first commit) returns the full insert data set,
// for both COPY_ON_WRITE and MERGE_ON_READ table types.
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);
// Pin the reader to the very first commit, then recreate the table so the
// new start-commit option takes effect for subsequent reads.
String firstCommit = TestUtils.getFirstCommit(tempFile.getAbsolutePath());
options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), firstCommit);
streamTableEnv.executeSql("drop table t1");
hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(hoodieTableDDL);
// Reading from the first commit should see the whole initial insert batch.
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
// Re-inserting the same rows upserts in place, so the result set is unchanged.
List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
}
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
@@ -109,13 +139,14 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);
// reading from latest commit instance.
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}
@ParameterizedTest

View File

@@ -160,6 +160,18 @@ public class TestData {
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
// Subset of test_source.data that belongs to the latest commit only
// (rows id5..id8); expected result when a streaming reader starts
// from the latest instant instead of the full snapshot.
public static List<RowData> DATA_SET_SOURCE_INSERT_LATEST_COMMIT = Arrays.asList(
insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20,
TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
);
// merged data set of test_source.data and test_source_2.data
public static List<RowData> DATA_SET_SOURCE_MERGED = Arrays.asList(
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,