[HUDI-2352] The upgrade downgrade action of flink writer should be singleton (#3531)
This commit is contained in:
@@ -399,6 +399,17 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Upgrade downgrade the Hoodie table.
|
||||||
|
*
|
||||||
|
* <p>This action should only be executed once for each commit.
|
||||||
|
* The modification of the table properties is not thread safe.
|
||||||
|
*/
|
||||||
|
public void upgradeDowngrade(String instantTime) {
|
||||||
|
HoodieTableMetaClient metaClient = createMetaClient(true);
|
||||||
|
new FlinkUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clean the write handles within a checkpoint interval.
|
* Clean the write handles within a checkpoint interval.
|
||||||
* All the handles should have been closed already.
|
* All the handles should have been closed already.
|
||||||
|
|||||||
@@ -335,6 +335,7 @@ public class StreamWriteOperatorCoordinator
|
|||||||
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
|
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
|
||||||
this.instant = instant;
|
this.instant = instant;
|
||||||
this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
|
this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
|
||||||
|
this.writeClient.upgradeDowngrade(this.instant);
|
||||||
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
|
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
|
||||||
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
|
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user