diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b8a68e185..7254f3c56 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -286,8 +286,14 @@ public class FlinkOptions {
       .defaultValue(KeyGeneratorType.SIMPLE.name())
       .withDescription("Key generator type, that implements will extract the key out of incoming record");
 
+  public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
+      .key("write.index_bootstrap.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do index bootstrap, default is 4");
+
   public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
-      .key("bucket_assign.tasks")
+      .key("write.bucket_assign.tasks")
       .intType()
       .defaultValue(4)
       .withDescription("Parallelism of tasks that do bucket assign, default is 4");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 83454b158..d81fd3d39 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
+  @Parameter(names = {"--index-bootstrap-num"}, description = "Parallelism of tasks that do index bootstrap, default is 4.")
+  public Integer indexBootstrapNum = 4;
+
   @Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
   public Integer bucketAssignNum = 4;
 
@@ -316,6 +319,7 @@ public class FlinkStreamerConfig extends Configuration {
     } else {
       conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
     }
+    conf.setInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS, config.indexBootstrapNum);
     conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
     conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
     conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index e229168f8..f8cf84085 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -52,7 +52,7 @@ import java.util.Properties;
 /**
  * An Utility which can incrementally consume data from Kafka and apply it to the target table.
  * currently, it only support COW table and insert, upsert operation.
- *
+ * <p>
  * note: HoodieFlinkStreamer is not suitable to initialize on large tables when we have no checkpoint to restore from.
  */
 public class HoodieFlinkStreamer {
@@ -98,9 +98,13 @@ public class HoodieFlinkStreamer {
         .uid("uid_kafka_source")
         .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
+      hoodieDataStream = hoodieDataStream.rebalance()
+          .transform(
+              "index_bootstrap",
           TypeInformation.of(HoodieRecord.class),
-          new ProcessOperator<>(new BootstrapFunction<>(conf)));
+          new ProcessOperator<>(new BootstrapFunction<>(conf)))
+          .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
+          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
     }
 
     DataStream<Object> pipeline = hoodieDataStream
@@ -119,22 +123,22 @@ public class HoodieFlinkStreamer {
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     if (StreamerUtil.needsAsyncCompaction(conf)) {
       pipeline.transform("compact_plan_generate",
-              TypeInformation.of(CompactionPlanEvent.class),
-              new CompactionPlanOperator(conf))
-              .uid("uid_compact_plan_generate")
-              .setParallelism(1) // plan generate must be singleton
-              .rebalance()
-              .transform("compact_task",
-                  TypeInformation.of(CompactionCommitEvent.class),
-                  new ProcessOperator<>(new CompactFunction(conf)))
-              .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
-              .addSink(new CompactionCommitSink(conf))
-              .name("compact_commit")
-              .setParallelism(1); // compaction commit should be singleton
+          TypeInformation.of(CompactionPlanEvent.class),
+          new CompactionPlanOperator(conf))
+          .uid("uid_compact_plan_generate")
+          .setParallelism(1) // plan generate must be singleton
+          .rebalance()
+          .transform("compact_task",
+              TypeInformation.of(CompactionCommitEvent.class),
+              new ProcessOperator<>(new CompactFunction(conf)))
+          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+          .addSink(new CompactionCommitSink(conf))
+          .name("compact_commit")
+          .setParallelism(1); // compaction commit should be singleton
     } else {
       pipeline.addSink(new CleanFunction<>(conf))
-              .setParallelism(1)
-              .name("clean_commits").uid("uid_clean_commits");
+          .setParallelism(1)
+          .name("clean_commits").uid("uid_clean_commits");
     }
 
     env.execute(cfg.targetTableName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 2c39c9272..ad42d0c5e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -80,9 +80,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // TODO: This is a very time-consuming operation, will optimization
       if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
         hoodieDataStream = hoodieDataStream.rebalance()
-            .transform("index_bootstrap",
+            .transform(
+                "index_bootstrap",
                 TypeInformation.of(HoodieRecord.class),
                 new ProcessOperator<>(new BootstrapFunction<>(conf)))
+            .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
             .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
       }
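
Usage note (not part of the patch): the sketch below shows how the new parallelism knob composes
with the existing index bootstrap switch. It is illustrative only; the holder class and the value 8
are hypothetical, while the FlinkOptions constants and Configuration setters are the ones used in
the hunks above.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    // Hypothetical example class, not part of the patch.
    public class IndexBootstrapConfExample {
      public static Configuration buildConf() {
        Configuration conf = new Configuration();
        // Enable the index_bootstrap operator that the HoodieFlinkStreamer and
        // HoodieTableSink hunks above insert ahead of the write pipeline.
        conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
        // New in this patch: bootstrap parallelism is decoupled from the other write stages.
        conf.setInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS, 8); // default is 4
        // Note the key rename: "bucket_assign.tasks" -> "write.bucket_assign.tasks".
        conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, 8);
        return conf;
      }
    }

For the HoodieFlinkStreamer entry point the same value can be passed on the command line as
--index-bootstrap-num 8, which FlinkStreamerConfig copies into FlinkOptions.INDEX_BOOTSTRAP_TASKS
as shown in the second hunk.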