From d424fe6072ef74e1e2bdb290aa584bff9eca113f Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Fri, 2 Jul 2021 19:44:32 +0800
Subject: [PATCH] [HUDI-2121] Add operator uid for flink stateful operators
 (#3212)

---
 .../main/java/org/apache/hudi/table/HoodieTableSink.java | 8 +++++---
 .../java/org/apache/hudi/table/HoodieTableSource.java    | 2 ++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index a8e38a4aa..161fb21e7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -80,9 +80,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
 
       // TODO: This is a very time-consuming operation, will optimization
       if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-        hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
-            TypeInformation.of(HoodieRecord.class),
-            new ProcessOperator<>(new BootstrapFunction<>(conf)));
+        hoodieDataStream = hoodieDataStream.rebalance()
+            .transform("index_bootstrap",
+                TypeInformation.of(HoodieRecord.class),
+                new ProcessOperator<>(new BootstrapFunction<>(conf)))
+            .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
       }
 
       DataStream pipeline = hoodieDataStream
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index c8f6e2a78..af2327df5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -179,8 +179,10 @@ public class HoodieTableSource implements
       }
       OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
       SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source")
+          .uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(1)
           .transform("split_reader", typeInfo, factory)
+          .uid("uid_split_reader_" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
       return new DataStreamSource<>(source);
     } else {