[HUDI-4130] Remove the upgrade/downgrade for flink #initTable (#5642)
This commit is contained in:
@@ -1551,7 +1551,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
|
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
|
||||||
UpgradeDowngrade upgradeDowngrade =
|
UpgradeDowngrade upgradeDowngrade =
|
||||||
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
|
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
|
||||||
|
|
||||||
|
|||||||
@@ -407,14 +407,20 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
return getHoodieTable();
|
return getHoodieTable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
|
||||||
|
// do nothing.
|
||||||
|
// flink executes the upgrade/downgrade once when initializing the first instant on start up,
|
||||||
|
// no need to execute the upgrade/downgrade on each write in streaming.
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upgrade downgrade the Hoodie table.
|
* Upgrade downgrade the Hoodie table.
|
||||||
*
|
*
|
||||||
* <p>This action should only be executed once for each commit.
|
* <p>This action should only be executed once for each commit.
|
||||||
* The modification of the table properties is not thread safe.
|
* The modification of the table properties is not thread safe.
|
||||||
*/
|
*/
|
||||||
public void upgradeDowngrade(String instantTime) {
|
public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) {
|
||||||
HoodieTableMetaClient metaClient = createMetaClient(true);
|
|
||||||
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
|
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
|
||||||
.run(HoodieTableVersion.current(), instantTime);
|
.run(HoodieTableVersion.current(), instantTime);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -394,7 +394,7 @@ public class StreamWriteOperatorCoordinator
|
|||||||
// starts a new instant
|
// starts a new instant
|
||||||
startInstant();
|
startInstant();
|
||||||
// upgrade downgrade
|
// upgrade downgrade
|
||||||
this.writeClient.upgradeDowngrade(this.instant);
|
this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
|
||||||
}, "initialize instant %s", instant);
|
}, "initialize instant %s", instant);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user