From b3b44236fe031f84e99008eed85e14ec4519e40f Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 10 Feb 2022 11:32:01 +0800 Subject: [PATCH] [HUDI-3389] Bump flink version to 1.14.3 (#4776) --- hudi-client/hudi-flink-client/pom.xml | 4 ++-- hudi-flink/pom.xml | 8 +++---- .../sink/StreamWriteOperatorCoordinator.java | 10 ++++----- .../RowDataToHoodieFunctionWithRateLimit.java | 2 +- .../org/apache/hudi/sink/utils/Pipelines.java | 9 +++++++- .../hudi/source/StreamReadOperator.java | 16 +++++++------- .../apache/hudi/sink/StreamWriteITCase.java | 4 ++-- .../hudi/sink/utils/CollectorOutput.java | 6 +++++ .../sink/utils/CompactFunctionWrapper.java | 6 +++++ .../utils/MockStateInitializationContext.java | 7 ++++++ .../utils/MockStreamingRuntimeContext.java | 6 ++--- .../hudi/table/HoodieDataSourceITCase.java | 22 +++++++++++++++++-- pom.xml | 2 +- 13 files changed, 72 insertions(+), 30 deletions(-) diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml index b2cc6949b..b6f1f3d37 100644 --- a/hudi-client/hudi-flink-client/pom.xml +++ b/hudi-client/hudi-flink-client/pom.xml @@ -60,7 +60,7 @@ org.apache.flink - flink-table-runtime-blink_${scala.binary.version} + flink-table-runtime_${scala.binary.version} ${flink.version} provided @@ -159,7 +159,7 @@ org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${flink.version} test tests diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index c8fac38be..27a4a0b45 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -164,13 +164,13 @@ org.apache.flink - flink-table-runtime-blink_${scala.binary.version} + flink-table-runtime_${scala.binary.version} ${flink.version} provided org.apache.flink - flink-table-planner-blink_${scala.binary.version} + flink-table-planner_${scala.binary.version} ${flink.version} provided @@ -307,7 +307,7 @@ org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${flink.version} test test-jar @@ -321,7 +321,7 @@ org.apache.flink - flink-table-runtime-blink_${scala.binary.version} + flink-table-runtime_${scala.binary.version} ${flink.version} test test-jar diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 447cfa420..4782070e3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -415,13 +415,11 @@ public class StreamWriteOperatorCoordinator CompletableFuture[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull) .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId))) .toArray(CompletableFuture[]::new); - try { - CompletableFuture.allOf(futures).get(); - } catch (Throwable throwable) { - if (!sendToFinishedTasks(throwable)) { - throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable); + CompletableFuture.allOf(futures).whenComplete((resp, error) -> { + if (!sendToFinishedTasks(error)) { + throw new HoodieException("Error while waiting for the commit ack events to finish sending", error); } - } + }); } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java index 4526c6ff9..3d42ad87d 100644 --- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunctionWithRateLimit.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter; +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 5f156e839..f97f79422 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -39,11 +39,14 @@ import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.data.RowData; @@ -64,7 +67,11 @@ public class Pipelines { if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) { // shuffle by partition keys - dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath); + // use #partitionCustom instead of #keyBy to avoid duplicate sort operations, + // see BatchExecutionUtils#applyBatchExecutionSettings for details. 
+ Partitioner partitioner = (key, channels) -> + KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels); + dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath); } if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) { SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields); diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java index 013043384..0f3d7de1d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java @@ -21,6 +21,7 @@ package org.apache.hudi.source; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.runtime.state.JavaSerializer; @@ -29,7 +30,6 @@ import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -54,7 +54,7 @@ import java.util.concurrent.LinkedBlockingDeque; * StreamReadMonitoringFunction}. Contrary to the {@link StreamReadMonitoringFunction} which has a parallelism of 1, * this operator can have multiple parallelism. * - *
<p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put in a queue, + *
<p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put into a queue, * the {@link MailboxExecutor} reads the actual data of the split. * This architecture allows the separation of split reading from processing the checkpoint barriers, * thus removing any potential back-pressure. @@ -118,10 +118,10 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData> getOperatorConfig().getTimeCharacteristic(), getProcessingTimeService(), new Object(), // no actual locking needed - getContainingTask().getStreamStatusMaintainer(), output, getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(), - -1); + -1, + true); // Enqueue to process the recovered input splits. enqueueProcessSplits(); @@ -205,8 +205,8 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData> } @Override - public void dispose() throws Exception { - super.dispose(); + public void close() throws Exception { + super.close(); if (format != null) { format.close(); @@ -218,8 +218,8 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData> } @Override - public void close() throws Exception { - super.close(); + public void finish() throws Exception { + super.finish(); output.close(); if (sourceContext != null) { sourceContext.emitWatermark(Watermark.MAX_WATERMARK); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index 028c058ee..cfbcced45 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -172,7 +172,7 @@ public class StreamWriteITCase extends TestLogger { DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); Pipelines.clean(conf, pipeline); Pipelines.compact(conf, pipeline); - JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); + JobClient client = execEnv.executeAsync(execEnv.getStreamGraph()); if (client.getJobStatus().get() != JobStatus.FAILED) { try { TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish @@ -229,7 +229,7 @@ public class StreamWriteITCase extends TestLogger { DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); execEnv.addOperator(pipeline.getTransformation()); - JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME))); + JobClient client = execEnv.executeAsync(conf.getString(FlinkOptions.TABLE_NAME)); // wait for the streaming job to finish client.getJobExecutionResult().get(); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java index 3da21e6eb..c386e6287 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.OutputTag; @@ -49,6 +50,11 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> { list.add(mark); }
+ @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) { + + } + @Override public void emitLatencyMarker(LatencyMarker latencyMarker) { list.add(latencyMarker); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java index fe2ddad18..e703515de 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; @@ -102,6 +103,11 @@ public class CompactFunctionWrapper { } + @Override + public void emitWatermarkStatus(WatermarkStatus watermarkStatus) { + + } + @Override public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java index dd89f7111..c582e9553 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateInitializationContext.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StatePartitionStreamProvider; +import java.util.OptionalLong; + /** * A {@link FunctionInitializationContext} for testing purpose.
*/ @@ -39,6 +41,11 @@ public class MockStateInitializationContext implements StateInitializationContext return false; } + @Override + public OptionalLong getRestoredCheckpointId() { + return OptionalLong.empty(); + } + @Override public MockOperatorStateStore getOperatorStateStore() { return operatorStateStore; diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java index 14305da3d..8a66f1dce 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStreamingRuntimeContext.java @@ -19,7 +19,7 @@ package org.apache.hudi.sink.utils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.KeyedStateStore; -import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; @@ -69,8 +69,8 @@ public class MockStreamingRuntimeContext extends StreamingRuntimeContext { } @Override - public MetricGroup getMetricGroup() { - return new UnregisteredMetricsGroup(); + public OperatorMetricGroup getMetricGroup() { + return UnregisteredMetricsGroup.createOperatorMetricGroup(); } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index f1ca68e63..5734e2257 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -31,10 +31,12 @@ import org.apache.hudi.utils.factory.CollectSinkTableFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.ObjectPath; @@ -86,8 +88,24 @@ public class HoodieDataSourceITCase extends AbstractTestBase { execConf.setString("restart-strategy", "fixed-delay"); execConf.setString("restart-strategy.fixed-delay.attempts", "0"); + Configuration conf = new Configuration(); + // for batch upsert use cases: the current suggestion is to disable these 2 options. + // since 1.14, the flink runtime has switched from the streaming execution mode + // to the batch execution mode for batch jobs (before that, both streaming and batch jobs used the streaming execution mode), + // and the current batch execution mode has these limitations: + // + // 1. a keyed stream defaults to always sorting its inputs by key; + // 2.
the batch state-backend requires the inputs to be sorted by state key. + // + // For our hudi batch upsert pipeline, we rely on the consuming sequence of index records and data records: + // the index records must be loaded before the data records so that BucketAssignFunction keeps the upsert semantics correct, + // so we suggest disabling these 2 options to keep using the streaming state-backend under the batch execution mode + // and preserve the pre-1.14 strategy. + conf.setBoolean("execution.sorted-inputs.enabled", false); + conf.setBoolean("execution.batch-state-backend.enabled", false); + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(conf); settings = EnvironmentSettings.newInstance().inBatchMode().build(); - batchTableEnv = TableEnvironmentImpl.create(settings); + batchTableEnv = StreamTableEnvironment.create(execEnv, settings); batchTableEnv.getConfig().getConfiguration() .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); } @@ -861,7 +879,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { .getContextClassLoader().getResource("debezium_json.data")).toString(); String sourceDDL = "" + "CREATE TABLE debezium_source(\n" - + " id INT NOT NULL,\n" + + " id INT NOT NULL PRIMARY KEY NOT ENFORCED,\n" + " ts BIGINT,\n" + " name STRING,\n" + " description STRING,\n" diff --git a/pom.xml b/pom.xml index 2778885e8..420da1c5d 100644 --- a/pom.xml +++ b/pom.xml @@ -117,7 +117,7 @@ 4.4.1 ${spark2.version} - 1.13.1 + 1.14.3 2.4.4 3.2.0 hudi-spark2
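
Note on the Pipelines.java hunk above: under the 1.14 batch execution mode, every keyBy creates a keyed stream whose inputs the runtime sorts by key (see BatchExecutionUtils#applyBatchExecutionSettings), so the patch shuffles with partitionCustom instead, reusing keyBy's key-group arithmetic to pick the same target channel without the sort. The standalone sketch below illustrates that pattern only; the PartitionCustomSketch class, the sample elements, and the identity key selector are hypothetical and not part of the patch.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;

public class PartitionCustomSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Route each record to the parallel channel that keyBy would choose for its key,
    // but without creating a keyed stream, so batch mode schedules no extra sort.
    Partitioner<String> partitioner = (key, channels) ->
        KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);

    DataStream<String> partitionPaths = env.fromElements("par1", "par2", "par1");
    partitionPaths
        .partitionCustom(partitioner, path -> path) // the record itself serves as the shuffle key
        .print();

    env.execute("partition-custom-sketch");
  }
}

DEFAULT_LOWER_BOUND_MAX_PARALLELISM (128) is the default lower bound Flink applies when deriving an operator's max parallelism, so for default parallelism settings the computed channel matches what a keyed shuffle would produce.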