From efbbb67420b7082a3960f3d32215dabd959f5525 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 29 Jul 2021 09:57:37 +0800 Subject: [PATCH] [HUDI-2241] Explicit parallelism for flink bulk insert (#3357) --- .../main/java/org/apache/hudi/table/HoodieTableSink.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 37d10684f..8f3ddbc84 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -89,12 +89,19 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, final String writeOperation = this.conf.get(FlinkOptions.OPERATION); if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) { this.conf.set(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED, this.supportsGrouping); + if (this.supportsGrouping) { + // if partition grouping is true, the input records would be sorted by the partition + // path, so we need to chain the SORT operator and writer operator to keep the record + // sequence + dataStream.getTransformation() .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + } BulkInsertWriteOperator.OperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType); return dataStream.transform("hoodie_bulk_insert_write", TypeInformation.of(Object.class), operatorFactory) // follow the parallelism of upstream operators to avoid shuffle - .setParallelism(dataStream.getParallelism()) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) .addSink(new CleanFunction<>(conf)) .setParallelism(1) .name("clean_commits");