From 89ed6f062eead82f441c695e027bbbd67a272dfc Mon Sep 17 00:00:00 2001 From: satishkotha Date: Fri, 11 Feb 2022 11:12:45 -0800 Subject: [PATCH] [HUDI-3362] Fix restore to rollback pending clustering operations followed by rolling back other commits (#4772) --- .../apache/hudi/cli/commands/SparkMain.java | 4 +-- .../rollback/RestorePlanActionExecutor.java | 25 ++++++++++++++++--- .../hudi/common/util/ClusteringUtils.java | 8 +++++- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index e7866b94a..1b6d10b17 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -439,7 +439,7 @@ public class SparkMain { LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime)); return 0; } catch (Exception e) { - LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime)); + LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime), e); return -1; } } @@ -451,7 +451,7 @@ public class SparkMain { LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime)); return 0; } catch (Exception e) { - LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime)); + LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime), e); return -1; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java index a25eaed6d..e33dffcb7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RestorePlanActionExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; @@ -39,6 +40,7 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Plans the restore action and add a restore.requested meta file to timeline. @@ -66,10 +68,25 @@ public class RestorePlanActionExecutor e final HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime); try { // Get all the commits on the timeline after the provided commit time - List instantsToRollback = table.getActiveTimeline().getWriteTimeline() - .getReverseOrderedInstants() - .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)).map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction())) - .collect(Collectors.toList()); + // rollback pending clustering instants first before other instants (See HUDI-3362) + List pendingClusteringInstantsToRollback = table.getActiveTimeline().filterPendingReplaceTimeline() + // filter only clustering related replacecommits (Not insert_overwrite related commits) + .filter(instant -> ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant)) + .getReverseOrderedInstants() + .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)) + .collect(Collectors.toList()); + + // Get all the commits on 
the timeline after the provided commit time + List commitInstantsToRollback = table.getActiveTimeline().getWriteTimeline() + .getReverseOrderedInstants() + .filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)) + .filter(instant -> !pendingClusteringInstantsToRollback.contains(instant)) + .collect(Collectors.toList()); + + // Combine both lists - first rollback pending clustering and then rollback all other commits + List instantsToRollback = Stream.concat(pendingClusteringInstantsToRollback.stream(), commitInstantsToRollback.stream()) + .map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction())) + .collect(Collectors.toList()); HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION); table.getActiveTimeline().saveToRestoreRequested(restoreInstant, TimelineMetadataUtils.serializeRestorePlan(restorePlan)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index 15e53705b..9d741a03f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -215,6 +215,12 @@ public class ClusteringUtils { } public static List getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) { - return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList()); + return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants() + .filter(instant -> isPendingClusteringInstant(metaClient, instant)) + .collect(Collectors.toList()); + } + + public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { + return getClusteringPlan(metaClient, instant).isPresent(); } }