[HUDI-2534] Remove the sort operation when bulk_insert in batch mode (#3772)
This commit is contained in:
@@ -458,8 +458,8 @@ public class FlinkOptions extends HoodieConfig {
|
|||||||
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
|
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
|
||||||
.key("compaction.tasks")
|
.key("compaction.tasks")
|
||||||
.intType()
|
.intType()
|
||||||
.defaultValue(10) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.5 (assumes two commits generate one bucket)
|
.defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
|
||||||
.withDescription("Parallelism of tasks that do actual compaction, default is 10");
|
.withDescription("Parallelism of tasks that do actual compaction, default is 4");
|
||||||
|
|
||||||
public static final String NUM_COMMITS = "num_commits";
|
public static final String NUM_COMMITS = "num_commits";
|
||||||
public static final String TIME_ELAPSED = "time_elapsed";
|
public static final String TIME_ELAPSED = "time_elapsed";
|
||||||
|
|||||||
@@ -181,7 +181,7 @@ public class BulkInsertWriterHelper {
|
|||||||
return getHoodieWriteStatuses().stream()
|
return getHoodieWriteStatuses().stream()
|
||||||
.map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
|
.map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieException("Error collect the write status for task [" + taskID + "]");
|
throw new HoodieException("Error collect the write status for task [" + taskID + "]", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -83,7 +83,8 @@ public class Pipelines {
|
|||||||
operatorFactory)
|
operatorFactory)
|
||||||
// follow the parallelism of upstream operators to avoid shuffle
|
// follow the parallelism of upstream operators to avoid shuffle
|
||||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
||||||
.addSink(DummySink.INSTANCE);
|
.addSink(DummySink.INSTANCE)
|
||||||
|
.name("dummy");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
||||||
@@ -93,7 +94,8 @@ public class Pipelines {
|
|||||||
.transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
|
.transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
||||||
.addSink(DummySink.INSTANCE);
|
.addSink(DummySink.INSTANCE)
|
||||||
|
.name("dummy");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DataStream<HoodieRecord> bootstrap(
|
public static DataStream<HoodieRecord> bootstrap(
|
||||||
|
|||||||
@@ -72,18 +72,18 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
|||||||
// bulk_insert mode
|
// bulk_insert mode
|
||||||
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
|
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
|
||||||
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
|
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
|
||||||
return Pipelines.bulkInsert(conf, rowType, dataStream);
|
return context.isBounded() ? Pipelines.bulkInsert(conf, rowType, dataStream) : Pipelines.append(conf, rowType, dataStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
// default parallelism
|
|
||||||
int parallelism = dataStream.getExecutionConfig().getParallelism();
|
|
||||||
|
|
||||||
DataStream<Object> pipeline;
|
|
||||||
// Append mode
|
// Append mode
|
||||||
if (StreamerUtil.allowDuplicateInserts(conf)) {
|
if (StreamerUtil.allowDuplicateInserts(conf)) {
|
||||||
return Pipelines.append(conf, rowType, dataStream);
|
return Pipelines.append(conf, rowType, dataStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// default parallelism
|
||||||
|
int parallelism = dataStream.getExecutionConfig().getParallelism();
|
||||||
|
DataStream<Object> pipeline;
|
||||||
|
|
||||||
// bootstrap
|
// bootstrap
|
||||||
final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
|
final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
|
||||||
// write pipeline
|
// write pipeline
|
||||||
|
|||||||
Reference in New Issue
Block a user