diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index b2cc6949b..b6f1f3d37 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -60,7 +60,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -159,7 +159,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+      <artifactId>flink-runtime</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml
index c8fac38be..27a4a0b45 100644
--- a/hudi-flink/pom.xml
+++ b/hudi-flink/pom.xml
@@ -164,13 +164,13 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -307,7 +307,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+      <artifactId>flink-runtime</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
@@ -321,7 +321,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 447cfa420..4782070e3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -415,13 +415,11 @@ public class StreamWriteOperatorCoordinator
     CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
         .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
         .toArray(CompletableFuture<?>[]::new);
- try {
- CompletableFuture.allOf(futures).get();
- } catch (Throwable throwable) {
- if (!sendToFinishedTasks(throwable)) {
- throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable);
+ CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
+ if (!sendToFinishedTasks(error)) {
+ throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
}
- }
+ });
}
/**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java
index 4526c6ff9..3d42ad87d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5f156e839..f97f79422 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -39,11 +39,14 @@ import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
@@ -64,7 +67,11 @@ public class Pipelines {
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
// shuffle by partition keys
- dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
+ // use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
+ // see BatchExecutionUtils#applyBatchExecutionSettings for details.
+ Partitioner partitioner = (key, channels) ->
+ KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
+ dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
}
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
index 013043384..0f3d7de1d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
@@ -21,6 +21,7 @@ package org.apache.hudi.source;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.JavaSerializer;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -54,7 +54,7 @@ import java.util.concurrent.LinkedBlockingDeque;
* StreamReadMonitoringFunction}. Contrary to the {@link StreamReadMonitoringFunction} which has a parallelism of 1,
* this operator can have multiple parallelism.
*
- * <p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put in a queue,
+ * <p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put into a queue,
* the {@link MailboxExecutor} read the actual data of the split.
* This architecture allows the separation of split reading from processing the checkpoint barriers,
* thus removing any potential back-pressure.
@@ -118,10 +118,10 @@ public class StreamReadOperator extends AbstractStreamOperator
getOperatorConfig().getTimeCharacteristic(),
getProcessingTimeService(),
new Object(), // no actual locking needed
- getContainingTask().getStreamStatusMaintainer(),
output,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
- -1);
+ -1,
+ true);
// Enqueue to process the recovered input splits.
enqueueProcessSplits();
@@ -205,8 +205,8 @@ public class StreamReadOperator extends AbstractStreamOperator
}
@Override
- public void dispose() throws Exception {
- super.dispose();
+ public void close() throws Exception {
+ super.close();
if (format != null) {
format.close();
@@ -218,8 +218,8 @@ public class StreamReadOperator extends AbstractStreamOperator
}
@Override
- public void close() throws Exception {
- super.close();
+ public void finish() throws Exception {
+ super.finish();
output.close();
if (sourceContext != null) {
sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 028c058ee..cfbcced45 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -172,7 +172,7 @@ public class StreamWriteITCase extends TestLogger {
DataStream