[HUDI-2142] Support setting bucket assign parallelism for flink write task (#3239)
This commit is contained in:
@@ -286,6 +286,12 @@ public class FlinkOptions {
|
|||||||
.defaultValue(KeyGeneratorType.SIMPLE.name())
|
.defaultValue(KeyGeneratorType.SIMPLE.name())
|
||||||
.withDescription("Key generator type, that implements will extract the key out of incoming record");
|
.withDescription("Key generator type, that implements will extract the key out of incoming record");
|
||||||
|
|
||||||
|
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
|
||||||
|
.key("bucket_assign.tasks")
|
||||||
|
.intType()
|
||||||
|
.defaultValue(4)
|
||||||
|
.withDescription("Parallelism of tasks that do bucket assign, default is 4");
|
||||||
|
|
||||||
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
|
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
|
||||||
.key("write.tasks")
|
.key("write.tasks")
|
||||||
.intType()
|
.intType()
|
||||||
|
|||||||
@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
|
|||||||
@Parameter(names = {"--help", "-h"}, help = true)
|
@Parameter(names = {"--help", "-h"}, help = true)
|
||||||
public Boolean help = false;
|
public Boolean help = false;
|
||||||
|
|
||||||
|
@Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
|
||||||
|
public Integer bucketAssignNum = 4;
|
||||||
|
|
||||||
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
|
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
|
||||||
public Integer writeTaskNum = 4;
|
public Integer writeTaskNum = 4;
|
||||||
|
|
||||||
@@ -313,6 +316,7 @@ public class FlinkStreamerConfig extends Configuration {
|
|||||||
} else {
|
} else {
|
||||||
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
|
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
|
||||||
}
|
}
|
||||||
|
conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
|
||||||
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
|
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
|
||||||
conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
|
conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
|
||||||
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, config.indexBootstrapEnabled);
|
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, config.indexBootstrapEnabled);
|
||||||
|
|||||||
@@ -82,7 +82,6 @@ public class HoodieFlinkStreamer {
|
|||||||
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
|
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
|
||||||
.getLogicalType();
|
.getLogicalType();
|
||||||
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
|
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
|
||||||
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
|
|
||||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
|
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
|
||||||
new StreamWriteOperatorFactory<>(conf);
|
new StreamWriteOperatorFactory<>(conf);
|
||||||
|
|
||||||
@@ -111,12 +110,13 @@ public class HoodieFlinkStreamer {
|
|||||||
"bucket_assigner",
|
"bucket_assigner",
|
||||||
TypeInformation.of(HoodieRecord.class),
|
TypeInformation.of(HoodieRecord.class),
|
||||||
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
|
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
|
||||||
|
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
|
||||||
.uid("uid_bucket_assigner")
|
.uid("uid_bucket_assigner")
|
||||||
// shuffle by fileId(bucket id)
|
// shuffle by fileId(bucket id)
|
||||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||||
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.uid("uid_hoodie_stream_write")
|
.uid("uid_hoodie_stream_write")
|
||||||
.setParallelism(numWriteTask);
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||||
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
||||||
pipeline.transform("compact_plan_generate",
|
pipeline.transform("compact_plan_generate",
|
||||||
TypeInformation.of(CompactionPlanEvent.class),
|
TypeInformation.of(CompactionPlanEvent.class),
|
||||||
|
|||||||
@@ -69,7 +69,6 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
|||||||
return (DataStreamSinkProvider) dataStream -> {
|
return (DataStreamSinkProvider) dataStream -> {
|
||||||
// Read from kafka source
|
// Read from kafka source
|
||||||
RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
|
RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
|
||||||
int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
|
|
||||||
long ckpTimeout = dataStream.getExecutionEnvironment()
|
long ckpTimeout = dataStream.getExecutionEnvironment()
|
||||||
.getCheckpointConfig().getCheckpointTimeout();
|
.getCheckpointConfig().getCheckpointTimeout();
|
||||||
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
|
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
|
||||||
@@ -94,11 +93,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
|||||||
"bucket_assigner",
|
"bucket_assigner",
|
||||||
TypeInformation.of(HoodieRecord.class),
|
TypeInformation.of(HoodieRecord.class),
|
||||||
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
|
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
|
||||||
|
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
|
||||||
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
|
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
// shuffle by fileId(bucket id)
|
// shuffle by fileId(bucket id)
|
||||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||||
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.setParallelism(numWriteTasks);
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||||
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
||||||
return pipeline.transform("compact_plan_generate",
|
return pipeline.transform("compact_plan_generate",
|
||||||
TypeInformation.of(CompactionPlanEvent.class),
|
TypeInformation.of(CompactionPlanEvent.class),
|
||||||
|
|||||||
Reference in New Issue
Block a user