diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 31355255f..877ecbab6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -122,7 +122,7 @@ public class Pipelines {
       }
       return dataStream
           .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.PATH))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(DummySink.INSTANCE)
           .name("dummy");
@@ -198,7 +198,7 @@ public class Pipelines {
 
     return dataStream
         .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.PATH))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
   }
 
@@ -259,7 +259,7 @@ public class Pipelines {
               TypeInformation.of(HoodieRecord.class),
               new BootstrapOperator<>(conf))
           .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
-          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.PATH));
     }
 
     return dataStream1;
@@ -286,7 +286,7 @@ public class Pipelines {
             TypeInformation.of(HoodieRecord.class),
             new BatchBootstrapOperator<>(conf))
         .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
-        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
+        .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.PATH));
   }
 
   /**
@@ -328,7 +328,7 @@ public class Pipelines {
       BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
           .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid("uid_bucket_write" + conf.getString(FlinkOptions.PATH))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -339,12 +339,12 @@ public class Pipelines {
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
               new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
-          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.PATH))
           .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
           .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .uid("uid_stream_write" + conf.getString(FlinkOptions.PATH))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
   }
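
The change above swaps the table name for the table base path when composing operator UIDs. Flink requires operator UIDs to be unique within a job graph (they anchor operator state across savepoint restores), so two Hudi write pipelines whose tables share a name, e.g. the same table name under two databases, would previously produce colliding UIDs. Below is a minimal standalone sketch of the effect; UidSketch, the literal paths, and the plain Map are hypothetical stand-ins for Flink's Configuration and are not part of the patch.

import java.util.Map;

// Standalone illustration (not Hudi code): compare UIDs built from the table
// name vs. the table path for two tables that share a name.
public class UidSketch {
  public static void main(String[] args) {
    // Hypothetical settings for two tables named "t1" in different databases.
    Map<String, String> tableA = Map.of("table.name", "t1", "path", "hdfs:///warehouse/db1/t1");
    Map<String, String> tableB = Map.of("table.name", "t1", "path", "hdfs:///warehouse/db2/t1");

    // Old scheme: UID derived from the table name -- the two writers collide.
    System.out.println(("uid_stream_write" + tableA.get("table.name"))
        .equals("uid_stream_write" + tableB.get("table.name"))); // true: duplicate UID

    // New scheme (this patch): UID derived from the table path -- stays unique.
    System.out.println(("uid_stream_write" + tableA.get("path"))
        .equals("uid_stream_write" + tableB.get("path"))); // false: distinct UIDs
  }
}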