1
0

Use the Hudi table path (instead of the table name) in operator uids, so that two pipelines writing tables with the same name can run in a single job without uid collisions

This commit is contained in:
v-zhangjc9
2022-07-01 09:48:06 +08:00
parent 6be03ca56a
commit 2188b8ed8a

View File

@@ -122,7 +122,7 @@ public class Pipelines {
} }
return dataStream return dataStream
.transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory) .transform(opIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME)) .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.PATH))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(DummySink.INSTANCE) .addSink(DummySink.INSTANCE)
.name("dummy"); .name("dummy");
@@ -198,7 +198,7 @@ public class Pipelines {
return dataStream return dataStream
.transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory) .transform(opIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.PATH))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} }
@@ -259,7 +259,7 @@ public class Pipelines {
TypeInformation.of(HoodieRecord.class), TypeInformation.of(HoodieRecord.class),
new BootstrapOperator<>(conf)) new BootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism)) .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.PATH));
} }
return dataStream1; return dataStream1;
@@ -286,7 +286,7 @@ public class Pipelines {
TypeInformation.of(HoodieRecord.class), TypeInformation.of(HoodieRecord.class),
new BatchBootstrapOperator<>(conf)) new BatchBootstrapOperator<>(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism)) .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
.uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); .uid("uid_batch_index_bootstrap_" + conf.getString(FlinkOptions.PATH));
} }
/** /**
@@ -328,7 +328,7 @@ public class Pipelines {
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields); BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey) return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory) .transform(opIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME)) .uid("uid_bucket_write" + conf.getString(FlinkOptions.PATH))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else { } else {
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf); WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -339,12 +339,12 @@ public class Pipelines {
"bucket_assigner", "bucket_assigner",
TypeInformation.of(HoodieRecord.class), TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.PATH))
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism)) .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
// shuffle by fileId(bucket id) // shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId()) .keyBy(record -> record.getCurrentLocation().getFileId())
.transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory) .transform(opIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) .uid("uid_stream_write" + conf.getString(FlinkOptions.PATH))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} }
} }