Removing rollbacks instants from timeline for restore operation (#4518)
This commit is contained in:
committed by
GitHub
parent
e9a7f49f55
commit
56f93f4ebd
@@ -99,6 +99,16 @@ public abstract class BaseRestoreActionExecutor<T extends HoodieRecordPayload, I
|
|||||||
writeToMetadata(restoreMetadata);
|
writeToMetadata(restoreMetadata);
|
||||||
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
|
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
|
||||||
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
|
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
|
||||||
|
// get all rollbacks instants after restore instant time and delete them.
|
||||||
|
// if not, rollbacks will be considered not completed and might hinder metadata table compaction.
|
||||||
|
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
|
||||||
|
.getReverseOrderedInstants()
|
||||||
|
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
instantsToRollback.forEach(entry -> {
|
||||||
|
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
|
||||||
|
table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp()));
|
||||||
|
});
|
||||||
LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
|
LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
|
||||||
return restoreMetadata;
|
return restoreMetadata;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -577,6 +577,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
|||||||
client = getHoodieWriteClient(newConfig);
|
client = getHoodieWriteClient(newConfig);
|
||||||
client.restoreToInstant("004");
|
client.restoreToInstant("004");
|
||||||
|
|
||||||
|
assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
|
||||||
|
|
||||||
// Check the entire dataset has all records still
|
// Check the entire dataset has all records still
|
||||||
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
||||||
for (int i = 0; i < fullPartitionPaths.length; i++) {
|
for (int i = 0; i < fullPartitionPaths.length; i++) {
|
||||||
|
|||||||
Reference in New Issue
Block a user