From a60fab3a5c6df1b9b3d32f192d4c6ca4ff1d6f6d Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 25 Aug 2021 10:56:14 +0800 Subject: [PATCH] [HUDI-2352] The upgrade downgrade action of flink writer should be singleton (#3531) --- .../apache/hudi/client/HoodieFlinkWriteClient.java | 11 +++++++++++ .../hudi/sink/StreamWriteOperatorCoordinator.java | 1 + 2 files changed, 12 insertions(+) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 7f912c3db..7b553bc39 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -399,6 +399,17 @@ public class HoodieFlinkWriteClient extends } } + /** + * Upgrade downgrade the Hoodie table. + * + *

This action should only be executed once for each commit. + * The modification of the table properties is not thread safe. + */ + public void upgradeDowngrade(String instantTime) { + HoodieTableMetaClient metaClient = createMetaClient(true); + new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); + } + /** * Clean the write handles within a checkpoint interval. * All the handles should have been closed already. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 1a89400ff..f7047ef87 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -335,6 +335,7 @@ public class StreamWriteOperatorCoordinator this.writeClient.startCommitWithTime(instant, tableState.commitAction); this.instant = instant; this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant); + this.writeClient.upgradeDowngrade(this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); }