[HUDI-2996] Flink streaming reader 'skip_compaction' option does not work (#4304)
close apache/hudi#4304
This commit is contained in:
@@ -286,7 +286,7 @@ public class IncrementalInputSplits implements Serializable {
|
||||
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
|
||||
if (issuedInstant != null) {
|
||||
// returns early for streaming mode
|
||||
- return completedTimeline.getInstants()
|
||||
+ return maySkipCompaction(completedTimeline.getInstants())
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@@ -245,7 +245,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
@Test
|
||||
void testStreamWriteReadSkippingCompaction() throws Exception {
|
||||
// create filesystem table named source
|
||||
- String createSource = TestConfigurations.getFileSourceDDL("source");
|
||||
+ String createSource = TestConfigurations.getFileSourceDDL("source", 4);
|
||||
streamTableEnv.executeSql(createSource);
|
||||
|
||||
String hoodieTableDDL = sql("t1")
|
||||
@@ -260,7 +260,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
String insertInto = "insert into t1 select * from source";
|
||||
execInsertSql(streamTableEnv, insertInto);
|
||||
|
||||
- List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
|
||||
+ String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
|
||||
|
||||
+ streamTableEnv.getConfig().getConfiguration()
|
||||
+ .setBoolean("table.dynamic-table-options.enabled", true);
|
||||
+ final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
|
||||
+ List<Row> rows = execSelectSql(streamTableEnv, query, 10);
|
||||
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,8 @@ import org.apache.hudi.util.StreamerUtil;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.core.fs.Path;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
@@ -64,6 +66,17 @@ public class TestUtils {
|
||||
.map(HoodieInstant::getTimestamp).orElse(null);
|
||||
}
|
||||
|
||||
/**
 * Looks up the timestamp of a completed instant of a single action type on the
 * active timeline of the table located at {@code basePath}.
 *
 * <p>The timeline is first narrowed to completed instants, then to either
 * delta-commit or commit instants depending on {@code isDelta}, and the
 * position {@code n} is resolved on that filtered timeline.
 *
 * @param basePath base path used to build the table meta client
 * @param n        position handed to {@code nthInstant} on the filtered timeline
 *                 (NOTE(review): presumably zero-based - confirm against
 *                 {@code HoodieTimeline#nthInstant})
 * @param isDelta  {@code true} to keep only {@code DELTA_COMMIT_ACTION} instants,
 *                 {@code false} to keep only {@code COMMIT_ACTION} instants
 * @return the timestamp of the selected instant, or {@code null} when
 *         {@code nthInstant} yields no instant
 */
@Nullable
|
||||
public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) {
|
||||
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
|
||||
.setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
|
||||
return metaClient.getActiveTimeline()
|
||||
.filterCompletedInstants()
|
||||
.filter(instant -> isDelta ? HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()) : HoodieTimeline.COMMIT_ACTION.equals(instant.getAction()))
|
||||
.nthInstant(n).map(HoodieInstant::getTimestamp)
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
|
||||
assertTrue(split.getLogPaths().isPresent());
|
||||
final String logPath = split.getLogPaths().get().get(0);
|
||||
|
||||
Reference in New Issue
Block a user