diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c736821ce..f5f25bc15 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -458,8 +458,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
       .key("compaction.tasks")
       .intType()
-      .defaultValue(10) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.5 (assumes two commits generate one bucket)
-      .withDescription("Parallelism of tasks that do actual compaction, default is 10");
+      .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
+      .withDescription("Parallelism of tasks that do actual compaction, default is 4");
 
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index e0cbab602..4bc8ae27f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -181,7 +181,7 @@ public class BulkInsertWriterHelper {
       return getHoodieWriteStatuses().stream()
           .map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
     } catch (IOException e) {
-      throw new HoodieException("Error collect the write status for task [" + taskID + "]");
+      throw new HoodieException("Error collect the write status for task [" + taskID + "]", e);
     }
   }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 121118877..4f8036620 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -83,7 +83,8 @@ public class Pipelines {
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
-        .addSink(DummySink.INSTANCE);
+        .addSink(DummySink.INSTANCE)
+        .name("dummy");
   }
 
   public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
@@ -93,7 +94,8 @@ public class Pipelines {
         .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
-        .addSink(DummySink.INSTANCE);
+        .addSink(DummySink.INSTANCE)
+        .name("dummy");
   }
 
   public static DataStream<HoodieRecord> bootstrap(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 2fdd0fd68..c6432e5b5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -72,18 +72,18 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // bulk_insert mode
       final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
       if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
-        return Pipelines.bulkInsert(conf, rowType, dataStream);
+        return context.isBounded() ? Pipelines.bulkInsert(conf, rowType, dataStream) : Pipelines.append(conf, rowType, dataStream);
       }
 
-      // default parallelism
-      int parallelism = dataStream.getExecutionConfig().getParallelism();
-
-      DataStream<Object> pipeline;
       // Append mode
       if (StreamerUtil.allowDuplicateInserts(conf)) {
         return Pipelines.append(conf, rowType, dataStream);
       }
 
+      // default parallelism
+      int parallelism = dataStream.getExecutionConfig().getParallelism();
+      DataStream<Object> pipeline;
+
       // bootstrap
       final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
       // write pipeline