[HUDI-2054] Remove the duplicate name for flink write pipeline (#3135)
This commit is contained in:
@@ -96,13 +96,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
|||||||
// shuffle by fileId(bucket id)
|
// shuffle by fileId(bucket id)
|
||||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||||
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.name("uid_hoodie_stream_write")
|
|
||||||
.setParallelism(numWriteTasks);
|
.setParallelism(numWriteTasks);
|
||||||
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
if (StreamerUtil.needsAsyncCompaction(conf)) {
|
||||||
return pipeline.transform("compact_plan_generate",
|
return pipeline.transform("compact_plan_generate",
|
||||||
TypeInformation.of(CompactionPlanEvent.class),
|
TypeInformation.of(CompactionPlanEvent.class),
|
||||||
new CompactionPlanOperator(conf))
|
new CompactionPlanOperator(conf))
|
||||||
.name("uid_compact_plan_generate")
|
|
||||||
.setParallelism(1) // plan generate must be singleton
|
.setParallelism(1) // plan generate must be singleton
|
||||||
.rebalance()
|
.rebalance()
|
||||||
.transform("compact_task",
|
.transform("compact_task",
|
||||||
|
|||||||
Reference in New Issue
Block a user