[HUDI-3389] Bump flink version to 1.14.3 (#4776)
This commit is contained in:
@@ -415,13 +415,11 @@ public class StreamWriteOperatorCoordinator
|
||||
CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
|
||||
.map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
|
||||
.toArray(CompletableFuture<?>[]::new);
|
||||
try {
|
||||
CompletableFuture.allOf(futures).get();
|
||||
} catch (Throwable throwable) {
|
||||
if (!sendToFinishedTasks(throwable)) {
|
||||
throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable);
|
||||
CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
|
||||
if (!sendToFinishedTasks(error)) {
|
||||
throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
|
||||
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
|
||||
@@ -39,11 +39,14 @@ import org.apache.hudi.sink.partitioner.BucketAssignFunction;
|
||||
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
|
||||
import org.apache.hudi.table.format.FilePathUtils;
|
||||
|
||||
import org.apache.flink.api.common.functions.Partitioner;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.datastream.DataStreamSink;
|
||||
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
||||
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
|
||||
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
|
||||
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
@@ -64,7 +67,11 @@ public class Pipelines {
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
|
||||
|
||||
// shuffle by partition keys
|
||||
dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
|
||||
// use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
|
||||
// see BatchExecutionUtils#applyBatchExecutionSettings for details.
|
||||
Partitioner<String> partitioner = (key, channels) ->
|
||||
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
|
||||
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
|
||||
}
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
|
||||
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
|
||||
|
||||
@@ -21,6 +21,7 @@ package org.apache.hudi.source;
|
||||
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
|
||||
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
|
||||
|
||||
import org.apache.flink.api.common.operators.MailboxExecutor;
|
||||
import org.apache.flink.api.common.state.ListState;
|
||||
import org.apache.flink.api.common.state.ListStateDescriptor;
|
||||
import org.apache.flink.runtime.state.JavaSerializer;
|
||||
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
|
||||
import org.apache.flink.streaming.api.functions.source.SourceFunction;
|
||||
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
|
||||
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
|
||||
import org.apache.flink.streaming.api.operators.MailboxExecutor;
|
||||
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
|
||||
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
|
||||
import org.apache.flink.streaming.api.operators.StreamOperator;
|
||||
@@ -54,7 +54,7 @@ import java.util.concurrent.LinkedBlockingDeque;
|
||||
* StreamReadMonitoringFunction}. Contrary to the {@link StreamReadMonitoringFunction} which has a parallelism of 1,
|
||||
* this operator can have multiple parallelism.
|
||||
*
|
||||
* <p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put in a queue,
|
||||
* <p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put into a queue,
|
||||
* the {@link MailboxExecutor} reads the actual data of the split.
|
||||
* This architecture allows the separation of split reading from processing the checkpoint barriers,
|
||||
* thus removing any potential back-pressure.
|
||||
@@ -118,10 +118,10 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
|
||||
getOperatorConfig().getTimeCharacteristic(),
|
||||
getProcessingTimeService(),
|
||||
new Object(), // no actual locking needed
|
||||
getContainingTask().getStreamStatusMaintainer(),
|
||||
output,
|
||||
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
|
||||
-1);
|
||||
-1,
|
||||
true);
|
||||
|
||||
// Enqueue to process the recovered input splits.
|
||||
enqueueProcessSplits();
|
||||
@@ -205,8 +205,8 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() throws Exception {
|
||||
super.dispose();
|
||||
public void close() throws Exception {
|
||||
super.close();
|
||||
|
||||
if (format != null) {
|
||||
format.close();
|
||||
@@ -218,8 +218,8 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
super.close();
|
||||
public void finish() throws Exception {
|
||||
super.finish();
|
||||
output.close();
|
||||
if (sourceContext != null) {
|
||||
sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
|
||||
|
||||
Reference in New Issue
Block a user