diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 37d10684f..8f3ddbc84 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -89,12 +89,19 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, final String writeOperation = this.conf.get(FlinkOptions.OPERATION); if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) { this.conf.set(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED, this.supportsGrouping); + if (this.supportsGrouping) { + // if partition grouping is true, the input records would be sorted by the partition + // path, we need to chain the SORT operator and writer operator to keep the record + // sequence + dataStream.getTransformation() + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + } BulkInsertWriteOperator.OperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType); return dataStream.transform("hoodie_bulk_insert_write", TypeInformation.of(Object.class), operatorFactory) // follow the parallelism of upstream operators to avoid shuffle - .setParallelism(dataStream.getParallelism()) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) .addSink(new CleanFunction<>(conf)) .setParallelism(1) .name("clean_commits");