[HUDI-3362] Fix restore to rollback pending clustering operations followed by other rolling back other commits (#4772)
This commit is contained in:
@@ -439,7 +439,7 @@ public class SparkMain {
|
||||
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime));
|
||||
LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime), e);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
@@ -451,7 +451,7 @@ public class SparkMain {
|
||||
LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime));
|
||||
LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime), e);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||
import org.apache.hudi.common.util.ClusteringUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
@@ -39,6 +40,7 @@ import org.apache.log4j.Logger;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Plans the restore action and add a restore.requested meta file to timeline.
|
||||
@@ -66,10 +68,25 @@ public class RestorePlanActionExecutor<T extends HoodieRecordPayload, I, K, O> e
|
||||
final HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
|
||||
try {
|
||||
// Get all the commits on the timeline after the provided commit time
|
||||
List<HoodieInstantInfo> instantsToRollback = table.getActiveTimeline().getWriteTimeline()
|
||||
.getReverseOrderedInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)).map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction()))
|
||||
.collect(Collectors.toList());
|
||||
// rollback pending clustering instants first before other instants (See HUDI-3362)
|
||||
List<HoodieInstant> pendingClusteringInstantsToRollback = table.getActiveTimeline().filterPendingReplaceTimeline()
|
||||
// filter only clustering related replacecommits (Not insert_overwrite related commits)
|
||||
.filter(instant -> ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant))
|
||||
.getReverseOrderedInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Get all the commits on the timeline after the provided commit time
|
||||
List<HoodieInstant> commitInstantsToRollback = table.getActiveTimeline().getWriteTimeline()
|
||||
.getReverseOrderedInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
|
||||
.filter(instant -> !pendingClusteringInstantsToRollback.contains(instant))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Combine both lists - first rollback pending clustering and then rollback all other commits
|
||||
List<HoodieInstantInfo> instantsToRollback = Stream.concat(pendingClusteringInstantsToRollback.stream(), commitInstantsToRollback.stream())
|
||||
.map(entry -> new HoodieInstantInfo(entry.getTimestamp(), entry.getAction()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
HoodieRestorePlan restorePlan = new HoodieRestorePlan(instantsToRollback, LATEST_RESTORE_PLAN_VERSION);
|
||||
table.getActiveTimeline().saveToRestoreRequested(restoreInstant, TimelineMetadataUtils.serializeRestorePlan(restorePlan));
|
||||
|
||||
@@ -215,6 +215,12 @@ public class ClusteringUtils {
|
||||
}
|
||||
|
||||
public static List<HoodieInstant> getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) {
|
||||
return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
|
||||
return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants()
|
||||
.filter(instant -> isPendingClusteringInstant(metaClient, instant))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
|
||||
return getClusteringPlan(metaClient, instant).isPresent();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user