1
0

[MINOR] Show source table operator details on the flink web when reading hudi table (#3842)

This commit is contained in:
mincwang
2021-10-24 23:18:01 +08:00
committed by GitHub
parent c9d641cc30
commit 91845e241d

View File

@@ -180,7 +180,7 @@ public class HoodieTableSource implements
conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "split_monitor")
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.setParallelism(1)
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
@@ -188,7 +188,7 @@ public class HoodieTableSource implements
} else {
InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
DataStreamSource<RowData> source = execEnv.addSource(func, asSummaryString(), typeInfo);
return source.name("bounded_source").setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return source.name(getSourceOperatorName("bounded_source")).setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
}
}
};
@@ -266,6 +266,21 @@ public class HoodieTableSource implements
return requiredPartitions;
}
private String getSourceOperatorName(String operatorName) {
String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]);
List<String> fields = Arrays.stream(this.requiredPos)
.mapToObj(i -> schemaFieldNames[i])
.collect(Collectors.toList());
StringBuilder sb = new StringBuilder();
sb.append(operatorName)
.append("(")
.append("table=").append(Collections.singletonList(conf.getString(FlinkOptions.TABLE_NAME)))
.append(", ")
.append("fields=").append(fields)
.append(")");
return sb.toString();
}
@Nullable
private Set<String> getRequiredPartitionPaths() {
if (this.requiredPartitions == null) {