
[HUDI-2171] Add parallelism conf for bootstrap operator

Author: 喻兆靖
Date: 2021-07-13 17:55:12 +08:00
Parent: b0089b894a
Commit: aff1a1ed29
4 changed files with 35 additions and 19 deletions

FlinkOptions.java

@@ -286,8 +286,14 @@ public class FlinkOptions {
       .defaultValue(KeyGeneratorType.SIMPLE.name())
       .withDescription("Key generator type, whose implementation will extract the key out of the incoming record");
 
+  public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
+      .key("write.index_bootstrap.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do index bootstrap, default is 4");
+
   public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
-      .key("bucket_assign.tasks")
+      .key("write.bucket_assign.tasks")
       .intType()
       .defaultValue(4)
       .withDescription("Parallelism of tasks that do bucket assign, default is 4");

FlinkStreamerConfig.java

@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@Parameter(names = {"--index-bootstrap-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
public Integer indexBootstrapNum = 4;
@Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
public Integer bucketAssignNum = 4;
@@ -316,6 +319,7 @@ public class FlinkStreamerConfig extends Configuration {
     } else {
       conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
     }
+    conf.setInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS, config.indexBootstrapNum);
     conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
     conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
     conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
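
Since FlinkStreamerConfig binds flags through JCommander, here is a self-contained sketch of how the new --index-bootstrap-num flag gets parsed; FlagParseSketch is a hypothetical stand-in for the real config class:

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

public class FlagParseSketch {
  @Parameter(names = {"--index-bootstrap-num"}, description = "Parallelism of tasks that do index bootstrap, default is 4.")
  public Integer indexBootstrapNum = 4;

  public static void main(String[] args) {
    FlagParseSketch cfg = new FlagParseSketch();
    // Equivalent to launching the streamer with: --index-bootstrap-num 8
    JCommander.newBuilder().addObject(cfg).build().parse("--index-bootstrap-num", "8");
    System.out.println(cfg.indexBootstrapNum); // prints 8
  }
}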

HoodieFlinkStreamer.java

@@ -52,7 +52,7 @@ import java.util.Properties;
 /**
  * A utility which can incrementally consume data from Kafka and apply it to the target table.
  * Currently, it only supports the COW table type and the insert and upsert operations.
- *
+ * <p>
  * Note: HoodieFlinkStreamer is not suitable for initializing large tables when there is no checkpoint to restore from.
  */
 public class HoodieFlinkStreamer {
@@ -98,9 +98,13 @@ public class HoodieFlinkStreamer {
.uid("uid_kafka_source")
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
hoodieDataStream = hoodieDataStream.rebalance()
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)));
new ProcessOperator<>(new BootstrapFunction<>(conf)))
.setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
DataStream<Object> pipeline = hoodieDataStream
@@ -119,22 +123,22 @@ public class HoodieFlinkStreamer {
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
 
     if (StreamerUtil.needsAsyncCompaction(conf)) {
       pipeline.transform("compact_plan_generate",
           TypeInformation.of(CompactionPlanEvent.class),
           new CompactionPlanOperator(conf))
           .uid("uid_compact_plan_generate")
           .setParallelism(1) // plan generate must be singleton
           .rebalance()
           .transform("compact_task",
               TypeInformation.of(CompactionCommitEvent.class),
               new ProcessOperator<>(new CompactFunction(conf)))
           .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
           .addSink(new CompactionCommitSink(conf))
           .name("compact_commit")
           .setParallelism(1); // compaction commit should be singleton
     } else {
       pipeline.addSink(new CleanFunction<>(conf))
           .setParallelism(1)
           .name("clean_commits").uid("uid_clean_commits");
     }
     env.execute(cfg.targetTableName);
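
The bootstrap wiring above follows a standard DataStream pattern: transform(...) with an explicit operator, then an operator-level setParallelism(...) plus a stable uid(...) so operator state survives restores. A self-contained sketch of the same pattern with a trivial pass-through operator (all names here are illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.util.Collector;

public class OperatorParallelismSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> input = env.fromElements("a", "b", "c");
    input.rebalance()
        .transform(
            "pass_through", // operator name, analogous to "index_bootstrap"
            Types.STRING,
            new ProcessOperator<>(new ProcessFunction<String, String>() {
              @Override
              public void processElement(String value, Context ctx, Collector<String> out) {
                out.collect(value);
              }
            }))
        .setParallelism(2)       // operator-specific parallelism, as the commit adds for bootstrap
        .uid("uid_pass_through") // stable uid for state restore
        .print();
    env.execute("operator-parallelism-sketch");
  }
}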

HoodieTableSink.java

@@ -80,9 +80,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // TODO: this is a very time-consuming operation, to be optimized
       if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
         hoodieDataStream = hoodieDataStream.rebalance()
-            .transform("index_bootstrap",
+            .transform(
+                "index_bootstrap",
                 TypeInformation.of(HoodieRecord.class),
                 new ProcessOperator<>(new BootstrapFunction<>(conf)))
+            .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
             .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
       }
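
On the SQL path through HoodieTableSink, the new option would be passed in the table's WITH clause; a sketch assuming the Hudi Flink bundle is on the classpath, with placeholder table name, schema, and path:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlOptionSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    // Placeholder DDL; only the two bootstrap-related options are the point here.
    tEnv.executeSql(
        "CREATE TABLE hudi_sink ("
            + "  uuid STRING,"
            + "  name STRING,"
            + "  PRIMARY KEY (uuid) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'file:///tmp/hudi_sink',"
            + "  'index.bootstrap.enabled' = 'true'," // turns the bootstrap operator on
            + "  'write.index_bootstrap.tasks' = '8'" // the option added by this commit
            + ")");
  }
}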