diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 4e193fab2..f0dbffd47 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -180,7 +180,7 @@ public class HoodieTableSource implements conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); - SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "split_monitor") + SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .setParallelism(1) .transform("split_reader", typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); @@ -188,7 +188,7 @@ public class HoodieTableSource implements } else { InputFormatSourceFunction func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo); DataStreamSource source = execEnv.addSource(func, asSummaryString(), typeInfo); - return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); + return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); } } }; @@ -266,6 +266,21 @@ public class HoodieTableSource implements return requiredPartitions; } + private String getSourceOperatorName(String operatorName) { + String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]); + List fields = Arrays.stream(this.requiredPos) + .mapToObj(i -> schemaFieldNames[i]) + .collect(Collectors.toList()); + StringBuilder sb = new StringBuilder(); + sb.append(operatorName) + .append("(") + .append("table=").append(Collections.singletonList(conf.getString(FlinkOptions.TABLE_NAME))) + .append(", ") + .append("fields=").append(fields) + .append(")"); + return sb.toString(); + } + @Nullable private Set getRequiredPartitionPaths() { if (this.requiredPartitions == null) {