[HUDI-2804] Add option to skip compaction instants for streaming read (#4051)
This commit is contained in:
@@ -196,6 +196,17 @@ public class FlinkOptions extends HoodieConfig {
|
||||
.defaultValue(60)// default 1 minute
|
||||
.withDescription("Check interval for streaming read of SECOND, default 1 minute");
|
||||
|
||||
// this option is experimental: it controls whether streaming read skips compaction instants
|
||||
public static final ConfigOption<Boolean> READ_STREAMING_SKIP_COMPACT = ConfigOptions
|
||||
.key("read.streaming.skip_compaction")
|
||||
.booleanType()
|
||||
.defaultValue(false)// default false: compaction instants are not skipped
|
||||
.withDescription("Whether to skip compaction instants for streaming read,\n"
|
||||
+ "there are two cases that this option can be used to avoid reading duplicates:\n"
|
||||
+ "1) you are definitely sure that the consumer reads faster than any compaction instants, "
|
||||
+ "usually with delta time compaction strategy that is long enough, for e.g, one week;\n"
|
||||
+ "2) changelog mode is enabled, this option is a solution to keep data integrity");
|
||||
|
||||
public static final String START_COMMIT_EARLIEST = "earliest";
|
||||
public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
|
||||
.key("read.start-commit")
|
||||
|
||||
@@ -80,16 +80,20 @@ public class IncrementalInputSplits implements Serializable {
|
||||
private final long maxCompactionMemoryInBytes;
|
||||
// for partition pruning
|
||||
private final Set<String> requiredPartitions;
|
||||
// skip compaction
|
||||
private final boolean skipCompaction;
|
||||
|
||||
private IncrementalInputSplits(
|
||||
Configuration conf,
|
||||
Path path,
|
||||
long maxCompactionMemoryInBytes,
|
||||
@Nullable Set<String> requiredPartitions) {
|
||||
@Nullable Set<String> requiredPartitions,
|
||||
boolean skipCompaction) {
|
||||
this.conf = conf;
|
||||
this.path = path;
|
||||
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
|
||||
this.requiredPartitions = requiredPartitions;
|
||||
this.skipCompaction = skipCompaction;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -262,7 +266,7 @@ public class IncrementalInputSplits implements Serializable {
|
||||
final String startTs = archivedCompleteTimeline.firstInstant().get().getTimestamp();
|
||||
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
|
||||
}
|
||||
return instantStream
|
||||
return maySkipCompaction(instantStream)
|
||||
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, archivedTimeline)).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
@@ -299,7 +303,13 @@ public class IncrementalInputSplits implements Serializable {
|
||||
final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
|
||||
instantStream = instantStream.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, endCommit));
|
||||
}
|
||||
return instantStream.collect(Collectors.toList());
|
||||
return maySkipCompaction(instantStream).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants) {
|
||||
return this.skipCompaction
|
||||
? instants.filter(instant -> !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
|
||||
: instants;
|
||||
}
|
||||
|
||||
private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
|
||||
@@ -352,6 +362,8 @@ public class IncrementalInputSplits implements Serializable {
|
||||
private long maxCompactionMemoryInBytes;
|
||||
// for partition pruning
|
||||
private Set<String> requiredPartitions;
|
||||
// skip compaction
|
||||
private boolean skipCompaction = false;
|
||||
|
||||
public Builder() {
|
||||
}
|
||||
@@ -376,9 +388,14 @@ public class IncrementalInputSplits implements Serializable {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
 * Records whether the {@code IncrementalInputSplits} produced by this builder
 * should skip compaction.
 *
 * @param skipCompaction the flag value to store on the builder
 * @return this builder, so calls can be chained
 */
public Builder skipCompaction(boolean skipCompaction) {
|
||||
this.skipCompaction = skipCompaction;
|
||||
return this;
|
||||
}
|
||||
|
||||
public IncrementalInputSplits build() {
|
||||
return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path),
|
||||
this.maxCompactionMemoryInBytes, this.requiredPartitions);
|
||||
this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,7 +107,9 @@ public class StreamReadMonitoringFunction
|
||||
.conf(conf)
|
||||
.path(path)
|
||||
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
|
||||
.requiredPartitions(requiredPartitionPaths).build();
|
||||
.requiredPartitions(requiredPartitionPaths)
|
||||
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -242,6 +242,28 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStreamWriteReadSkippingCompaction() throws Exception {
|
||||
// create filesystem table named source
|
||||
String createSource = TestConfigurations.getFileSourceDDL("source");
|
||||
streamTableEnv.executeSql(createSource);
|
||||
|
||||
String hoodieTableDDL = sql("t1")
|
||||
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
|
||||
.option(FlinkOptions.READ_AS_STREAMING, true)
|
||||
.option(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true)
|
||||
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
|
||||
.option(FlinkOptions.COMPACTION_TASKS, 1)
|
||||
.end();
|
||||
streamTableEnv.executeSql(hoodieTableDDL);
|
||||
String insertInto = "insert into t1 select * from source";
|
||||
execInsertSql(streamTableEnv, insertInto);
|
||||
|
||||
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
|
||||
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testStreamWriteWithCleaning() {
|
||||
// create filesystem table named source
|
||||
|
||||
Reference in New Issue
Block a user