diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 4fbcbb584..c478893d3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -93,17 +93,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, "bucket_assigner", TypeInformation.of(HoodieRecord.class), new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) - .uid("uid_bucket_assigner") + .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) - .uid("uid_hoodie_stream_write") + .name("uid_hoodie_stream_write") .setParallelism(numWriteTasks); if (StreamerUtil.needsScheduleCompaction(conf)) { return pipeline.transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class), new CompactionPlanOperator(conf)) - .uid("uid_compact_plan_generate") + .name("uid_compact_plan_generate") .setParallelism(1) // plan generate must be singleton .keyBy(event -> event.getOperation().hashCode()) .transform("compact_task", @@ -116,7 +116,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, } else { return pipeline.addSink(new CleanFunction<>(conf)) .setParallelism(1) - .name("clean_commits").uid("uid_clean_commits"); + .name("clean_commits"); } }; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 50420f905..55ec46a2e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -181,17 +181,13 @@ public class HoodieTableSource implements OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source") .setParallelism(1) - .uid("uid_streaming_source") .transform("split_reader", typeInfo, factory) - .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) - .uid("uid_split_reader"); + .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource<>(source); } else { InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); DataStreamSource source = execEnv.addSource(func, asSummaryString(), typeInfo); - return source.name("bounded_source") - .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)) - .uid("uid_bounded_source"); + return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); } } };