1
0

Merge pull request #3268 from yuzhaojing/HUDI-2171

[HUDI-2171] Add parallelism conf for bootstrap operator
This commit is contained in:
Danny Chan
2021-07-14 17:01:30 +08:00
committed by GitHub
4 changed files with 35 additions and 19 deletions

View File

@@ -286,8 +286,14 @@ public class FlinkOptions {
      .defaultValue(KeyGeneratorType.SIMPLE.name())
      .withDescription("Key generator type, that implements will extract the key out of incoming record");
public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
.key("write.index_bootstrap.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do index bootstrap, default is 4");
  public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
-     .key("bucket_assign.tasks")
+     .key("write.bucket_assign.tasks")
      .intType()
      .defaultValue(4)
      .withDescription("Parallelism of tasks that do bucket assign, default is 4");

View File

@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
  @Parameter(names = {"--help", "-h"}, help = true)
  public Boolean help = false;
@Parameter(names = {"--index-bootstrap-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
public Integer indexBootstrapNum = 4;
  @Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
  public Integer bucketAssignNum = 4;
@@ -316,6 +319,7 @@ public class FlinkStreamerConfig extends Configuration {
    } else {
      conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
    }
conf.setInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS, config.indexBootstrapNum);
    conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
    conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
    conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);

View File

@@ -52,7 +52,7 @@ import java.util.Properties;
/**
 * An Utility which can incrementally consume data from Kafka and apply it to the target table.
 * currently, it only support COW table and insert, upsert operation.
- *
+ * <p>
 * note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from.
 */
public class HoodieFlinkStreamer {
@@ -98,9 +98,13 @@ public class HoodieFlinkStreamer {
        .uid("uid_kafka_source")
        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-     hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
+     hoodieDataStream = hoodieDataStream.rebalance()
+         .transform(
+             "index_bootstrap",
              TypeInformation.of(HoodieRecord.class),
-             new ProcessOperator<>(new BootstrapFunction<>(conf)));
+             new ProcessOperator<>(new BootstrapFunction<>(conf)))
+         .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
+         .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
    }
    DataStream<Object> pipeline = hoodieDataStream
@@ -119,22 +123,22 @@ public class HoodieFlinkStreamer {
        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    if (StreamerUtil.needsAsyncCompaction(conf)) {
      pipeline.transform("compact_plan_generate",
          TypeInformation.of(CompactionPlanEvent.class),
          new CompactionPlanOperator(conf))
          .uid("uid_compact_plan_generate")
          .setParallelism(1) // plan generate must be singleton
          .rebalance()
          .transform("compact_task",
              TypeInformation.of(CompactionCommitEvent.class),
              new ProcessOperator<>(new CompactFunction(conf)))
          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
          .addSink(new CompactionCommitSink(conf))
          .name("compact_commit")
          .setParallelism(1); // compaction commit should be singleton
    } else {
      pipeline.addSink(new CleanFunction<>(conf))
          .setParallelism(1)
          .name("clean_commits").uid("uid_clean_commits");
    }
    env.execute(cfg.targetTableName);

View File

@@ -80,9 +80,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
    // TODO: This is a very time-consuming operation, will optimization
    if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
      hoodieDataStream = hoodieDataStream.rebalance()
-         .transform("index_bootstrap",
+         .transform(
+             "index_bootstrap",
              TypeInformation.of(HoodieRecord.class),
              new ProcessOperator<>(new BootstrapFunction<>(conf)))
+         .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
    }