1
0

[HUDI-2185] Remove the default parallelism of index bootstrap and bucket assigner

This commit is contained in:
yuzhao.cyz
2021-07-16 14:22:53 +08:00
committed by Danny Chan
parent 3b264e80d9
commit c8aaf00819
4 changed files with 20 additions and 15 deletions

View File

@@ -290,13 +290,13 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
       .key("write.index_bootstrap.tasks")
       .intType()
-      .defaultValue(4)
+      .noDefaultValue()
       .withDescription("Parallelism of tasks that do index bootstrap, default is 4");

   public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
       .key("write.bucket_assign.tasks")
       .intType()
-      .defaultValue(4)
+      .noDefaultValue()
       .withDescription("Parallelism of tasks that do bucket assign, default is 4");

   public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions

View File

@@ -84,7 +84,12 @@ public class HoodieFlinkStreamer {
     RowType rowType =
         (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
            .getLogicalType();

     Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
+    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+    int parallelism = env.getParallelism();
+    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
         new StreamWriteOperatorFactory<>(conf);
@@ -107,31 +112,31 @@ public class HoodieFlinkStreamer {
       }
     }

-    DataStream<HoodieRecord> hoodieDataStream = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
+    DataStream<HoodieRecord> dataStream2 = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));

     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = hoodieDataStream.rebalance()
+      dataStream2 = dataStream2.rebalance()
          .transform(
              "index_bootstrap",
              TypeInformation.of(HoodieRecord.class),
              new ProcessOperator<>(new BootstrapFunction<>(conf)))
-          .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
+          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
     }

-    DataStream<Object> pipeline = hoodieDataStream
+    DataStream<Object> pipeline = dataStream2
        // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
        .keyBy(HoodieRecord::getRecordKey)
        .transform(
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
-        .uid("uid_bucket_assigner")
+        .uid("uid_bucket_assigner" + conf.getString(FlinkOptions.TABLE_NAME))
+        .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
        // shuffle by fileId(bucket id)
        .keyBy(record -> record.getCurrentLocation().getFileId())
        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-        .uid("uid_hoodie_stream_write")
+        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     if (StreamerUtil.needsAsyncCompaction(conf)) {
       pipeline.transform("compact_plan_generate",

View File

@@ -71,6 +71,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
     RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
     long ckpTimeout = dataStream.getExecutionEnvironment()
        .getCheckpointConfig().getCheckpointTimeout();
+    int parallelism = dataStream.getExecutionConfig().getParallelism();
     conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

     StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
@@ -84,7 +85,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
              "index_bootstrap",
              TypeInformation.of(HoodieRecord.class),
              new ProcessOperator<>(new BootstrapFunction<>(conf)))
-          .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
+          .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
          .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
     }
@@ -95,11 +96,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
-        .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
        .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
+        .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
        // shuffle by fileId(bucket id)
        .keyBy(record -> record.getCurrentLocation().getFileId())
        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
+        .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     if (StreamerUtil.needsAsyncCompaction(conf)) {
       return pipeline.transform("compact_plan_generate",

View File

@@ -69,10 +69,8 @@ public class TestConfigurations {
        + "with (\n"
        + "  'connector' = 'hudi'";
     StringBuilder builder = new StringBuilder(createTable);
-    if (options.size() != 0) {
-      options.forEach((k, v) -> builder.append(",\n")
-          .append("  '").append(k).append("' = '").append(v).append("'"));
-    }
+    options.forEach((k, v) -> builder.append(",\n")
+        .append("  '").append(k).append("' = '").append(v).append("'"));
     builder.append("\n)");
     return builder.toString();
   }