diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 7f912c3db..7b553bc39 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -399,6 +399,17 @@ public class HoodieFlinkWriteClient extends } } + /** + * Upgrade downgrade the Hoodie table. + * + *

This action should only be executed once for each commit. + * The modification of the table properties is not thread safe. + */ + public void upgradeDowngrade(String instantTime) { + HoodieTableMetaClient metaClient = createMetaClient(true); + new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); + } + /** * Clean the write handles within a checkpoint interval. * All the handles should have been closed already. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 1a89400ff..f7047ef87 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -335,6 +335,7 @@ public class StreamWriteOperatorCoordinator this.writeClient.startCommitWithTime(instant, tableState.commitAction); this.instant = instant; this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant); + this.writeClient.upgradeDowngrade(this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); }