diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 723bd33b8..270027df1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1551,7 +1551,7 @@ public abstract class BaseHoodieWriteClient instantTime) { + protected void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index ce75452d2..2d23c3afb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -407,14 +407,20 @@ public class HoodieFlinkWriteClient extends return getHoodieTable(); } + @Override + protected void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { + // do nothing. + // flink executes the upgrade/downgrade once when initializing the first instant on start up, + // no need to execute the upgrade/downgrade on each write in streaming. + } + /** * Upgrade downgrade the Hoodie table. * *

This action should only be executed once for each commit. * The modification of the table properties is not thread safe. */ - public void upgradeDowngrade(String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); + public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) { new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.current(), instantTime); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 023b1e696..39976e5ee 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -394,7 +394,7 @@ public class StreamWriteOperatorCoordinator // starts a new instant startInstant(); // upgrade downgrade - this.writeClient.upgradeDowngrade(this.instant); + this.writeClient.upgradeDowngrade(this.instant, this.metaClient); }, "initialize instant %s", instant); }