diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 4c5d309c4..7466b413e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -369,6 +369,12 @@ public class FlinkOptions { .defaultValue(100) // default 100 MB .withDescription("Max memory in MB for compaction spillable map, default 100MB"); + public static final ConfigOption COMPACTION_TARGET_IO = ConfigOptions + .key("compaction.target_io") + .longType() + .defaultValue(5120L) // default 5 GB + .withDescription("Target IO per compaction (both read and write), default 5 GB"); + public static final ConfigOption CLEAN_ASYNC_ENABLED = ConfigOptions .key("clean.async.enabled") .booleanType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index c33d635ae..abc8d9c57 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -195,6 +195,7 @@ public class StreamerUtil { .withCompactionConfig( HoodieCompactionConfig.newBuilder() .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS)) + .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO)) .withInlineCompactionTriggerStrategy( CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))