diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index ddc73a594..699f078a7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -83,6 +83,12 @@ public class FlinkCompactionConfig extends Configuration { @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false) public Integer compactionMaxMemory = 100; + @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.", required = false) + public Long compactionTargetIo = 512000L; + + @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10", required = false) + public Integer compactionTasks = 10; + /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * The latter is more suitable for the table APIs. It reads all the properties @@ -100,6 +106,8 @@ public class FlinkCompactionConfig extends Configuration { conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits); conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds); conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory); + conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo); + conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks); conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); // use synchronous compaction always conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 289d4f67c..3e0a4375b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -111,6 +111,9 @@ public class HoodieFlinkCompactor { return; } + // get compactionParallelism. + int compactionParallelism = Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), compactionPlan.getOperations().size()); + env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) .name("compaction_source") .uid("uid_compaction_source") @@ -118,7 +121,7 @@ public class HoodieFlinkCompactor { .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator<>(new CompactFunction(conf))) - .setParallelism(compactionPlan.getOperations().size()) + .setParallelism(compactionParallelism) .addSink(new CompactionCommitSink(conf)) .name("clean_commits") .uid("uid_clean_commits")