1
0

[HUDI-2015] Fix flink operator uid to allow multiple pipelines in one job (#3091)

This commit is contained in:
Danny Chan
2021-06-17 09:08:19 +08:00
committed by GitHub
parent 5ce64a81bd
commit 0b57483a8e
2 changed files with 6 additions and 10 deletions

View File

@@ -93,17 +93,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner")
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write")
.name("uid_hoodie_stream_write")
.setParallelism(numWriteTasks);
if (StreamerUtil.needsScheduleCompaction(conf)) {
return pipeline.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.uid("uid_compact_plan_generate")
.name("uid_compact_plan_generate")
.setParallelism(1) // plan generate must be singleton
.keyBy(event -> event.getOperation().hashCode())
.transform("compact_task",
@@ -116,7 +116,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
} else {
return pipeline.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits").uid("uid_clean_commits");
.name("clean_commits");
}
};
}

View File

@@ -181,17 +181,13 @@ public class HoodieTableSource implements
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
.setParallelism(1)
.uid("uid_streaming_source")
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
.uid("uid_split_reader");
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
} else {
InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
return source.name("bounded_source")
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
.uid("uid_bounded_source");
return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
}
}
};