1
0

[HUDI-2085] Support specifying compaction parallelism and compaction target io for flink batch compaction (#3169)

This commit is contained in:
swuferhong
2021-06-29 22:53:01 +08:00
committed by GitHub
parent 5a7d1b3d6c
commit f665db071f
2 changed files with 12 additions and 1 deletions

View File

@@ -83,6 +83,12 @@ public class FlinkCompactionConfig extends Configuration {
@Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false) @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false)
public Integer compactionMaxMemory = 100; public Integer compactionMaxMemory = 100;
@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write) for batching compaction, default 512000M.", required = false)
public Long compactionTargetIo = 512000L;
@Parameter(names = {"--compaction-tasks"}, description = "Parallelism of tasks that do actual compaction, default is 10", required = false)
public Integer compactionTasks = 10;
/** /**
* Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties * The latter is more suitable for the table APIs. It reads all the properties
@@ -100,6 +106,8 @@ public class FlinkCompactionConfig extends Configuration {
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits); conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds); conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds);
conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory); conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory);
conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, config.compactionTargetIo);
conf.setInteger(FlinkOptions.COMPACTION_TASKS, config.compactionTasks);
conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable);
// use synchronous compaction always // use synchronous compaction always
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);

View File

@@ -111,6 +111,9 @@ public class HoodieFlinkCompactor {
return; return;
} }
// get compactionParallelism.
int compactionParallelism = Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), compactionPlan.getOperations().size());
env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime))
.name("compaction_source") .name("compaction_source")
.uid("uid_compaction_source") .uid("uid_compaction_source")
@@ -118,7 +121,7 @@ public class HoodieFlinkCompactor {
.transform("compact_task", .transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class), TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf))) new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionPlan.getOperations().size()) .setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf)) .addSink(new CompactionCommitSink(conf))
.name("clean_commits") .name("clean_commits")
.uid("uid_clean_commits") .uid("uid_clean_commits")