From 29bc5fd912df9517acb9e15ead97f1849f21056b Mon Sep 17 00:00:00 2001
From: Fugle666 <30539368+Fugle666@users.noreply.github.com>
Date: Tue, 14 Dec 2021 11:31:36 +0800
Subject: [PATCH] [HUDI-2996] Flink streaming reader 'skip_compaction' option
 does not work (#4304)

close apache/hudi#4304
---
 .../apache/hudi/source/IncrementalInputSplits.java  |  2 +-
 .../apache/hudi/table/HoodieDataSourceITCase.java   |  9 +++++++--
 .../test/java/org/apache/hudi/utils/TestUtils.java  | 13 +++++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index fbb77c630..58c38ef56 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -286,7 +286,7 @@ public class IncrementalInputSplits implements Serializable {
     HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
     if (issuedInstant != null) {
       // returns early for streaming mode
-      return completedTimeline.getInstants()
+      return maySkipCompaction(completedTimeline.getInstants())
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 2844261a6..9eef2feb2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -245,7 +245,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @Test
   void testStreamWriteReadSkippingCompaction() throws Exception {
     // create filesystem table named source
-    String createSource = TestConfigurations.getFileSourceDDL("source");
+    String createSource = TestConfigurations.getFileSourceDDL("source", 4);
     streamTableEnv.executeSql(createSource);

     String hoodieTableDDL = sql("t1")
@@ -260,7 +260,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);

-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
+
+    streamTableEnv.getConfig().getConfiguration()
+        .setBoolean("table.dynamic-table-options.enabled", true);
+    final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
+    List<Row> rows = execSelectSql(streamTableEnv, query, 10);
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
   }

diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 92e16cd10..466ccdfd0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -29,6 +29,8 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;

+import javax.annotation.Nullable;
+
 import static org.junit.jupiter.api.Assertions.assertTrue;

 /**
@@ -64,6 +66,17 @@ public class TestUtils {
         .map(HoodieInstant::getTimestamp).orElse(null);
   }

+  @Nullable
+  public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) {
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    return metaClient.getActiveTimeline()
+        .filterCompletedInstants()
+        .filter(instant -> isDelta ? HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))
+        .nthInstant(n).map(HoodieInstant::getTimestamp)
+        .orElse(null);
+  }
+
   public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
     assertTrue(split.getLogPaths().isPresent());
     final String logPath = split.getLogPaths().get().get(0);