diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 96e038241..d97550257 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -290,13 +290,13 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption INDEX_BOOTSTRAP_TASKS = ConfigOptions .key("write.index_bootstrap.tasks") .intType() - .defaultValue(4) + .noDefaultValue() - .withDescription("Parallelism of tasks that do index bootstrap, default is 4"); + .withDescription("Parallelism of tasks that do index bootstrap, default is the parallelism of the execution environment"); public static final ConfigOption BUCKET_ASSIGN_TASKS = ConfigOptions .key("write.bucket_assign.tasks") .intType() - .defaultValue(4) + .noDefaultValue() - .withDescription("Parallelism of tasks that do bucket assign, default is 4"); + .withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment"); public static final ConfigOption WRITE_TASKS = ConfigOptions diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 60f481d29..b2b6b54d9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -84,7 +84,12 @@ public class HoodieFlinkStreamer { RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) .getLogicalType(); + Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg); + long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); + int parallelism = env.getParallelism(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); @@ -107,31 +112,31 @@ public class HoodieFlinkStreamer { } } - DataStream hoodieDataStream = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class)); + DataStream dataStream2 = dataStream.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.rebalance() + dataStream2 = dataStream2.rebalance() .transform( "index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))) - .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS)) + .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism)) .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } - DataStream pipeline = hoodieDataStream + DataStream pipeline = dataStream2 // Key-by record key, to avoid multiple subtasks write to a bucket at the same time .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) - .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS)) - .uid("uid_bucket_assigner") + .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) + .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism)) // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) - .uid("uid_hoodie_stream_write") + .uid("uid_hoodie_stream_write_" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); if (StreamerUtil.needsAsyncCompaction(conf)) { pipeline.transform("compact_plan_generate", diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index ad42d0c5e..8ad4127a8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -71,6 +71,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType(); long ckpTimeout = dataStream.getExecutionEnvironment() .getCheckpointConfig().getCheckpointTimeout(); + int parallelism = dataStream.getExecutionConfig().getParallelism(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); @@ -84,7 +85,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, "index_bootstrap", TypeInformation.of(HoodieRecord.class), new ProcessOperator<>(new BootstrapFunction<>(conf))) - .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS)) + .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism)) .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } @@ -95,11 +96,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, "bucket_assigner", TypeInformation.of(HoodieRecord.class), new BucketAssignOperator<>(new BucketAssignFunction<>(conf))) - .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS)) .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME)) + .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism)) // shuffle by fileId(bucket id) .keyBy(record -> record.getCurrentLocation().getFileId()) .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) + .uid("uid_hoodie_stream_write_" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); if (StreamerUtil.needsAsyncCompaction(conf)) { return pipeline.transform("compact_plan_generate", diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index d0ff469c2..6096b4f81 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -69,10 +69,8 @@ public class TestConfigurations { + "with (\n" + " 'connector' = 'hudi'"; StringBuilder builder = new StringBuilder(createTable); - if (options.size() != 0) { - options.forEach((k, v) -> builder.append(",\n") - .append(" '").append(k).append("' = '").append(v).append("'")); - } + options.forEach((k, v) -> builder.append(",\n") + .append(" '").append(k).append("' = '").append(v).append("'")); builder.append("\n)"); return builder.toString(); }