From bc313727e3e89640edad85364022e057c9864ee9 Mon Sep 17 00:00:00 2001 From: swuferhong <337361684@qq.com> Date: Tue, 6 Jul 2021 09:10:37 +0800 Subject: [PATCH] [HUDI-2106] Fix flink batch compaction bug while user don't set compaction tasks (#3192) --- .../org/apache/hudi/sink/compact/FlinkCompactionConfig.java | 4 ++-- .../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 699f078a7..26ad824ce 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -86,8 +86,8 @@ public class FlinkCompactionConfig extends Configuration { @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.", required = false) public Long compactionTargetIo = 512000L; - @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10", required = false) - public Integer compactionTasks = 10; + @Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1", required = false) + public Integer compactionTasks = -1; /** * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 3e0a4375b..8ee6c111e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -112,7 +112,8 @@ public class HoodieFlinkCompactor { } // get compactionParallelism. - int compactionParallelism = Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), compactionPlan.getOperations().size()); + int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 + ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) .name("compaction_source")