[HUDI-2241] Explicit parallelism for flink bulk insert (#3357)
This commit is contained in:
@@ -89,12 +89,19 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
||||
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
|
||||
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
|
||||
this.conf.set(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED, this.supportsGrouping);
|
||||
if (this.supportsGrouping) {
|
||||
// if partition grouping is true, the input records would be sorted by the partition
|
||||
// path, we need to chain the SORT operator and writer operator to keep the record
|
||||
// sequence
|
||||
dataStream.getTransformation()
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||
}
|
||||
BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
|
||||
return dataStream.transform("hoodie_bulk_insert_write",
|
||||
TypeInformation.of(Object.class),
|
||||
operatorFactory)
|
||||
// follow the parallelism of upstream operators to avoid shuffle
|
||||
.setParallelism(dataStream.getParallelism())
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
||||
.addSink(new CleanFunction<>(conf))
|
||||
.setParallelism(1)
|
||||
.name("clean_commits");
|
||||
|
||||
Reference in New Issue
Block a user