From 7fa2f8ea827257e0320f670e5f22a114a52933cf Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Wed, 2 Jun 2021 10:10:35 +0800
Subject: [PATCH] [HUDI-1921] Add target io option for flink compaction (#2980)

---
 .../java/org/apache/hudi/configuration/FlinkOptions.java | 6 ++++++
 .../src/main/java/org/apache/hudi/util/StreamerUtil.java | 1 +
 2 files changed, 7 insertions(+)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 4c5d309c4..7466b413e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -369,6 +369,12 @@ public class FlinkOptions {
       .defaultValue(100) // default 100 MB
       .withDescription("Max memory in MB for compaction spillable map, default 100MB");
 
+  public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions
+      .key("compaction.target_io")
+      .longType()
+      .defaultValue(5120L) // default 5 GB
+      .withDescription("Target IO per compaction (both read and write), default 5 GB");
+
   public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
       .key("clean.async.enabled")
       .booleanType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index c33d635ae..abc8d9c57 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -195,6 +195,7 @@ public class StreamerUtil {
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
                     .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
+                    .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
                     .withInlineCompactionTriggerStrategy(
                         CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
                     .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))