diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index bee5fe641..cc5f6c030 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -99,7 +99,7 @@ public class HoodieFlinkStreamer { .uid("uid_kafka_source") .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index d648bc081..a8e38a4aa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -80,7 +80,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, // TODO: This is a very time-consuming operation, will optimization if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.transform("index_bootstrap", + hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))); }