diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 1af4459c3..b8a68e185 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -286,6 +286,12 @@ public class FlinkOptions { .defaultValue(KeyGeneratorType.SIMPLE.name()) .withDescription("Key generator type, that implements will extract the key out of incoming record"); + public static final ConfigOption BUCKET_ASSIGN_TASKS = ConfigOptions + .key("bucket_assign.tasks") + .intType() + .defaultValue(4) + .withDescription("Parallelism of tasks that do bucket assign, default is 4"); + public static final ConfigOption WRITE_TASKS = ConfigOptions .key("write.tasks") .intType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index c024d7c5c..83454b158 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; + @Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.") + public Integer bucketAssignNum = 4; + @Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.") public Integer writeTaskNum = 4; @@ -313,6 +316,7 @@ public class FlinkStreamerConfig extends Configuration { } else { conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType); } + conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum); conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum); conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName); conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, config.indexBootstrapEnabled); diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index cc5f6c030..e229168f8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -82,7 +82,6 @@ public class HoodieFlinkStreamer { (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) .getLogicalType(); Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg); - int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS); StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); @@ -111,12 +110,13 @@ public class HoodieFlinkStreamer { "bucket_assigner", TypeInformation.of(HoodieRecord.class), new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS)) .uid("uid_bucket_assigner") // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write") - .setParallelism(numWriteTask); + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); if (StreamerUtil.needsAsyncCompaction(conf)) { pipeline.transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class), diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 161fb21e7..2c39c9272 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -69,7 +69,6 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, return (DataStreamSinkProvider) dataStream -> { // Read from kafka source RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType(); - int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS); long ckpTimeout = dataStream.getExecutionEnvironment() .getCheckpointConfig().getCheckpointTimeout(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); @@ -94,11 +93,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, "bucket_assigner", TypeInformation.of(HoodieRecord.class), new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) + .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS)) .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) - .setParallelism(numWriteTasks); + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); if (StreamerUtil.needsAsyncCompaction(conf)) { return pipeline.transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class),