[HUDI-2103] Add rebalance before index bootstrap (#3185)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -99,7 +99,7 @@ public class HoodieFlinkStreamer {
|
|||||||
.uid("uid_kafka_source")
|
.uid("uid_kafka_source")
|
||||||
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
|
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
|
||||||
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
||||||
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
|
hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
|
||||||
TypeInformation.of(HoodieRecord.class),
|
TypeInformation.of(HoodieRecord.class),
|
||||||
new ProcessOperator<>(new BootstrapFunction<>(conf)));
|
new ProcessOperator<>(new BootstrapFunction<>(conf)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,7 +80,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
|||||||
|
|
||||||
// TODO: This is a very time-consuming operation, will optimization
|
// TODO: This is a very time-consuming operation, will optimization
|
||||||
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
|
||||||
hoodieDataStream = hoodieDataStream.transform("index_bootstrap",
|
hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
|
||||||
TypeInformation.of(HoodieRecord.class),
|
TypeInformation.of(HoodieRecord.class),
|
||||||
new ProcessOperator<>(new BootstrapFunction<>(conf)));
|
new ProcessOperator<>(new BootstrapFunction<>(conf)));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user