From 1cbf43b6e7731c68661c74bdfa41399f6398172f Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Wed, 30 Jun 2021 16:40:55 +0800 Subject: [PATCH] [HUDI-2103] Add rebalance before index bootstrap (#3185) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 喻兆靖 --- .../main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 2 +- .../src/main/java/org/apache/hudi/table/HoodieTableSink.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index bee5fe641..cc5f6c030 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -99,7 +99,7 @@ public class HoodieFlinkStreamer { .uid("uid_kafka_source") .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index d648bc081..a8e38a4aa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -80,7 +80,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, // TODO: This is a very time-consuming operation, will optimization if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))); }