[HUDI-2106] Fix flink batch compaction bug while user don't set compaction tasks (#3192)
This commit is contained in:
@@ -86,8 +86,8 @@ public class FlinkCompactionConfig extends Configuration {
|
||||
@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.", required = false)
|
||||
public Long compactionTargetIo = 512000L;
|
||||
|
||||
@Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10", required = false)
|
||||
public Integer compactionTasks = 10;
|
||||
@Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is -1", required = false)
|
||||
public Integer compactionTasks = -1;
|
||||
|
||||
/**
|
||||
* Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
|
||||
|
||||
@@ -112,7 +112,8 @@ public class HoodieFlinkCompactor {
|
||||
}
|
||||
|
||||
// get compactionParallelism.
|
||||
int compactionParallelism = Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), compactionPlan.getOperations().size());
|
||||
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
|
||||
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
|
||||
|
||||
env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
|
||||
.name("compaction_source")
|
||||
|
||||
Reference in New Issue
Block a user