1
0

[HUDI-2121] Add operator uid for flink stateful operators (#3212)

This commit is contained in:
Danny Chan
2021-07-02 19:44:32 +08:00
committed by GitHub
parent ac65189458
commit d424fe6072
2 changed files with 7 additions and 3 deletions

View File

@@ -80,9 +80,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// TODO: This is a very time-consuming operation, will optimization // TODO: This is a very time-consuming operation, will optimization
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap", hoodieDataStream = hoodieDataStream.rebalance()
TypeInformation.of(HoodieRecord.class), .transform("index_bootstrap",
new ProcessOperator<>(new BootstrapFunction<>(conf))); TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
} }
DataStream<Object> pipeline = hoodieDataStream DataStream<Object> pipeline = hoodieDataStream

View File

@@ -179,8 +179,10 @@ public class HoodieTableSource implements
} }
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source") SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
.uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(1) .setParallelism(1)
.transform("split_reader", typeInfo, factory) .transform("split_reader", typeInfo, factory)
.uid("uid_split_reader_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source); return new DataStreamSource<>(source);
} else { } else {