From bda3db078e927421c10932cfcb3019cfddb125b6 Mon Sep 17 00:00:00 2001 From: hehuiyuan <471627698@qq.com> Date: Sun, 24 Apr 2022 19:09:39 +0800 Subject: [PATCH] support generan parameter 'sink.parallelism' for flink-hudi (#5405) Co-authored-by: hehuiyuan1 --- .../main/java/org/apache/hudi/configuration/FlinkOptions.java | 3 +++ .../src/main/java/org/apache/hudi/table/HoodieTableSink.java | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index e2be7d364..c6b16e6ec 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -34,6 +34,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.factories.FactoryUtil; import java.lang.reflect.Field; import java.util.ArrayList; @@ -232,6 +233,8 @@ public class FlinkOptions extends HoodieConfig { // ------------------------------------------------------------------------ // Write Options // ------------------------------------------------------------------------ + public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; + public static final ConfigOption TABLE_NAME = ConfigOptions .key(HoodieWriteConfig.TBL_NAME.key()) .stringType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 4dd4f89d0..46e360d3a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -82,7 +82,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, } // default parallelism - int parallelism = dataStream.getExecutionConfig().getParallelism(); + int parallelism = conf.getInteger(FlinkOptions.SINK_PARALLELISM, + dataStream.getExecutionConfig().getParallelism()); DataStream pipeline; // bootstrap final DataStream hoodieRecordDataStream =