diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 9650ddaeb..6fcce1b0b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.callback.HoodieWriteCommitCallback; import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; @@ -590,12 +591,19 @@ public abstract class AbstractHoodieWriteClient HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) .findFirst()); if (commitInstantOpt.isPresent()) { - HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true); - if (timerContext != null) { - long durationInMs = metrics.getDurationInMs(timerContext.stop()); - metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); + LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime); + Option rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false); + if (rollbackPlanOption.isPresent()) { + // execute rollback + HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true); + if (timerContext != null) { + long durationInMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted()); + } + return true; + } else { + throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime); } - return true; } else { LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback"); return false; @@ -776,7 +784,9 @@ public abstract class AbstractHoodieWriteClient table) { - table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false); + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + table.scheduleRollback(context, commitTime, inflightInstant, false); + table.rollback(context, commitTime, inflightInstant, false); table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } @@ -978,7 +988,9 @@ public abstract class AbstractHoodieWriteClient table) { - table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false); + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + table.scheduleRollback(context, commitTime, inflightInstant, false); + table.rollback(context, commitTime, inflightInstant, false); table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index ad40c8ec7..f701e4036 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import 
org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; @@ -316,6 +317,13 @@ public abstract class HoodieTable implem return getActiveTimeline().getCleanerTimeline(); } + /** + * Get rollback timeline. + */ + public HoodieTimeline getRollbackTimeline() { + return getActiveTimeline().getRollbackTimeline(); + } + /** * Get only the completed (no-inflights) savepoint timeline. */ @@ -417,6 +425,19 @@ public abstract class HoodieTable implem */ public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime); + /** + * Schedule rollback for the instant time. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for scheduling rollback + * @param instantToRollback instant to be rolled back + * @return HoodieRollbackPlan containing info on rollback. + */ + public abstract Option scheduleRollback(HoodieEngineContext context, + String instantTime, + HoodieInstant instantToRollback, + boolean skipTimelinePublish); + /** * Rollback the (inflight/committed) record changes with the given commit time. *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 7dbbaa70e..3dc585121 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
@@ -43,7 +44,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -53,11 +53,6 @@ public abstract class BaseRollbackActionExecutor execute(HoodieInstant instantToRollback);
-  }
-
   protected final HoodieInstant instantToRollback;
   protected final boolean deleteInstants;
   protected final boolean skipTimelinePublish;
@@ -92,30 +87,74 @@ public abstract class BaseRollbackActionExecutor executeRollback(HoodieRollbackPlan hoodieRollbackPlan) throws IOException;
 
-  protected abstract List executeRollback() throws IOException;
+  private HoodieRollbackMetadata runRollback(HoodieTable table, HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
+    ValidationUtils.checkArgument(rollbackInstant.getState().equals(HoodieInstant.State.REQUESTED)
+        || rollbackInstant.getState().equals(HoodieInstant.State.INFLIGHT));
+    try {
+      final HoodieInstant inflightInstant;
+      final HoodieTimer timer = new HoodieTimer();
+      timer.startTimer();
+      if (rollbackInstant.isRequested()) {
+        inflightInstant = table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant,
+            TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
+      } else {
+        inflightInstant = rollbackInstant;
+      }
 
-  protected abstract List executeRollbackUsingFileListing(HoodieInstant instantToRollback);
+      HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
+      List stats = doRollbackAndGetStats(rollbackPlan);
+      HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
+          instantTime,
+          Option.of(rollbackTimer.endTimer()),
+          Collections.singletonList(instantToRollback),
+          stats);
+      if (!skipTimelinePublish) {
+        finishRollback(inflightInstant, rollbackMetadata);
+      }
+
+      // Finally, remove the markers post rollback.
+      WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
+      return rollbackMetadata;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to rollback commit " + instantToRollback.getTimestamp(), e);
+    }
+  }
 
   @Override
   public HoodieRollbackMetadata execute() {
-    HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
-    List stats = doRollbackAndGetStats();
-    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
-        instantTime,
-        Option.of(rollbackTimer.endTimer()),
-        Collections.singletonList(instantToRollback),
-        stats);
-    if (!skipTimelinePublish) {
-      finishRollback(rollbackMetadata);
+    table.getMetaClient().reloadActiveTimeline();
+    List rollBackInstants = table.getRollbackTimeline()
+        .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
+    if (rollBackInstants.isEmpty()) {
+      throw new HoodieRollbackException("No Requested Rollback Instants found to execute rollback ");
+    }
+    HoodieInstant rollbackInstant = null;
+    for (HoodieInstant instant : rollBackInstants) {
+      if (instantTime.equals(instant.getTimestamp())) {
+        rollbackInstant = instant;
+        break;
+      }
+    }
+    if (rollbackInstant != null) {
+      try {
+        HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant);
+        return runRollback(table, rollbackInstant, rollbackPlan);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to fetch rollback plan to rollback commit " + rollbackInstant.getTimestamp(), e);
+      }
+    } else {
+        throw new HoodieRollbackException("No inflight rollback instants found for commit time " + instantTime);
     }
-
-    // Finally, remove the markers post rollback.
-    WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
-    return rollbackMetadata;
   }
 
   private void validateSavepointRollbacks() {
@@ -173,7 +212,7 @@ public abstract class BaseRollbackActionExecutor doRollbackAndGetStats() {
+  public List doRollbackAndGetStats(HoodieRollbackPlan hoodieRollbackPlan) {
     final String instantTimeToRollback = instantToRollback.getTimestamp();
     final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
         && !instantToRollback.isCompleted();
@@ -186,7 +225,7 @@ public abstract class BaseRollbackActionExecutor stats = executeRollback();
+      List stats = executeRollback(hoodieRollbackPlan);
       LOG.info("Rolled back inflight instant " + instantTimeToRollback);
       if (!isPendingCompaction) {
         rollBackIndex();
@@ -197,12 +236,19 @@ public abstract class BaseRollbackActionExecutor executeRollback(HoodieInstant instantToRollback, HoodieRollbackPlan rollbackPlan) {
+    return new BaseRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackPlan.getRollbackRequests());
+  }
+
+  protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
     try {
-      table.getActiveTimeline().createNewInstant(
-          new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, instantTime));
-      table.getActiveTimeline().saveAsComplete(
-          new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, instantTime),
+      table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
       LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
     } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
new file mode 100644
index 000000000..721ca77b4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Contains common methods to be used across engines for rollback operation.
+ */
+public class BaseRollbackHelper implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(BaseRollbackHelper.class);
+  protected static final String EMPTY_STRING = "";
+
+  protected final HoodieTableMetaClient metaClient;
+  protected final HoodieWriteConfig config;
+
+  public BaseRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Performs all rollback actions that we have collected in parallel.
+   */
+  public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                  List rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+    // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
+    // is failing with com.esotericsoftware.kryo.KryoException
+    // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
+    // related stack overflow post: https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as GenericData.Array.
+    List serializableRequests = rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
+    return context.reduceByKey(maybeDeleteAndCollectStats(context, instantToRollback, serializableRequests, true, parallelism),
+        RollbackUtils::mergeRollbackStat, parallelism);
+  }
+
+  /**
+   * Collect all file info that needs to be rollbacked.
+   */
+  public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                       List rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+    // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
+    // is failing with com.esotericsoftware.kryo.KryoException
+    // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
+    // related stack overflow post: https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as GenericData.Array.
+    List serializableRequests = rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
+    return context.reduceByKey(maybeDeleteAndCollectStats(context, instantToRollback, serializableRequests, false, parallelism),
+        RollbackUtils::mergeRollbackStat, parallelism);
+  }
+
+  /**
+   * May be delete interested files and collect stats or collect stats only.
+   *
+   * @param context           instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+   * @param rollbackRequests  List of {@link SerializableHoodieRollbackRequest} to be operated on.
+   * @param doDelete          {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
+   * @return stats collected with or w/o actual deletions.
+   */
+  List> maybeDeleteAndCollectStats(HoodieEngineContext context,
+                                                                    HoodieInstant instantToRollback,
+                                                                    List rollbackRequests,
+                                                                    boolean doDelete, int numPartitions) {
+    return context.flatMap(rollbackRequests, (SerializableFunction>>) rollbackRequest -> {
+      List filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
+      if (!filesToBeDeleted.isEmpty()) {
+        List rollbackStats = deleteFiles(metaClient, filesToBeDeleted, doDelete);
+        List> partitionToRollbackStats = new ArrayList<>();
+        rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
+        return partitionToRollbackStats.stream();
+      } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
+        Map logFilesToBeDeleted = rollbackRequest.getLogBlocksToBeDeleted();
+        String fileId = rollbackRequest.getFileId();
+        String latestBaseInstant = rollbackRequest.getLatestBaseInstant();
+        FileSystem fs = metaClient.getFs();
+        // collect all log files that is supposed to be deleted with this rollback
+        Map writtenLogFileSizeMap = new HashMap<>();
+        for (Map.Entry entry : logFilesToBeDeleted.entrySet()) {
+          writtenLogFileSizeMap.put(fs.getFileStatus(new Path(entry.getKey())), entry.getValue());
+        }
+        HoodieLogFormat.Writer writer = null;
+        try {
+          writer = HoodieLogFormat.newWriterBuilder()
+              .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+              .withFileId(fileId)
+              .overBaseCommit(latestBaseInstant)
+              .withFs(metaClient.getFs())
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+          // generate metadata
+          if (doDelete) {
+            Map header = generateHeader(instantToRollback.getTimestamp());
+            // if update belongs to an existing log file
+            writer.appendBlock(new HoodieCommandBlock(header));
+          }
+        } catch (IOException | InterruptedException io) {
+          throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
+        } finally {
+          try {
+            if (writer != null) {
+              writer.close();
+            }
+          } catch (IOException io) {
+            throw new HoodieIOException("Error appending rollback block..", io);
+          }
+        }
+
+        // This step is intentionally done after writer is closed. Guarantees that
+        // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+        // cloud-storage : HUDI-168
+        Map filesToNumBlocksRollback = Collections.singletonMap(
+            metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+            1L
+        );
+        return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(),
+            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                .withRollbackBlockAppendResults(filesToNumBlocksRollback)
+                .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build())).stream();
+      } else {
+        return Collections
+            .singletonList(Pair.of(rollbackRequest.getPartitionPath(),
+                HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                    .build())).stream();
+      }
+    }, numPartitions);
+  }
+
+  /**
+   * Common method used for cleaning out files during rollback.
+   */
+  protected List deleteFiles(HoodieTableMetaClient metaClient, List filesToBeDeleted, boolean doDelete) throws IOException {
+    return filesToBeDeleted.stream().map(fileToDelete -> {
+      String basePath = metaClient.getBasePath();
+      try {
+        Path fullDeletePath = new Path(fileToDelete);
+        String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
+        boolean isDeleted = true;
+        if (doDelete) {
+          isDeleted = metaClient.getFs().delete(fullDeletePath);
+        }
+        return HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partitionPath)
+            .withDeletedFileResult(fullDeletePath.toString(), isDeleted)
+            .build();
+      } catch (IOException e) {
+        LOG.error("Fetching file status for " + fileToDelete + " failed ", e);
+        throw new HoodieIOException("Fetching file status for " + fileToDelete + " failed ", e);
+      }
+    }).collect(Collectors.toList());
+  }
+
+  protected Map generateHeader(String commit) {
+    // generate metadata
+    Map header = new HashMap<>(3);
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
new file mode 100644
index 000000000..24edde276
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base rollback plan action executor to assist in scheduling rollback requests. This phase serializes {@link HoodieRollbackPlan}
+ * to rollback.requested instant.
+ */
+public class BaseRollbackPlanActionExecutor extends BaseActionExecutor> {
+
+  private static final Logger LOG = LogManager.getLogger(BaseRollbackPlanActionExecutor.class);
+
+  protected final HoodieInstant instantToRollback;
+  private final boolean skipTimelinePublish;
+
+  public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
+  public static final Integer LATEST_ROLLBACK_PLAN_VERSION = ROLLBACK_PLAN_VERSION_1;
+
+  public BaseRollbackPlanActionExecutor(HoodieEngineContext context,
+                                        HoodieWriteConfig config,
+                                        HoodieTable table,
+                                        String instantTime,
+                                        HoodieInstant instantToRollback,
+                                        boolean skipTimelinePublish) {
+    super(context, config, table, instantTime);
+    this.instantToRollback = instantToRollback;
+    this.skipTimelinePublish = skipTimelinePublish;
+  }
+
+  /**
+   * Interface for RollbackStrategy. There are two types supported, listing based and marker based.
+   */
+  interface RollbackStrategy extends Serializable {
+
+    /**
+     * Fetch list of {@link HoodieRollbackRequest}s to be added to rollback plan.
+     * @param instantToRollback instant to be rolled back.
+     * @return list of {@link HoodieRollbackRequest}s to be added to rollback plan
+     */
+    List getRollbackRequests(HoodieInstant instantToRollback);
+  }
+
+  /**
+   * Fetch the Rollback strategy used.
+   *
+   * @return
+   */
+  private BaseRollbackPlanActionExecutor.RollbackStrategy getRollbackStrategy() {
+    if (config.shouldRollbackUsingMarkers()) {
+      return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
+    } else {
+      return new ListingBasedRollbackStrategy(table, context, config, instantTime);
+    }
+  }
+
+  /**
+   * Creates a Rollback plan if there are files to be rolledback and stores them in instant file.
+   * Rollback Plan contains absolute file paths.
+   *
+   * @param startRollbackTime Rollback Instant Time
+   * @return Rollback Plan if generated
+   */
+  protected Option requestRollback(String startRollbackTime) {
+    final HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime);
+    try {
+      List rollbackRequests = new ArrayList<>();
+      if (!instantToRollback.isRequested()) {
+        rollbackRequests.addAll(getRollbackStrategy().getRollbackRequests(instantToRollback));
+      }
+      HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new HoodieInstantInfo(instantToRollback.getTimestamp(),
+          instantToRollback.getAction()), rollbackRequests, LATEST_ROLLBACK_PLAN_VERSION);
+      if (!skipTimelinePublish) {
+        if (table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp())) {
+          LOG.warn("A rollback request already exists for instant time " + rollbackInstant + ", skipping scheduling a new rollback");
+        } else {
+          table.getActiveTimeline().saveToRollbackRequested(rollbackInstant, TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
+          table.getMetaClient().reloadActiveTimeline();
+          LOG.info("Requesting Rollback with instant time " + rollbackInstant);
+        }
+      }
+      return Option.of(rollbackPlan);
+    } catch (IOException e) {
+      LOG.error("Got exception when saving rollback requested file", e);
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public Option execute() {
+    // Plan a new rollback action
+    return requestRollback(instantTime);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index 44b5492e7..9187179ff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -58,16 +59,7 @@ public class CopyOnWriteRollbackActionExecutor executeRollback() {
+  protected List executeRollback(HoodieRollbackPlan hoodieRollbackPlan) {
     HoodieTimer rollbackTimer = new HoodieTimer();
     rollbackTimer.startTimer();
 
@@ -87,7 +79,7 @@ public class CopyOnWriteRollbackActionExecutor executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
-        context, table.getMetaClient().getBasePath(), config);
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index 849087222..b47136fa0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -19,21 +19,14 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,13 +36,15 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
 /**
  * Performs Rollback of Hoodie Tables.
  */
@@ -65,119 +60,85 @@ public class ListingBasedRollbackHelper implements Serializable {
   }
 
   /**
-   * Performs all rollback actions that we have collected in parallel.
+   * Collects info for Rollback plan.
    */
-  public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
-                                                  List<ListingBasedRollbackRequest> rollbackRequests) {
-    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
-    return context.mapToPairAndReduceByKey(rollbackRequests,
-        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, true),
-        RollbackUtils::mergeRollbackStat,
-        parallelism);
-  }
-
-  /**
-   * Collect all file info that needs to be rollbacked.
-   */
-  public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
-                                                       List<ListingBasedRollbackRequest> rollbackRequests) {
-    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
-    return context.mapToPairAndReduceByKey(rollbackRequests,
-        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false),
-        RollbackUtils::mergeRollbackStat,
-        parallelism);
+  public List<HoodieRollbackRequest> getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant instantToRollback, List<ListingBasedRollbackRequest> rollbackRequests) {
+    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback Plan");
+    return getListingBasedRollbackRequests(context, instantToRollback, rollbackRequests, sparkPartitions);
   }
 
   /**
    * May be delete interested files and collect stats or collect stats only.
    *
+   * @param context           instance of {@link HoodieEngineContext} to use.
    * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
-   * @param doDelete          {@code true} if deletion has to be done.
-   *                          {@code false} if only stats are to be collected w/o performing any deletes.
+   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
+   * @param numPartitions     number of spark partitions to use for parallelism.
    * @return stats collected with or w/o actual deletions.
    */
-  private Pair<String, HoodieRollbackStat> maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
-                                                                      HoodieInstant instantToRollback,
-                                                                      boolean doDelete) throws IOException {
-    switch (rollbackRequest.getType()) {
-      case DELETE_DATA_FILES_ONLY: {
-        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
-            rollbackRequest.getPartitionPath(), doDelete);
-        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                .withDeletedFileResults(filesToDeletedStatus).build());
-      }
-      case DELETE_DATA_AND_LOG_FILES: {
-        final Map<FileStatus, Boolean> filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
-        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                .withDeletedFileResults(filesToDeletedStatus).build());
-      }
-      case APPEND_ROLLBACK_BLOCK: {
-        String fileId = rollbackRequest.getFileId().get();
-        String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
-
-        // collect all log files that is supposed to be deleted with this rollback
-        Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
-            FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
-            fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
-            .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-
-        HoodieLogFormat.Writer writer = null;
-        try {
-          writer = HoodieLogFormat.newWriterBuilder()
-              .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
-              .withFileId(fileId)
-              .overBaseCommit(latestBaseInstant)
-              .withFs(metaClient.getFs())
-              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-          // generate metadata
-          if (doDelete) {
-            Map<HeaderMetadataType, String> header = generateHeader(instantToRollback.getTimestamp());
-            // if update belongs to an existing log file
-            writer.appendBlock(new HoodieCommandBlock(header));
-          }
-        } catch (IOException | InterruptedException io) {
-          throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
-        } finally {
-          try {
-            if (writer != null) {
-              writer.close();
-            }
-          } catch (IOException io) {
-            throw new HoodieIOException("Error appending rollback block..", io);
-          }
+  private List<HoodieRollbackRequest> getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                                      List<ListingBasedRollbackRequest> rollbackRequests, int numPartitions) {
+    return context.map(rollbackRequests, rollbackRequest -> {
+      switch (rollbackRequest.getType()) {
+        case DELETE_DATA_FILES_ONLY: {
+          final FileStatus[] filesToDeletedStatus = getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
+              EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
         }
-
-        // This step is intentionally done after writer is closed. Guarantees that
-        // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
-        // cloud-storage : HUDI-168
-        Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-            metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-            1L
-        );
-
-        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-                .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
+        case DELETE_DATA_AND_LOG_FILES: {
+          final FileStatus[] filesToDeletedStatus = getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List<String> filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          String fileId = rollbackRequest.getFileId().get();
+          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+          // collect all log files that is supposed to be deleted with this rollback
+          Map<FileStatus, Long> writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+              FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+          Map<String, Long> logFilesToBeDeleted = new HashMap<>();
+          for (Map.Entry<FileStatus, Long> fileToBeDeleted : writtenLogFileSizeMap.entrySet()) {
+            logFilesToBeDeleted.put(fileToBeDeleted.getKey().getPath().toString(), fileToBeDeleted.getValue());
+          }
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant,
+              Collections.EMPTY_LIST, logFilesToBeDeleted);
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
       }
-      default:
-        throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
-    }
+    }, numPartitions).stream().collect(Collectors.toList());
   }
 
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                         String commit, String partitionPath, boolean doDelete) throws IOException {
-    LOG.info("Cleaning path " + partitionPath);
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+                                               String commit, String partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
     String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    SerializablePathFilter filter = (path) -> {
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String partitionPath, FileSystem fs) throws IOException {
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    BaseRollbackHelper.SerializablePathFilter filter = (path) -> {
       if (path.toString().endsWith(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
@@ -188,62 +149,6 @@ public class ListingBasedRollbackHelper implements Serializable {
       }
       return false;
     };
-
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    FileSystem fs = metaClient.getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map<FileStatus, Boolean> deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                   String commit, String partitionPath, boolean doDelete) throws IOException {
-    final Map<FileStatus, Boolean> results = new HashMap<>();
-    LOG.info("Cleaning path " + partitionPath);
-    FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  private Map<HeaderMetadataType, String> generateHeader(String commit) {
-    // generate metadata
-    Map<HeaderMetadataType, String> header = new HashMap<>(3);
-    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
-    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
-    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    return header;
-  }
-
-  public interface SerializablePathFilter extends PathFilter, Serializable {
-
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
new file mode 100644
index 000000000..266fa39cb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Listing based rollback strategy to fetch list of {@link HoodieRollbackRequest}s.
+ */
+public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecutor.RollbackStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackStrategy.class);
+
+  protected final HoodieTable table;
+  protected final HoodieEngineContext context;
+  protected final HoodieWriteConfig config;
+  protected final String instantTime;
+
+  public ListingBasedRollbackStrategy(HoodieTable table,
+                                      HoodieEngineContext context,
+                                      HoodieWriteConfig config,
+                                      String instantTime) {
+    this.table = table;
+    this.context = context;
+    this.config = config;
+    this.instantTime = instantTime;
+  }
+
+  @Override
+  public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback) {
+    try {
+      List<ListingBasedRollbackRequest> rollbackRequests = null;
+      if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
+            table.getMetaClient().getBasePath(), config);
+      } else {
+        rollbackRequests = RollbackUtils
+            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
+      }
+      List<HoodieRollbackRequest> listingBasedRollbackRequests = new ListingBasedRollbackHelper(table.getMetaClient(), config)
+          .getRollbackRequestsForRollbackPlan(context, instantToRollback, rollbackRequests);
+      return listingBasedRollbackRequests;
+    } catch (IOException e) {
+      LOG.error("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
+      throw new HoodieRollbackException("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 1bfd4b165..9d04e3036 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -18,41 +18,38 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
 /**
  * Performs rollback using marker files generated during the write..
  */
-public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
+public class MarkerBasedRollbackStrategy<T extends HoodieRecordPayload> implements BaseRollbackPlanActionExecutor.RollbackStrategy {
 
   private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
 
@@ -74,72 +71,46 @@ public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.R
     this.instantTime = instantTime;
   }
 
-  protected HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws IOException {
-    LOG.info("Rolling back by deleting the merged base file:" + mergedBaseFilePath);
-    return deleteBaseFile(mergedBaseFilePath);
+  @Override
+  public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback) {
+    try {
+      List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
+          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
+      int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
+      return context.map(markerPaths, markerFilePath -> {
+        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+        IOType type = IOType.valueOf(typeStr);
+        switch (type) {
+          case MERGE:
+          case CREATE:
+            String fileToDelete = WriteMarkers.stripMarkerSuffix(markerFilePath);
+            Path fullDeletePath = new Path(basePath, fileToDelete);
+            String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
+            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, EMPTY_STRING,
+                Collections.singletonList(fullDeletePath.toString()),
+                Collections.emptyMap());
+          case APPEND:
+            return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
+          default:
+            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
+        }
+      }, parallelism).stream().collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
+    }
   }
 
-  protected HoodieRollbackStat undoCreate(String createdBaseFilePath) throws IOException {
-    LOG.info("Rolling back by deleting the created base file:" + createdBaseFilePath);
-    return deleteBaseFile(createdBaseFilePath);
-  }
-
-  private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws IOException {
-    Path fullDeletePath = new Path(basePath, baseFilePath);
-    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
-    boolean isDeleted = table.getMetaClient().getFs().delete(fullDeletePath);
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withDeletedFileResult(baseFilePath, isDeleted)
-        .build();
-  }
-
-  protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant instantToRollback) throws IOException, InterruptedException {
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException {
     Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
     String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    final Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-
-    HoodieLogFormat.Writer writer = null;
-    try {
-      Path partitionFullPath = FSUtils.getPartitionPath(basePath, partitionPath);
-
-      if (!table.getMetaClient().getFs().exists(partitionFullPath)) {
-        return HoodieRollbackStat.newBuilder()
-            .withPartitionPath(partitionPath)
-            .build();
-      }
-      writer = HoodieLogFormat.newWriterBuilder()
-          .onParentPath(partitionFullPath)
-          .withFileId(fileId)
-          .overBaseCommit(baseCommitTime)
-          .withFs(table.getMetaClient().getFs())
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-      // generate metadata
-      Map<HoodieLogBlock.HeaderMetadataType, String> header = RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime);
-      // if update belongs to an existing log file
-      writer.appendBlock(new HoodieCommandBlock(header));
-    } finally {
-      try {
-        if (writer != null) {
-          writer.close();
-        }
-      } catch (IOException io) {
-        throw new HoodieIOException("Error closing append of rollback block..", io);
-      }
+    Map<FileStatus, Long> writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
+    Map<String, Long> writtenLogFileStrSizeMap = new HashMap<>();
+    for (Map.Entry<FileStatus, Long> entry : writtenLogFileSizeMap.entrySet()) {
+      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue());
     }
-
-    // the information of files appended to is required for metadata sync
-    Map<FileStatus, Long> filesToNumBlocksRollback = Collections.singletonMap(
-        table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-        1L);
-
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build();
+    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap);
   }
 
   /**
@@ -151,41 +122,10 @@ public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.R
    * @return Map
    * @throws IOException
    */
-  protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+  private Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
     // collect all log files that is supposed to be deleted with this rollback
     return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
         FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
         .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
   }
-
-  @Override
-  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
-    try {
-      List<String> markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
-          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
-      int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
-      context.setJobStatus(this.getClass().getSimpleName(), "Rolling back using marker files");
-      return context.mapToPairAndReduceByKey(markerPaths, markerFilePath -> {
-        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
-        IOType type = IOType.valueOf(typeStr);
-        HoodieRollbackStat rollbackStat;
-        switch (type) {
-          case MERGE:
-            rollbackStat = undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
-            break;
-          case APPEND:
-            rollbackStat = undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
-            break;
-          case CREATE:
-            rollbackStat = undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
-            break;
-          default:
-            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
-        }
-        return new ImmutablePair<>(rollbackStat.getPartitionPath(), rollbackStat);
-      }, RollbackUtils::mergeRollbackStat, parallelism);
-    } catch (Exception e) {
-      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
-    }
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 87d26281d..23af44552 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -19,19 +19,18 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -60,16 +59,7 @@ public class MergeOnReadRollbackActionExecutor executeRollback() {
+  protected List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRollbackPlan) {
     HoodieTimer rollbackTimer = new HoodieTimer();
     rollbackTimer.startTimer();
 
@@ -96,7 +86,7 @@ public class MergeOnReadRollbackActionExecutor executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
-    List<ListingBasedRollbackRequest> rollbackRequests;
-    try {
-      rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
-    }
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index d213fb18f..6ad4e1c98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -22,17 +22,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -51,6 +54,20 @@ public class RollbackUtils {
 
   private static final Logger LOG = LogManager.getLogger(RollbackUtils.class);
 
+  /**
+   * Get the latest version of the rollback plan corresponding to a rollback instant.
+   * @param metaClient  Hoodie Table Meta Client
+   * @param rollbackInstant Instant referring to rollback action
+   * @return Rollback plan corresponding to rollback instant
+   * @throws IOException if the rollback plan cannot be read or deserialized from the timeline
+   */
+  static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
+      throws IOException {
+    // TODO: add upgrade step if required.
+    return TimelineMetadataUtils.deserializeAvroMetadata(
+        metaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get(), HoodieRollbackPlan.class);
+  }
+
   static Map generateHeader(String instantToRollback, String rollbackInstantTime) {
     // generate metadata
     Map header = new HashMap<>(3);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializableHoodieRollbackRequest.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializableHoodieRollbackRequest.java
new file mode 100644
index 000000000..acd1c50ba
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializableHoodieRollbackRequest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieRollbackRequest in HoodieRollbackPlan (avro pojo) is not directly usable within the Spark parallel engine.
+ * Hence we convert it to this {@link SerializableHoodieRollbackRequest} and then use it within spark.parallelize.
+ */
+public class SerializableHoodieRollbackRequest {
+
+  private final String partitionPath;
+  private final String fileId;
+  private final String latestBaseInstant;
+  private final List filesToBeDeleted = new ArrayList<>();
+  private final Map logBlocksToBeDeleted = new HashMap<>();
+
+  public SerializableHoodieRollbackRequest(HoodieRollbackRequest rollbackRequest) {
+    this.partitionPath = rollbackRequest.getPartitionPath();
+    this.fileId = rollbackRequest.getFileId();
+    this.latestBaseInstant = rollbackRequest.getLatestBaseInstant();
+    this.filesToBeDeleted.addAll(rollbackRequest.getFilesToBeDeleted());
+    this.logBlocksToBeDeleted.putAll(rollbackRequest.getLogBlocksToBeDeleted());
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public String getFileId() {
+    return fileId;
+  }
+
+  public String getLatestBaseInstant() {
+    return latestBaseInstant;
+  }
+
+  public List getFilesToBeDeleted() {
+    return filesToBeDeleted;
+  }
+
+  public Map getLogBlocksToBeDeleted() {
+    return logBlocksToBeDeleted;
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 66b7e78d4..174122c68 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -86,6 +86,16 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism) {
+    return data.stream().parallel()
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public  List flatMap(List data, SerializableFunction> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 27571bcdb..93785b919 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -53,6 +54,7 @@ import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 
 import org.slf4j.Logger;
@@ -298,6 +300,12 @@ public class HoodieFlinkCopyOnWriteTable extends
     return new FlinkScheduleCleanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
+                                                     boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
     return new FlinkCleanActionExecutor(context, config, this, cleanInstantTime).execute();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index 461427098..f4a4b0eb4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,6 +38,7 @@ import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExe
 import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
 import java.util.List;
@@ -106,6 +108,12 @@ public class HoodieFlinkMergeOnReadTable
         + "should not invoke directly through HoodieFlinkMergeOnReadTable");
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
+                                                     boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
     return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index cb024c603..284d3bcdf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -27,8 +28,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
+import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
 import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
 
 import java.util.List;
 
@@ -45,7 +47,8 @@ public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
   @Override
   List getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option commitInstantOpt,
                                                      List rollbackRequests) {
-    return new ListingBasedRollbackHelper(metaClient, config)
-        .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
+    List hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
+        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
+    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index f7a28e283..4cdbff264 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -69,6 +70,16 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism) {
+    return data.stream().parallel()
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public  List flatMap(List data, SerializableFunction> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 7715bf965..72d63d5a0 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -50,6 +51,7 @@ import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
@@ -177,6 +179,12 @@ public class HoodieJavaCopyOnWriteTable extends H
     throw new HoodieNotSupportedException("RollbackBootstrap is not supported yet");
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
+                                                     boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public Option scheduleCleaning(HoodieEngineContext context, String instantTime, Option> extraMetadata) {
     return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index ad1d7cd92..de06ea4b4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -82,6 +82,13 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
     }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
   }
 
+  @Override
+  public  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism) {
+    return javaSparkContext.parallelize(data, parallelism).mapToPair(pair -> new Tuple2(pair.getLeft(), pair.getRight()))
+        .reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
+  }
+
   @Override
   public  List flatMap(List data, SerializableFunction> func, int parallelism) {
     return javaSparkContext.parallelize(data, parallelism).flatMap(x -> func.apply(x).iterator()).collect();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 26d14cfa9..c2770a784 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -65,6 +66,7 @@ import org.apache.hudi.table.action.commit.SparkMergeHelper;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 import org.apache.log4j.LogManager;
@@ -187,6 +189,13 @@ public class HoodieSparkCopyOnWriteTable extends
     return new SparkCleanPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context,
+                                                              String instantTime,
+                                                              HoodieInstant instantToRollback, boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   public Iterator> handleUpdate(String instantTime, String partitionPath, String fileId,
       Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
     // these are updates
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 2db4eeb70..ee66d7b0a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -49,6 +50,7 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExec
 import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -142,6 +144,13 @@ public class HoodieSparkMergeOnReadTable extends
     new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context,
+                                                     String instantTime,
+                                                     HoodieInstant instantToRollback, boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context,
                                          String rollbackInstantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
index 9c6ec6e70..7d60b28e0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
@@ -48,20 +48,23 @@ public class SparkCopyOnWriteRestoreActionExecutor getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option commitInstantOpt,
                                                      List rollbackRequests) {
-    return new ListingBasedRollbackHelper(metaClient, config)
-        .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
+    List hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
+        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
+    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 69cc25feb..2eb2f380e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -1282,7 +1282,9 @@ public class TestCleaner extends HoodieClientTestBase {
     table.getActiveTimeline().transitionRequestedToInflight(
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
     metaClient.reloadActiveTimeline();
-    table.rollback(context, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
+    HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000");
+    table.scheduleRollback(context, "001", rollbackInstant, false);
+    table.rollback(context, "001", rollbackInstant, true);
     final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
     assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 810733c64..2e93602c4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
@@ -79,13 +80,16 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
         .withBaseFilesInPartition(p1, "id21")
         .withBaseFilesInPartition(p2, "id22");
 
-    HoodieTable table = this.getHoodieTable(metaClient, getConfig());
+    HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(false).build();
+    HoodieTable table = this.getHoodieTable(metaClient, writeConfig);
     HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
 
     // execute CopyOnWriteRollbackActionExecutor with filelisting mode
+    BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false);
+    HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
     CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
-    assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    List hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
+    List hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback(rollbackPlan);
 
     // assert hoodieRollbackStats
     assertEquals(hoodieRollbackStats.size(), 3);
@@ -96,14 +100,14 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
           assertEquals(0, stat.getFailedDeleteFiles().size());
           assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
           assertEquals(testTable.forCommit("002").getBaseFilePath(p1, "id21").toString(),
-              stat.getSuccessDeleteFiles().get(0));
+              this.fs.getScheme() + ":" + stat.getSuccessDeleteFiles().get(0));
           break;
         case p2:
           assertEquals(1, stat.getSuccessDeleteFiles().size());
           assertEquals(0, stat.getFailedDeleteFiles().size());
           assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
           assertEquals(testTable.forCommit("002").getBaseFilePath(p2, "id22").toString(),
-              stat.getSuccessDeleteFiles().get(0));
+              this.fs.getScheme() + ":" + stat.getSuccessDeleteFiles().get(0));
           break;
         case p3:
           assertEquals(0, stat.getSuccessDeleteFiles().size());
@@ -150,7 +154,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     HoodieTable table = this.getHoodieTable(metaClient, cfg);
     performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
   }
-  
+
   private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
                                           List firstPartitionCommit2FileSlices,
                                           List secondPartitionCommit2FileSlices) throws IOException {
@@ -162,12 +166,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
       commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
     }
 
+    BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false);
+    HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
     CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
-    if (!isUsingMarkers) {
-      assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    } else {
-      assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    }
     Map rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
 
     //3. assert the rollback stat
@@ -175,9 +177,9 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     for (Map.Entry entry : rollbackMetadata.entrySet()) {
       HoodieRollbackPartitionMetadata meta = entry.getValue();
       assertTrue(meta.getFailedDeleteFiles() == null
-              || meta.getFailedDeleteFiles().size() == 0);
+          || meta.getFailedDeleteFiles().size() == 0);
       assertTrue(meta.getSuccessDeleteFiles() == null
-              || meta.getSuccessDeleteFiles().size() == 1);
+          || meta.getSuccessDeleteFiles().size() == 1);
     }
 
     //4. assert filegroup after rollback, and compare to the rollbackstat
@@ -187,15 +189,11 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     List firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
     assertEquals(1, firstPartitionRollBack1FileSlices.size());
 
-    if (!isUsingMarkers) {
-      firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
-      assertEquals(1, firstPartitionCommit2FileSlices.size());
-      assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
-    } else {
-      assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
-    }
+    firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
+    assertEquals(1, firstPartitionCommit2FileSlices.size());
+    assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+        this.fs.getScheme() + ":" + rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
+
 
     // assert the second partition file group and file slice
     List secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
@@ -204,15 +202,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     assertEquals(1, secondPartitionRollBack1FileSlices.size());
 
     // assert the second partition rollback file is equals rollBack1SecondPartitionStat
-    if (!isUsingMarkers) {
-      secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
-      assertEquals(1, secondPartitionCommit2FileSlices.size());
-      assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
-    } else {
-      assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
-    }
+    secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
+    assertEquals(1, secondPartitionCommit2FileSlices.size());
+    assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+        this.fs.getScheme() + ":" + rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
 
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, commitInstant.getTimestamp()).doesMarkerDirExist());
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 5d269cf6a..af77dc753 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -61,7 +61,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
     initPath();
     initSparkContexts();
     //just generate two partitions
-    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
     initFileSystem();
     initMetaClient();
   }
@@ -89,6 +89,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
 
     //2. rollback
     HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
+    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false);
+    mergeOnReadRollbackPlanActionExecutor.execute().get();
     MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
         context,
         cfg,
@@ -96,13 +99,6 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
         "003",
         rollBackInstant,
         true);
-    // assert is filelist mode
-    if (!isUsingMarkers) {
-      assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    } else {
-      assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    }
-
     //3. assert the rollback stat
     Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
     assertEquals(2, rollbackMetadata.size());
@@ -145,15 +141,13 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
   public void testFailForCompletedInstants() {
     Assertions.assertThrows(IllegalArgumentException.class, () -> {
       HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
-      new MergeOnReadRollbackActionExecutor(
-          context,
-          getConfigBuilder().build(),
+      new MergeOnReadRollbackActionExecutor(context, getConfigBuilder().build(),
           getHoodieTable(metaClient, getConfigBuilder().build()),
           "003",
           rollBackInstant,
           true,
           true,
-          true);
+          true).execute();
     });
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 94fa6974d..8b23cf257 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.functional;
 
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -32,6 +33,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
 import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
@@ -93,8 +96,13 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
         .withMarkerFile("partA", f2, IOType.CREATE);
 
     // when
-    List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
-        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
+    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
+    List<HoodieRollbackRequest> rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+        "002").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
+
+    List<HoodieRollbackStat> stats = new BaseRollbackHelper(hoodieTable.getMetaClient(), getConfig()).performRollback(context,
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
+        rollbackRequests);
 
     // then: ensure files are deleted correctly, non-existent files reported as failed deletes
     assertEquals(2, stats.size());
@@ -175,9 +183,14 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
     writeStatuses.collect();
 
+    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
+    List<HoodieRollbackRequest> rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+        "003").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
+
     // rollback 2nd commit and ensure stats reflect the info.
-    return new MarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
-        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
+    return new BaseRollbackHelper(hoodieTable.getMetaClient(), getConfig()).performRollback(context,
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"),
+        rollbackRequests);
   }
 
 }
diff --git a/hudi-common/src/main/avro/HoodieRollbackPlan.avsc b/hudi-common/src/main/avro/HoodieRollbackPlan.avsc
new file mode 100644
index 000000000..99e0755bd
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieRollbackPlan.avsc
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieRollbackPlan",
+  "fields": [
+     {
+           "name": "instantToRollback",
+           "doc": "Hoodie instant that needs to be rolled back",
+           "type": ["null", "HoodieInstantInfo"],
+           "default": null
+    },
+    {
+      "name": "RollbackRequests",
+      "type":["null", {
+                "type":"array",
+                "items":{
+                 "type": "record",
+                         "name": "HoodieRollbackRequest",
+                         "fields": [
+                            {"name": "partitionPath", "type": "string"},
+                            {"name": "fileId",
+                              "type":["null", "string"],
+                              "default": null
+                             },
+                            {"name": "latestBaseInstant",
+                              "type":["null", "string"],
+                              "default": null
+                            },
+                            {"name": "filesToBeDeleted",
+                             "default": [],
+                             "type": {
+                                       "type": "array",
+                                       "default": [],
+                                       "items": "string"
+                                    }
+                            },
+                            {"name": "logBlocksToBeDeleted",
+                             "type": ["null", {
+                               "type": "map",
+                               "doc": "Log blocks that need to be deleted as part of the rollback",
+                               "values": {
+                                   "type": "long",
+                                   "doc": "Size of this file/block in bytes"
+                               }
+                             }],
+                             "default":null
+                            }
+                         ]
+                }
+                }],
+       "default" : null
+    },
+    {
+       "name":"version",
+       "type":["int", "null"],
+       "default": 1
+    }
+  ]
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 8ea6a43e0..10c7ced07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
 import java.util.Map;
@@ -60,6 +61,9 @@ public abstract class HoodieEngineContext {
   public abstract <I, K, V> List<V> mapToPairAndReduceByKey(
       List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
 
+  public abstract <K, V> List<V> reduceByKey(
+      List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism);
+
   public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
 
   public abstract  void foreach(List data, SerializableConsumer consumer, int parallelism);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 0aeb9d8c0..1c935ff06 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -67,6 +68,16 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public <K, V> List<V> reduceByKey(
+      List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    return data.stream().parallel()
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index e6abed677..5b60b033c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -67,7 +67,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
       CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
       INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
       INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
-      ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
+      ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
       REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
   private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
@@ -229,6 +229,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
   }
 
+  public Option<byte[]> readRollbackInfoAsBytes(HoodieInstant instant) {
+    // Rollback metadata are always stored only in timeline .hoodie
+    return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
+  }
+
   //-----------------------------------------------------------------
   //      BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
   //-----------------------------------------------------------------
@@ -339,6 +344,37 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return inflight;
   }
 
+  /**
+   * Transition Rollback State from inflight to Committed.
+   *
+   * @param inflightInstant Inflight instant
+   * @param data Extra Metadata
+   * @return commit instant
+   */
+  public HoodieInstant transitionRollbackInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
+    ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
+    ValidationUtils.checkArgument(inflightInstant.isInflight());
+    HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, inflightInstant.getTimestamp());
+    // Then write to timeline
+    transitionState(inflightInstant, commitInstant, data);
+    return commitInstant;
+  }
+
+  /**
+   * Transition Rollback State from requested to inflight.
+   *
+   * @param requestedInstant requested instant
+   * @param data Optional data to be stored
+   * @return commit instant
+   */
+  public HoodieInstant transitionRollbackRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
+    ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
+    ValidationUtils.checkArgument(requestedInstant.isRequested());
+    HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, ROLLBACK_ACTION, requestedInstant.getTimestamp());
+    transitionState(requestedInstant, inflight, data);
+    return inflight;
+  }
+
   /**
    * Transition replace requested file to replace inflight.
    *
@@ -497,6 +533,13 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     createFileInMetaPath(instant.getFileName(), content, false);
   }
 
+  public void saveToRollbackRequested(HoodieInstant instant, Option<byte[]> content) {
+    ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
+    ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
+    // Plan is stored in meta path
+    createFileInMetaPath(instant.getFileName(), content, false);
+  }
+
   private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
     Path fullPath = new Path(metaClient.getMetaPath(), filename);
     if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 65376b48e..a8df62c64 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -147,7 +147,8 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
               : HoodieTimeline.makeCleanerFileName(timestamp);
     } else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) {
       return isInflight() ? HoodieTimeline.makeInflightRollbackFileName(timestamp)
-          : HoodieTimeline.makeRollbackFileName(timestamp);
+          : isRequested() ? HoodieTimeline.makeRequestedRollbackFileName(timestamp)
+              : HoodieTimeline.makeRollbackFileName(timestamp);
     } else if (HoodieTimeline.SAVEPOINT_ACTION.equals(action)) {
       return isInflight() ? HoodieTimeline.makeInflightSavePointFileName(timestamp)
           : HoodieTimeline.makeSavePointFileName(timestamp);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 1e366147a..b473c7b1f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -73,6 +73,7 @@ public interface HoodieTimeline extends Serializable {
   String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION;
   String REQUESTED_CLEAN_EXTENSION = "." + CLEAN_ACTION + REQUESTED_EXTENSION;
   String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION;
+  String REQUESTED_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + REQUESTED_EXTENSION;
   String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
   String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
   String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
@@ -363,6 +364,10 @@ public interface HoodieTimeline extends Serializable {
     return StringUtils.join(instant, HoodieTimeline.ROLLBACK_EXTENSION);
   }
 
+  static String makeRequestedRollbackFileName(String instant) {
+    return StringUtils.join(instant, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION);
+  }
+
   static String makeInflightRollbackFileName(String instant) {
     return StringUtils.join(instant, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index a50c2998a..32e42ee58 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
 import org.apache.hudi.common.HoodieRollbackStat;
@@ -109,6 +110,10 @@ public class TimelineMetadataUtils {
     return serializeAvroMetadata(cleanPlan, HoodieCleanerPlan.class);
   }
 
+  public static Option<byte[]> serializeRollbackPlan(HoodieRollbackPlan rollbackPlan) throws IOException {
+    return serializeAvroMetadata(rollbackPlan, HoodieRollbackPlan.class);
+  }
+
   public static Option<byte[]> serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException {
     return serializeAvroMetadata(metadata, HoodieCleanMetadata.class);
   }