[HUDI-4139] Improve Flink write operator names to identify tables easily (#5744)
Co-authored-by: yanenze <yanenze@keytop.com.cn>
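
Flink write pipelines previously used fixed operator names such as stream_write, so a job that writes several Hudi tables shows identical operator names in the Flink web UI. This change adds a writeOpIdentifier helper that appends the configured table name to each write operator's name; the operator uids keep their existing "uid_<op>" + table-name pattern unchanged, so state compatibility is unaffected.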
@@ -114,7 +114,7 @@ public class Pipelines {
           conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
     }
     return dataStream
-        .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
+        .transform(writeOpIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
         .addSink(DummySink.INSTANCE)
@@ -146,7 +146,7 @@ public class Pipelines {
       }
     }
     return dataStream
-        .transform("hoodie_bulk_insert_write",
+        .transform(writeOpIdentifier("hoodie_bulk_insert_write", conf),
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
@@ -190,7 +190,7 @@ public class Pipelines {
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);

     return dataStream
-        .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
+        .transform(writeOpIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
         .addSink(DummySink.INSTANCE)
@@ -322,7 +322,7 @@ public class Pipelines {
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
+          .transform(writeOpIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
          .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
@@ -338,7 +338,7 @@ public class Pipelines {
           .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform("stream_write", TypeInformation.of(Object.class), operatorFactory)
+          .transform(writeOpIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
@@ -385,6 +385,10 @@ public class Pipelines {
         .name("clean_commits");
   }

+  public static String writeOpIdentifier(String operatorN, Configuration conf) {
+    return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
+  }
+
   /**
    * Dummy sink that does nothing.
    */
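
For illustration, a minimal sketch of what the new helper yields. The table name t_orders and the wrapper class are hypothetical, and the imports assume the usual layout of the Hudi Flink module:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.Pipelines;

public class WriteOpIdentifierExample {
  public static void main(String[] args) {
    // Hypothetical table name, used only for this sketch.
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.TABLE_NAME, "t_orders");

    // The helper suffixes the static operator name with the table name,
    // so each table's write operator is distinguishable in the Flink web UI.
    String name = Pipelines.writeOpIdentifier("stream_write", conf);
    System.out.println(name); // prints: stream_write: t_orders
  }
}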