From 43de2b470288aa3914ba712b7d2a1a806e317e53 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 13 Apr 2022 14:21:08 +0800 Subject: [PATCH] [HUDI-3868] Disable the sort input for flink streaming append mode (#5309) --- .../java/org/apache/hudi/sink/utils/Pipelines.java | 12 +++++++++++- .../java/org/apache/hudi/table/HoodieTableSink.java | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 28a669075..3b2ee3952 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -173,9 +173,19 @@ public class Pipelines { * @param conf The configuration * @param rowType The input row type * @param dataStream The input data stream + * @param bounded Whether the input stream is bounded * @return the appending data stream sink */ - public static DataStreamSink append(Configuration conf, RowType rowType, DataStream dataStream) { + public static DataStreamSink append( + Configuration conf, + RowType rowType, + DataStream dataStream, + boolean bounded) { + if (!bounded) { + // In principle, the config should be immutable, but the boundedness + // is only visible when creating the sink pipeline. + conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false); + } WriteOperatorFactory operatorFactory = AppendWriteOperator.getFactory(conf, rowType); return dataStream diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index ed99e7b4c..4dd4f89d0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -78,7 +78,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, // Append mode if (OptionsResolver.isAppendMode(conf)) { - return Pipelines.append(conf, rowType, dataStream); + return Pipelines.append(conf, rowType, dataStream, context.isBounded()); } // default parallelism