[HUDI-3868] Disable the sort input for flink streaming append mode (#5309)
This commit is contained in:
@@ -173,9 +173,19 @@ public class Pipelines {
|
|||||||
* @param conf The configuration
|
* @param conf The configuration
|
||||||
* @param rowType The input row type
|
* @param rowType The input row type
|
||||||
* @param dataStream The input data stream
|
* @param dataStream The input data stream
|
||||||
|
* @param bounded Whether the input stream is bounded
|
||||||
* @return the appending data stream sink
|
* @return the appending data stream sink
|
||||||
*/
|
*/
|
||||||
public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
|
public static DataStreamSink<Object> append(
|
||||||
|
Configuration conf,
|
||||||
|
RowType rowType,
|
||||||
|
DataStream<RowData> dataStream,
|
||||||
|
boolean bounded) {
|
||||||
|
if (!bounded) {
|
||||||
|
// In principle, the config should be immutable, but the boundedness
|
||||||
|
// is only visible when creating the sink pipeline.
|
||||||
|
conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false);
|
||||||
|
}
|
||||||
WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
|
WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
|
||||||
|
|
||||||
return dataStream
|
return dataStream
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
|
|||||||
|
|
||||||
// Append mode
|
// Append mode
|
||||||
if (OptionsResolver.isAppendMode(conf)) {
|
if (OptionsResolver.isAppendMode(conf)) {
|
||||||
return Pipelines.append(conf, rowType, dataStream);
|
return Pipelines.append(conf, rowType, dataStream, context.isBounded());
|
||||||
}
|
}
|
||||||
|
|
||||||
// default parallelism
|
// default parallelism
|
||||||
|
|||||||
Reference in New Issue
Block a user