From b8dad628e584e0acfa8ef6ba0056f7cb6efafad0 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Thu, 16 Sep 2021 11:16:06 -0400 Subject: [PATCH] [HUDI-2422] Adding rollback plan and rollback requested instant (#3651) - This patch introduces rollback plan and rollback.requested instant. Rollback will be done in two phases, namely rollback plan and rollback action. In planning, we prepare the rollback plan and serialize it to rollback.requested. In the rollback action phase, we fetch details from the plan and just delete the files as per the plan. This will ensure final rollback commit metadata will contain all files that got rolled back even if rollback failed midway and retried again. --- .../client/AbstractHoodieWriteClient.java | 26 +- .../org/apache/hudi/table/HoodieTable.java | 21 ++ .../rollback/BaseRollbackActionExecutor.java | 108 +++++--- .../action/rollback/BaseRollbackHelper.java | 217 ++++++++++++++++ .../BaseRollbackPlanActionExecutor.java | 131 ++++++++++ .../CopyOnWriteRollbackActionExecutor.java | 21 +- .../rollback/ListingBasedRollbackHelper.java | 235 ++++++------------ .../ListingBasedRollbackStrategy.java | 76 ++++++ .../rollback/MarkerBasedRollbackStrategy.java | 144 ++++------- .../MergeOnReadRollbackActionExecutor.java | 27 +- .../table/action/rollback/RollbackUtils.java | 17 ++ .../SerializableHoodieRollbackRequest.java | 67 +++++ .../common/HoodieFlinkEngineContext.java | 10 + .../table/HoodieFlinkCopyOnWriteTable.java | 8 + .../table/HoodieFlinkMergeOnReadTable.java | 8 + .../upgrade/ZeroToOneUpgradeHandler.java | 9 +- .../common/HoodieJavaEngineContext.java | 11 + .../table/HoodieJavaCopyOnWriteTable.java | 8 + .../common/HoodieSparkEngineContext.java | 7 + .../table/HoodieSparkCopyOnWriteTable.java | 9 + .../table/HoodieSparkMergeOnReadTable.java | 9 + ...SparkCopyOnWriteRestoreActionExecutor.java | 13 +- ...SparkMergeOnReadRestoreActionExecutor.java | 31 ++- .../upgrade/ZeroToOneUpgradeHandler.java | 7 +- .../org/apache/hudi/table/TestCleaner.java | 4 +- ...TestCopyOnWriteRollbackActionExecutor.java | 55 ++-- ...TestMergeOnReadRollbackActionExecutor.java | 18 +- .../TestMarkerBasedRollbackStrategy.java | 21 +- .../src/main/avro/HoodieRollbackPlan.avsc | 76 ++++++ .../common/engine/HoodieEngineContext.java | 4 + .../engine/HoodieLocalEngineContext.java | 11 + .../table/timeline/HoodieActiveTimeline.java | 45 +++- .../common/table/timeline/HoodieInstant.java | 3 +- .../common/table/timeline/HoodieTimeline.java | 5 + .../table/timeline/TimelineMetadataUtils.java | 5 + 35 files changed, 1048 insertions(+), 419 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializableHoodieRollbackRequest.java create mode 100644 hudi-common/src/main/avro/HoodieRollbackPlan.avsc diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 9650ddaeb..6fcce1b0b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.callback.HoodieWriteCommitCallback; import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; @@ -590,12 +591,19 @@ public abstract class AbstractHoodieWriteClient HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) .findFirst()); if (commitInstantOpt.isPresent()) { - HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true); - if (timerContext != null) { - long durationInMs = metrics.getDurationInMs(timerContext.stop()); - metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); + LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime); + Option rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false); + if (rollbackPlanOption.isPresent()) { + // execute rollback + HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true); + if (timerContext != null) { + long durationInMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); + } + return true; + } else { + throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime); } - return true; } else { LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback"); return false; @@ -776,7 +784,9 @@ public abstract class AbstractHoodieWriteClient table) { - table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false); + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + table.scheduleRollback(context, commitTime, inflightInstant, false); + table.rollback(context, commitTime, inflightInstant, false); table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } @@ -978,7 +988,9 @@ public abstract class AbstractHoodieWriteClient table) { - table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false); + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + table.scheduleRollback(context, commitTime, inflightInstant, false); + table.rollback(context, commitTime, inflightInstant, false); table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index ad40c8ec7..f701e4036 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; @@ -316,6 +317,13 @@ public abstract class HoodieTable implem return getActiveTimeline().getCleanerTimeline(); } + /** + * Get rollback timeline. + */ + public HoodieTimeline getRollbackTimeline() { + return getActiveTimeline().getRollbackTimeline(); + } + /** * Get only the completed (no-inflights) savepoint timeline. */ @@ -417,6 +425,19 @@ public abstract class HoodieTable implem */ public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime); + /** + * Schedule rollback for the instant time. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for scheduling rollback + * @param instantToRollback instant to be rolled back + * @return HoodieRollbackPlan containing info on rollback. + */ + public abstract Option scheduleRollback(HoodieEngineContext context, + String instantTime, + HoodieInstant instantToRollback, + boolean skipTimelinePublish); + /** * Rollback the (inflight/committed) record changes with the given commit time. *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 7dbbaa70e..3dc585121 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
@@ -43,7 +44,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -53,11 +53,6 @@ public abstract class BaseRollbackActionExecutor execute(HoodieInstant instantToRollback);
-  }
-
   protected final HoodieInstant instantToRollback;
   protected final boolean deleteInstants;
   protected final boolean skipTimelinePublish;
@@ -92,30 +87,74 @@ public abstract class BaseRollbackActionExecutor executeRollback(HoodieRollbackPlan hoodieRollbackPlan) throws IOException;
 
-  protected abstract List executeRollback() throws IOException;
+  private HoodieRollbackMetadata runRollback(HoodieTable table, HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
+    ValidationUtils.checkArgument(rollbackInstant.getState().equals(HoodieInstant.State.REQUESTED)
+        || rollbackInstant.getState().equals(HoodieInstant.State.INFLIGHT));
+    try {
+      final HoodieInstant inflightInstant;
+      final HoodieTimer timer = new HoodieTimer();
+      timer.startTimer();
+      if (rollbackInstant.isRequested()) {
+        inflightInstant = table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant,
+            TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
+      } else {
+        inflightInstant = rollbackInstant;
+      }
 
-  protected abstract List executeRollbackUsingFileListing(HoodieInstant instantToRollback);
+      HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
+      List stats = doRollbackAndGetStats(rollbackPlan);
+      HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
+          instantTime,
+          Option.of(rollbackTimer.endTimer()),
+          Collections.singletonList(instantToRollback),
+          stats);
+      if (!skipTimelinePublish) {
+        finishRollback(inflightInstant, rollbackMetadata);
+      }
+
+      // Finally, remove the markers post rollback.
+      WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
+      return rollbackMetadata;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to rollback commit ", e);
+    }
+  }
 
   @Override
   public HoodieRollbackMetadata execute() {
-    HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
-    List stats = doRollbackAndGetStats();
-    HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
-        instantTime,
-        Option.of(rollbackTimer.endTimer()),
-        Collections.singletonList(instantToRollback),
-        stats);
-    if (!skipTimelinePublish) {
-      finishRollback(rollbackMetadata);
+    table.getMetaClient().reloadActiveTimeline();
+    List rollBackInstants = table.getRollbackTimeline()
+        .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
+    if (rollBackInstants.isEmpty()) {
+      throw new HoodieRollbackException("No Requested Rollback Instants found to execute rollback ");
+    }
+    HoodieInstant rollbackInstant = null;
+    for (HoodieInstant instant : rollBackInstants) {
+      if (instantTime.equals(instant.getTimestamp())) {
+        rollbackInstant = instant;
+        break;
+      }
+    }
+    if (rollbackInstant != null) {
+      try {
+        HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant);
+        return runRollback(table, rollBackInstants.get(0), rollbackPlan);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to fetch rollback plan to rollback commit " + rollbackInstant.getTimestamp(), e);
+      }
+    } else {
+      throw new HoodieIOException("No inflight rollback instants found for commit time " + instantTime);
     }
-
-    // Finally, remove the markers post rollback.
-    WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-
-    return rollbackMetadata;
   }
 
   private void validateSavepointRollbacks() {
@@ -173,7 +212,7 @@ public abstract class BaseRollbackActionExecutor doRollbackAndGetStats() {
+  public List doRollbackAndGetStats(HoodieRollbackPlan hoodieRollbackPlan) {
     final String instantTimeToRollback = instantToRollback.getTimestamp();
     final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
         && !instantToRollback.isCompleted();
@@ -186,7 +225,7 @@ public abstract class BaseRollbackActionExecutor stats = executeRollback();
+      List stats = executeRollback(hoodieRollbackPlan);
       LOG.info("Rolled back inflight instant " + instantTimeToRollback);
       if (!isPendingCompaction) {
         rollBackIndex();
@@ -197,12 +236,19 @@ public abstract class BaseRollbackActionExecutor executeRollback(HoodieInstant instantToRollback, HoodieRollbackPlan rollbackPlan) {
+    return new BaseRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackPlan.getRollbackRequests());
+  }
+
+  protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
     try {
-      table.getActiveTimeline().createNewInstant(
-          new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, instantTime));
-      table.getActiveTimeline().saveAsComplete(
-          new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, instantTime),
+      table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
       LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
     } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
new file mode 100644
index 000000000..721ca77b4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieRollbackException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Contains common methods to be used across engines for rollback operation.
+ */
+public class BaseRollbackHelper implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(BaseRollbackHelper.class);
+  protected static final String EMPTY_STRING = "";
+
+  protected final HoodieTableMetaClient metaClient;
+  protected final HoodieWriteConfig config;
+
+  public BaseRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+    this.metaClient = metaClient;
+    this.config = config;
+  }
+
+  /**
+   * Performs all rollback actions that we have collected in parallel.
+   */
+  public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                  List rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+    // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
+    // is failing with com.esotericsoftware.kryo.KryoException
+    // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
+    // related stack overflow post: https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as GenericData.Array.
+    List serializableRequests = rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
+    return context.reduceByKey(maybeDeleteAndCollectStats(context, instantToRollback, serializableRequests, true, parallelism),
+        RollbackUtils::mergeRollbackStat, parallelism);
+  }
+
+  /**
+   * Collect all file info that needs to be rollbacked.
+   */
+  public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                       List rollbackRequests) {
+    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+    // If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
+    // is failing with com.esotericsoftware.kryo.KryoException
+    // stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
+    // related stack overflow post: https://issues.apache.org/jira/browse/SPARK-3601. Avro deserializes list as GenericData.Array.
+    List serializableRequests = rollbackRequests.stream().map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
+    return context.reduceByKey(maybeDeleteAndCollectStats(context, instantToRollback, serializableRequests, false, parallelism),
+        RollbackUtils::mergeRollbackStat, parallelism);
+  }
+
+  /**
+   * May be delete interested files and collect stats or collect stats only.
+   *
+   * @param context           instance of {@link HoodieEngineContext} to use.
+   * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
+   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
+   * @param doDelete          {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes.
+   * @return stats collected with or w/o actual deletions.
+   */
+  List> maybeDeleteAndCollectStats(HoodieEngineContext context,
+                                                                    HoodieInstant instantToRollback,
+                                                                    List rollbackRequests,
+                                                                    boolean doDelete, int numPartitions) {
+    return context.flatMap(rollbackRequests, (SerializableFunction>>) rollbackRequest -> {
+      List filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
+      if (!filesToBeDeleted.isEmpty()) {
+        List rollbackStats = deleteFiles(metaClient, filesToBeDeleted, doDelete);
+        List> partitionToRollbackStats = new ArrayList<>();
+        rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
+        return partitionToRollbackStats.stream();
+      } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
+        Map logFilesToBeDeleted = rollbackRequest.getLogBlocksToBeDeleted();
+        String fileId = rollbackRequest.getFileId();
+        String latestBaseInstant = rollbackRequest.getLatestBaseInstant();
+        FileSystem fs = metaClient.getFs();
+        // collect all log files that is supposed to be deleted with this rollback
+        Map writtenLogFileSizeMap = new HashMap<>();
+        for (Map.Entry entry : logFilesToBeDeleted.entrySet()) {
+          writtenLogFileSizeMap.put(fs.getFileStatus(new Path(entry.getKey())), entry.getValue());
+        }
+        HoodieLogFormat.Writer writer = null;
+        try {
+          writer = HoodieLogFormat.newWriterBuilder()
+              .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
+              .withFileId(fileId)
+              .overBaseCommit(latestBaseInstant)
+              .withFs(metaClient.getFs())
+              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+
+          // generate metadata
+          if (doDelete) {
+            Map header = generateHeader(instantToRollback.getTimestamp());
+            // if update belongs to an existing log file
+            writer.appendBlock(new HoodieCommandBlock(header));
+          }
+        } catch (IOException | InterruptedException io) {
+          throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
+        } finally {
+          try {
+            if (writer != null) {
+              writer.close();
+            }
+          } catch (IOException io) {
+            throw new HoodieIOException("Error appending rollback block..", io);
+          }
+        }
+
+        // This step is intentionally done after writer is closed. Guarantees that
+        // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
+        // cloud-storage : HUDI-168
+        Map filesToNumBlocksRollback = Collections.singletonMap(
+            metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
+            1L
+        );
+        return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(),
+            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                .withRollbackBlockAppendResults(filesToNumBlocksRollback)
+                .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build())).stream();
+      } else {
+        return Collections
+            .singletonList(Pair.of(rollbackRequest.getPartitionPath(),
+                HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
+                    .build())).stream();
+      }
+    }, numPartitions);
+  }
+
+  /**
+   * Common method used for cleaning out files during rollback.
+   */
+  protected List deleteFiles(HoodieTableMetaClient metaClient, List filesToBeDeleted, boolean doDelete) throws IOException {
+    return filesToBeDeleted.stream().map(fileToDelete -> {
+      String basePath = metaClient.getBasePath();
+      try {
+        Path fullDeletePath = new Path(fileToDelete);
+        String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
+        boolean isDeleted = true;
+        if (doDelete) {
+          isDeleted = metaClient.getFs().delete(fullDeletePath);
+        }
+        return HoodieRollbackStat.newBuilder()
+            .withPartitionPath(partitionPath)
+            .withDeletedFileResult(fullDeletePath.toString(), isDeleted)
+            .build();
+      } catch (IOException e) {
+        LOG.error("Fetching file status for ");
+        throw new HoodieIOException("Fetching file status for " + fileToDelete + " failed ", e);
+      }
+    }).collect(Collectors.toList());
+  }
+
+  protected Map generateHeader(String commit) {
+    // generate metadata
+    Map header = new HashMap<>(3);
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
+  public interface SerializablePathFilter extends PathFilter, Serializable {
+
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
new file mode 100644
index 000000000..24edde276
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base rollback plan action executor to assist in scheduling rollback requests. This phase serialized {@link HoodieRollbackPlan}
+ * to rollback.requested instant.
+ */
+public class BaseRollbackPlanActionExecutor extends BaseActionExecutor> {
+
+  private static final Logger LOG = LogManager.getLogger(BaseRollbackPlanActionExecutor.class);
+
+  protected final HoodieInstant instantToRollback;
+  private final boolean skipTimelinePublish;
+
+  public static final Integer ROLLBACK_PLAN_VERSION_1 = 1;
+  public static final Integer LATEST_ROLLBACK_PLAN_VERSION = ROLLBACK_PLAN_VERSION_1;
+
+  public BaseRollbackPlanActionExecutor(HoodieEngineContext context,
+                                        HoodieWriteConfig config,
+                                        HoodieTable table,
+                                        String instantTime,
+                                        HoodieInstant instantToRollback,
+                                        boolean skipTimelinePublish) {
+    super(context, config, table, instantTime);
+    this.instantToRollback = instantToRollback;
+    this.skipTimelinePublish = skipTimelinePublish;
+  }
+
+  /**
+   * Interface for RollbackStrategy. There are two types supported, listing based and marker based.
+   */
+  interface RollbackStrategy extends Serializable {
+
+    /**
+     * Fetch list of {@link HoodieRollbackRequest}s to be added to rollback plan.
+     * @param instantToRollback instant to be rolled back.
+     * @return list of {@link HoodieRollbackRequest}s to be added to rollback plan
+     */
+    List getRollbackRequests(HoodieInstant instantToRollback);
+  }
+
+  /**
+   * Fetch the Rollback strategy used.
+   *
+   * @return
+   */
+  private BaseRollbackPlanActionExecutor.RollbackStrategy getRollbackStrategy() {
+    if (config.shouldRollbackUsingMarkers()) {
+      return new MarkerBasedRollbackStrategy(table, context, config, instantTime);
+    } else {
+      return new ListingBasedRollbackStrategy(table, context, config, instantTime);
+    }
+  }
+
+  /**
+   * Creates a Rollback plan if there are files to be rolledback and stores them in instant file.
+   * Rollback Plan contains absolute file paths.
+   *
+   * @param startRollbackTime Rollback Instant Time
+   * @return Rollback Plan if generated
+   */
+  protected Option requestRollback(String startRollbackTime) {
+    final HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime);
+    try {
+      List rollbackRequests = new ArrayList<>();
+      if (!instantToRollback.isRequested()) {
+        rollbackRequests.addAll(getRollbackStrategy().getRollbackRequests(instantToRollback));
+      }
+      HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new HoodieInstantInfo(instantToRollback.getTimestamp(),
+          instantToRollback.getAction()), rollbackRequests, LATEST_ROLLBACK_PLAN_VERSION);
+      if (!skipTimelinePublish) {
+        if (table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp())) {
+          LOG.warn("Request Rollback found with instant time " + rollbackInstant + ", hence skipping scheduling rollback");
+        } else {
+          table.getActiveTimeline().saveToRollbackRequested(rollbackInstant, TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
+          table.getMetaClient().reloadActiveTimeline();
+          LOG.info("Requesting Rollback with instant time " + rollbackInstant);
+        }
+      }
+      return Option.of(rollbackPlan);
+    } catch (IOException e) {
+      LOG.error("Got exception when saving rollback requested file", e);
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public Option execute() {
+    // Plan a new rollback action
+    return requestRollback(instantTime);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
index 44b5492e7..9187179ff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -58,16 +59,7 @@ public class CopyOnWriteRollbackActionExecutor executeRollback() {
+  protected List executeRollback(HoodieRollbackPlan hoodieRollbackPlan) {
     HoodieTimer rollbackTimer = new HoodieTimer();
     rollbackTimer.startTimer();
 
@@ -87,7 +79,7 @@ public class CopyOnWriteRollbackActionExecutor executeRollbackUsingFileListing(HoodieInstant instantToRollback) {
-    List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(
-        context, table.getMetaClient().getBasePath(), config);
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackRequests);
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index 849087222..b47136fa0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -19,21 +19,14 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,13 +36,15 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
 /**
  * Performs Rollback of Hoodie Tables.
  */
@@ -65,119 +60,85 @@ public class ListingBasedRollbackHelper implements Serializable {
   }
 
   /**
-   * Performs all rollback actions that we have collected in parallel.
+   * Collects info for Rollback plan.
    */
-  public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
-                                                  List rollbackRequests) {
-    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
-    return context.mapToPairAndReduceByKey(rollbackRequests,
-        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, true),
-        RollbackUtils::mergeRollbackStat,
-        parallelism);
-  }
-
-  /**
-   * Collect all file info that needs to be rollbacked.
-   */
-  public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
-                                                       List rollbackRequests) {
-    int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-    context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
-    return context.mapToPairAndReduceByKey(rollbackRequests,
-        rollbackRequest -> maybeDeleteAndCollectStats(rollbackRequest, instantToRollback, false),
-        RollbackUtils::mergeRollbackStat,
-        parallelism);
+  public List getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) {
+    int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+    context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback Plan");
+    return getListingBasedRollbackRequests(context, instantToRollback, rollbackRequests, sparkPartitions);
   }
 
   /**
    * May be delete interested files and collect stats or collect stats only.
    *
+   * @param context           instance of {@link HoodieEngineContext} to use.
    * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested.
-   * @param doDelete          {@code true} if deletion has to be done.
-   *                          {@code false} if only stats are to be collected w/o performing any deletes.
+   * @param rollbackRequests  List of {@link ListingBasedRollbackRequest} to be operated on.
+   * @param numPartitions     number of spark partitions to use for parallelism.
    * @return stats collected with or w/o actual deletions.
    */
-  private Pair maybeDeleteAndCollectStats(ListingBasedRollbackRequest rollbackRequest,
-                                                                      HoodieInstant instantToRollback,
-                                                                      boolean doDelete) throws IOException {
-    switch (rollbackRequest.getType()) {
-      case DELETE_DATA_FILES_ONLY: {
-        final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(),
-            rollbackRequest.getPartitionPath(), doDelete);
-        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                .withDeletedFileResults(filesToDeletedStatus).build());
-      }
-      case DELETE_DATA_AND_LOG_FILES: {
-        final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete);
-        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                .withDeletedFileResults(filesToDeletedStatus).build());
-      }
-      case APPEND_ROLLBACK_BLOCK: {
-        String fileId = rollbackRequest.getFileId().get();
-        String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
-
-        // collect all log files that is supposed to be deleted with this rollback
-        Map writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
-            FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
-            fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
-            .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
-
-        HoodieLogFormat.Writer writer = null;
-        try {
-          writer = HoodieLogFormat.newWriterBuilder()
-              .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
-              .withFileId(fileId)
-              .overBaseCommit(latestBaseInstant)
-              .withFs(metaClient.getFs())
-              .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-          // generate metadata
-          if (doDelete) {
-            Map header = generateHeader(instantToRollback.getTimestamp());
-            // if update belongs to an existing log file
-            writer.appendBlock(new HoodieCommandBlock(header));
-          }
-        } catch (IOException | InterruptedException io) {
-          throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
-        } finally {
-          try {
-            if (writer != null) {
-              writer.close();
-            }
-          } catch (IOException io) {
-            throw new HoodieIOException("Error appending rollback block..", io);
-          }
+  private List getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant instantToRollback,
+                                                                      List rollbackRequests, int numPartitions) {
+    return context.map(rollbackRequests, rollbackRequest -> {
+      switch (rollbackRequest.getType()) {
+        case DELETE_DATA_FILES_ONLY: {
+          final FileStatus[] filesToDeletedStatus = getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(),
+              rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(),
+              EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
         }
-
-        // This step is intentionally done after writer is closed. Guarantees that
-        // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
-        // cloud-storage : HUDI-168
-        Map filesToNumBlocksRollback = Collections.singletonMap(
-            metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-            1L
-        );
-
-        return new ImmutablePair<>(rollbackRequest.getPartitionPath(),
-            HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath())
-                .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-                .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build());
+        case DELETE_DATA_AND_LOG_FILES: {
+          final FileStatus[] filesToDeletedStatus = getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), metaClient.getFs());
+          List filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> {
+            String fileToBeDeleted = fileStatus.getPath().toString();
+            // strip scheme
+            return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1);
+          }).collect(Collectors.toList());
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP);
+        }
+        case APPEND_ROLLBACK_BLOCK: {
+          String fileId = rollbackRequest.getFileId().get();
+          String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get();
+          // collect all log files that is supposed to be deleted with this rollback
+          Map writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(),
+              FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()),
+              fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant)
+              .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
+          Map logFilesToBeDeleted = new HashMap<>();
+          for (Map.Entry fileToBeDeleted : writtenLogFileSizeMap.entrySet()) {
+            logFilesToBeDeleted.put(fileToBeDeleted.getKey().getPath().toString(), fileToBeDeleted.getValue());
+          }
+          return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant,
+              Collections.EMPTY_LIST, logFilesToBeDeleted);
+        }
+        default:
+          throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
       }
-      default:
-        throw new IllegalStateException("Unknown Rollback action " + rollbackRequest);
-    }
+    }, numPartitions).stream().collect(Collectors.toList());
   }
 
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                         String commit, String partitionPath, boolean doDelete) throws IOException {
-    LOG.info("Cleaning path " + partitionPath);
+  private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+                                               String commit, String partitionPath, FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
     String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    SerializablePathFilter filter = (path) -> {
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+  }
+
+  private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String partitionPath, FileSystem fs) throws IOException {
+    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    BaseRollbackHelper.SerializablePathFilter filter = (path) -> {
       if (path.toString().endsWith(basefileExtension)) {
         String fileCommitTime = FSUtils.getCommitTime(path.getName());
         return commit.equals(fileCommitTime);
@@ -188,62 +149,6 @@ public class ListingBasedRollbackHelper implements Serializable {
       }
       return false;
     };
-
-    final Map results = new HashMap<>();
-    FileSystem fs = metaClient.getFs();
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Common method used for cleaning out base files under a partition path during rollback of a set of commits.
-   */
-  private Map deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config,
-                                                   String commit, String partitionPath, boolean doDelete) throws IOException {
-    final Map results = new HashMap<>();
-    LOG.info("Cleaning path " + partitionPath);
-    FileSystem fs = metaClient.getFs();
-    String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(basefileExtension)) {
-        String fileCommitTime = FSUtils.getCommitTime(path.getName());
-        return commit.equals(fileCommitTime);
-      }
-      return false;
-    };
-    FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
-    for (FileStatus file : toBeDeleted) {
-      if (doDelete) {
-        boolean success = fs.delete(file.getPath(), false);
-        results.put(file, success);
-        LOG.info("Delete file " + file.getPath() + "\t" + success);
-      } else {
-        results.put(file, true);
-      }
-    }
-    return results;
-  }
-
-  private Map generateHeader(String commit) {
-    // generate metadata
-    Map header = new HashMap<>(3);
-    header.put(HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
-    header.put(HeaderMetadataType.TARGET_INSTANT_TIME, commit);
-    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
-    return header;
-  }
-
-  public interface SerializablePathFilter extends PathFilter, Serializable {
-
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
new file mode 100644
index 000000000..266fa39cb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Listing based rollback strategy to fetch list of {@link HoodieRollbackRequest}s.
+ */
+public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecutor.RollbackStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackStrategy.class);
+
+  protected final HoodieTable table;
+  protected final HoodieEngineContext context;
+  protected final HoodieWriteConfig config;
+  protected final String instantTime;
+
+  public ListingBasedRollbackStrategy(HoodieTable table,
+                                      HoodieEngineContext context,
+                                      HoodieWriteConfig config,
+                                      String instantTime) {
+    this.table = table;
+    this.context = context;
+    this.config = config;
+    this.instantTime = instantTime;
+  }
+
+  @Override
+  public List getRollbackRequests(HoodieInstant instantToRollback) {
+    try {
+      List rollbackRequests = null;
+      if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
+            table.getMetaClient().getBasePath(), config);
+      } else {
+        rollbackRequests = RollbackUtils
+            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
+      }
+      List listingBasedRollbackRequests = new ListingBasedRollbackHelper(table.getMetaClient(), config)
+          .getRollbackRequestsForRollbackPlan(context, instantToRollback, rollbackRequests);
+      return listingBasedRollbackRequests;
+    } catch (IOException e) {
+      LOG.error("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
+      throw new HoodieRollbackException("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 1bfd4b165..9d04e3036 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -18,41 +18,38 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
 import org.apache.hudi.table.marker.WriteMarkers;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
+
 /**
  * Performs rollback using marker files generated during the write..
  */
-public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.RollbackStrategy {
+public class MarkerBasedRollbackStrategy implements BaseRollbackPlanActionExecutor.RollbackStrategy {
 
   private static final Logger LOG = LogManager.getLogger(MarkerBasedRollbackStrategy.class);
 
@@ -74,72 +71,46 @@ public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.R
     this.instantTime = instantTime;
   }
 
-  protected HoodieRollbackStat undoMerge(String mergedBaseFilePath) throws IOException {
-    LOG.info("Rolling back by deleting the merged base file:" + mergedBaseFilePath);
-    return deleteBaseFile(mergedBaseFilePath);
+  @Override
+  public List getRollbackRequests(HoodieInstant instantToRollback) {
+    try {
+      List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
+          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
+      int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
+      return context.map(markerPaths, markerFilePath -> {
+        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
+        IOType type = IOType.valueOf(typeStr);
+        switch (type) {
+          case MERGE:
+          case CREATE:
+            String fileToDelete = WriteMarkers.stripMarkerSuffix(markerFilePath);
+            Path fullDeletePath = new Path(basePath, fileToDelete);
+            String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
+            return new HoodieRollbackRequest(partitionPath, EMPTY_STRING, EMPTY_STRING,
+                Collections.singletonList(fullDeletePath.toString()),
+                Collections.emptyMap());
+          case APPEND:
+            return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath));
+          default:
+            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
+        }
+      }, parallelism).stream().collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
+    }
   }
 
-  protected HoodieRollbackStat undoCreate(String createdBaseFilePath) throws IOException {
-    LOG.info("Rolling back by deleting the created base file:" + createdBaseFilePath);
-    return deleteBaseFile(createdBaseFilePath);
-  }
-
-  private HoodieRollbackStat deleteBaseFile(String baseFilePath) throws IOException {
-    Path fullDeletePath = new Path(basePath, baseFilePath);
-    String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
-    boolean isDeleted = table.getMetaClient().getFs().delete(fullDeletePath);
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withDeletedFileResult(baseFilePath, isDeleted)
-        .build();
-  }
-
-  protected HoodieRollbackStat undoAppend(String appendBaseFilePath, HoodieInstant instantToRollback) throws IOException, InterruptedException {
+  protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException {
     Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath);
     String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
     String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName());
     String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent());
-    final Map writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
-
-    HoodieLogFormat.Writer writer = null;
-    try {
-      Path partitionFullPath = FSUtils.getPartitionPath(basePath, partitionPath);
-
-      if (!table.getMetaClient().getFs().exists(partitionFullPath)) {
-        return HoodieRollbackStat.newBuilder()
-            .withPartitionPath(partitionPath)
-            .build();
-      }
-      writer = HoodieLogFormat.newWriterBuilder()
-          .onParentPath(partitionFullPath)
-          .withFileId(fileId)
-          .overBaseCommit(baseCommitTime)
-          .withFs(table.getMetaClient().getFs())
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
-      // generate metadata
-      Map header = RollbackUtils.generateHeader(instantToRollback.getTimestamp(), instantTime);
-      // if update belongs to an existing log file
-      writer.appendBlock(new HoodieCommandBlock(header));
-    } finally {
-      try {
-        if (writer != null) {
-          writer.close();
-        }
-      } catch (IOException io) {
-        throw new HoodieIOException("Error closing append of rollback block..", io);
-      }
+    Map writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId);
+    Map writtenLogFileStrSizeMap = new HashMap<>();
+    for (Map.Entry entry : writtenLogFileSizeMap.entrySet()) {
+      writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue());
     }
-
-    // the information of files appended to is required for metadata sync
-    Map filesToNumBlocksRollback = Collections.singletonMap(
-        table.getMetaClient().getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()),
-        1L);
-
-    return HoodieRollbackStat.newBuilder()
-        .withPartitionPath(partitionPath)
-        .withRollbackBlockAppendResults(filesToNumBlocksRollback)
-        .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build();
+    return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap);
   }
 
   /**
@@ -151,41 +122,10 @@ public class MarkerBasedRollbackStrategy implements BaseRollbackActionExecutor.R
    * @return Map
    * @throws IOException
    */
-  protected Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
+  private Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException {
     // collect all log files that is supposed to be deleted with this rollback
     return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
         FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime)
         .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen()));
   }
-
-  @Override
-  public List execute(HoodieInstant instantToRollback) {
-    try {
-      List markerPaths = MarkerBasedRollbackUtils.getAllMarkerPaths(
-          table, context, instantToRollback.getTimestamp(), config.getRollbackParallelism());
-      int parallelism = Math.max(Math.min(markerPaths.size(), config.getRollbackParallelism()), 1);
-      context.setJobStatus(this.getClass().getSimpleName(), "Rolling back using marker files");
-      return context.mapToPairAndReduceByKey(markerPaths, markerFilePath -> {
-        String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
-        IOType type = IOType.valueOf(typeStr);
-        HoodieRollbackStat rollbackStat;
-        switch (type) {
-          case MERGE:
-            rollbackStat = undoMerge(WriteMarkers.stripMarkerSuffix(markerFilePath));
-            break;
-          case APPEND:
-            rollbackStat = undoAppend(WriteMarkers.stripMarkerSuffix(markerFilePath), instantToRollback);
-            break;
-          case CREATE:
-            rollbackStat = undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
-            break;
-          default:
-            throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback);
-        }
-        return new ImmutablePair<>(rollbackStat.getPartitionPath(), rollbackStat);
-      }, RollbackUtils::mergeRollbackStat, parallelism);
-    } catch (Exception e) {
-      throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e);
-    }
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
index 87d26281d..23af44552 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java
@@ -19,19 +19,18 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -60,16 +59,7 @@ public class MergeOnReadRollbackActionExecutor executeRollback() {
+  protected List executeRollback(HoodieRollbackPlan hoodieRollbackPlan) {
     HoodieTimer rollbackTimer = new HoodieTimer();
     rollbackTimer.startTimer();
 
@@ -96,7 +86,7 @@ public class MergeOnReadRollbackActionExecutor executeRollbackUsingFileListing(HoodieInstant resolvedInstant) {
-    List rollbackRequests;
-    try {
-      rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(resolvedInstant, table, context);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error generating rollback requests by file listing.", e);
-    }
-    return new ListingBasedRollbackHelper(table.getMetaClient(), config).performRollback(context, resolvedInstant, rollbackRequests);
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index d213fb18f..6ad4e1c98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -22,17 +22,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -51,6 +54,20 @@ public class RollbackUtils {
 
   private static final Logger LOG = LogManager.getLogger(RollbackUtils.class);
 
+  /**
+   * Get Latest version of Rollback plan corresponding to a clean instant.
+   * @param metaClient  Hoodie Table Meta Client
+   * @param rollbackInstant Instant referring to rollback action
+   * @return Rollback plan corresponding to rollback instant
+   * @throws IOException
+   */
+  static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
+      throws IOException {
+    // TODO: add upgrade step if required.
+    return TimelineMetadataUtils.deserializeAvroMetadata(
+        metaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get(), HoodieRollbackPlan.class);
+  }
+
   static Map generateHeader(String instantToRollback, String rollbackInstantTime) {
     // generate metadata
     Map header = new HashMap<>(3);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializableHoodieRollbackRequest.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializableHoodieRollbackRequest.java
new file mode 100644
index 000000000..acd1c50ba
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializableHoodieRollbackRequest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieRollbackRequest in HoodieRollbackPlan (avro pojo) is not operable direclty within spark parallel engine.
+ * Hence converting the same to this {@link SerializableHoodieRollbackRequest} and then using it within spark.parallelize.
+ */
+public class SerializableHoodieRollbackRequest {
+
+  private final String partitionPath;
+  private final String fileId;
+  private final String latestBaseInstant;
+  private final List filesToBeDeleted = new ArrayList<>();
+  private final Map logBlocksToBeDeleted = new HashMap<>();
+
+  public SerializableHoodieRollbackRequest(HoodieRollbackRequest rollbackRequest) {
+    this.partitionPath = rollbackRequest.getPartitionPath();
+    this.fileId = rollbackRequest.getFileId();
+    this.latestBaseInstant = rollbackRequest.getLatestBaseInstant();
+    this.filesToBeDeleted.addAll(rollbackRequest.getFilesToBeDeleted());
+    this.logBlocksToBeDeleted.putAll(rollbackRequest.getLogBlocksToBeDeleted());
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public String getFileId() {
+    return fileId;
+  }
+
+  public String getLatestBaseInstant() {
+    return latestBaseInstant;
+  }
+
+  public List getFilesToBeDeleted() {
+    return filesToBeDeleted;
+  }
+
+  public Map getLogBlocksToBeDeleted() {
+    return logBlocksToBeDeleted;
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 66b7e78d4..174122c68 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -86,6 +86,16 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism) {
+    return data.stream().parallel()
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public  List flatMap(List data, SerializableFunction> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 27571bcdb..93785b919 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -53,6 +54,7 @@ import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecuto
 import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 
 import org.slf4j.Logger;
@@ -298,6 +300,12 @@ public class HoodieFlinkCopyOnWriteTable extends
     return new FlinkScheduleCleanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
+                                                     boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
     return new FlinkCleanActionExecutor(context, config, this, cleanInstantTime).execute();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index 461427098..f4a4b0eb4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -37,6 +38,7 @@ import org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExe
 import org.apache.hudi.table.action.commit.delta.FlinkUpsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.FlinkScheduleCompactionActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
 import java.util.List;
@@ -106,6 +108,12 @@ public class HoodieFlinkMergeOnReadTable
         + "should not invoke directly through HoodieFlinkMergeOnReadTable");
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
+                                                     boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
     return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index cb024c603..284d3bcdf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -27,8 +28,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
+import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
 import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
 
 import java.util.List;
 
@@ -45,7 +47,8 @@ public class ZeroToOneUpgradeHandler extends BaseZeroToOneUpgradeHandler {
   @Override
   List getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option commitInstantOpt,
                                                      List rollbackRequests) {
-    return new ListingBasedRollbackHelper(metaClient, config)
-        .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
+    List hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
+        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
+    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index f7a28e283..4cdbff264 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -69,6 +70,16 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism) {
+    return data.stream().parallel()
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public  List flatMap(List data, SerializableFunction> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 7715bf965..72d63d5a0 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
@@ -50,6 +51,7 @@ import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor
 import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
@@ -177,6 +179,12 @@ public class HoodieJavaCopyOnWriteTable extends H
     throw new HoodieNotSupportedException("RollbackBootstrap is not supported yet");
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
+                                                     boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public Option scheduleCleaning(HoodieEngineContext context, String instantTime, Option> extraMetadata) {
     return new JavaScheduleCleanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index ad1d7cd92..de06ea4b4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -82,6 +82,13 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
     }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
   }
 
+  @Override
+  public  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism) {
+    return javaSparkContext.parallelize(data, parallelism).mapToPair(pair -> new Tuple2(pair.getLeft(), pair.getRight()))
+        .reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
+  }
+
   @Override
   public  List flatMap(List data, SerializableFunction> func, int parallelism) {
     return javaSparkContext.parallelize(data, parallelism).flatMap(x -> func.apply(x).iterator()).collect();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 26d14cfa9..c2770a784 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -65,6 +66,7 @@ import org.apache.hudi.table.action.commit.SparkMergeHelper;
 import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 import org.apache.log4j.LogManager;
@@ -187,6 +189,13 @@ public class HoodieSparkCopyOnWriteTable extends
     return new SparkCleanPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute();
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context,
+                                                              String instantTime,
+                                                              HoodieInstant instantToRollback, boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   public Iterator> handleUpdate(String instantTime, String partitionPath, String fileId,
       Map> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
     // these are updates
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 2db4eeb70..ee66d7b0a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -49,6 +50,7 @@ import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExec
 import org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.compact.BaseScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.restore.SparkMergeOnReadRestoreActionExecutor;
+import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -142,6 +144,13 @@ public class HoodieSparkMergeOnReadTable extends
     new SparkMergeOnReadRestoreActionExecutor((HoodieSparkEngineContext) context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
   }
 
+  @Override
+  public Option scheduleRollback(HoodieEngineContext context,
+                                                     String instantTime,
+                                                     HoodieInstant instantToRollback, boolean skipTimelinePublish) {
+    return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute();
+  }
+
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context,
                                          String rollbackInstantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
index 9c6ec6e70..7d60b28e0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/restore/SparkCopyOnWriteRestoreActionExecutor.java
@@ -48,20 +48,23 @@ public class SparkCopyOnWriteRestoreActionExecutor getListBasedRollBackStats(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, Option commitInstantOpt,
                                                      List rollbackRequests) {
-    return new ListingBasedRollbackHelper(metaClient, config)
-        .collectRollbackStats(context, commitInstantOpt.get(), rollbackRequests);
+    List hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
+        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
+    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 69cc25feb..2eb2f380e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -1282,7 +1282,9 @@ public class TestCleaner extends HoodieClientTestBase {
     table.getActiveTimeline().transitionRequestedToInflight(
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
     metaClient.reloadActiveTimeline();
-    table.rollback(context, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
+    HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000");
+    table.scheduleRollback(context, "001", rollbackInstant, false);
+    table.rollback(context, "001", rollbackInstant, true);
     final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
     assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index 810733c64..2e93602c4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
@@ -79,13 +80,16 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
         .withBaseFilesInPartition(p1, "id21")
         .withBaseFilesInPartition(p2, "id22");
 
-    HoodieTable table = this.getHoodieTable(metaClient, getConfig());
+    HoodieWriteConfig writeConfig = getConfigBuilder().withRollbackUsingMarkers(false).build();
+    HoodieTable table = this.getHoodieTable(metaClient, writeConfig);
     HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
 
     // execute CopyOnWriteRollbackActionExecutor with filelisting mode
+    BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false);
+    HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
     CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
-    assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    List hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback();
+    List hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback(rollbackPlan);
 
     // assert hoodieRollbackStats
     assertEquals(hoodieRollbackStats.size(), 3);
@@ -96,14 +100,14 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
           assertEquals(0, stat.getFailedDeleteFiles().size());
           assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
           assertEquals(testTable.forCommit("002").getBaseFilePath(p1, "id21").toString(),
-              stat.getSuccessDeleteFiles().get(0));
+              this.fs.getScheme() + ":" + stat.getSuccessDeleteFiles().get(0));
           break;
         case p2:
           assertEquals(1, stat.getSuccessDeleteFiles().size());
           assertEquals(0, stat.getFailedDeleteFiles().size());
           assertEquals(Collections.EMPTY_MAP, stat.getCommandBlocksCount());
           assertEquals(testTable.forCommit("002").getBaseFilePath(p2, "id22").toString(),
-              stat.getSuccessDeleteFiles().get(0));
+              this.fs.getScheme() + ":" + stat.getSuccessDeleteFiles().get(0));
           break;
         case p3:
           assertEquals(0, stat.getSuccessDeleteFiles().size());
@@ -150,7 +154,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     HoodieTable table = this.getHoodieTable(metaClient, cfg);
     performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
   }
-  
+
   private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
                                           List firstPartitionCommit2FileSlices,
                                           List secondPartitionCommit2FileSlices) throws IOException {
@@ -162,12 +166,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
       commitInstant = table.getCompletedCommitTimeline().lastInstant().get();
     }
 
+    BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false);
+    HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
     CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
-    if (!isUsingMarkers) {
-      assertFalse(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    } else {
-      assertTrue(copyOnWriteRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    }
     Map rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
 
     //3. assert the rollback stat
@@ -175,9 +177,9 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     for (Map.Entry entry : rollbackMetadata.entrySet()) {
       HoodieRollbackPartitionMetadata meta = entry.getValue();
       assertTrue(meta.getFailedDeleteFiles() == null
-              || meta.getFailedDeleteFiles().size() == 0);
+          || meta.getFailedDeleteFiles().size() == 0);
       assertTrue(meta.getSuccessDeleteFiles() == null
-              || meta.getSuccessDeleteFiles().size() == 1);
+          || meta.getSuccessDeleteFiles().size() == 1);
     }
 
     //4. assert filegroup after rollback, and compare to the rollbackstat
@@ -187,15 +189,11 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     List firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
     assertEquals(1, firstPartitionRollBack1FileSlices.size());
 
-    if (!isUsingMarkers) {
-      firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
-      assertEquals(1, firstPartitionCommit2FileSlices.size());
-      assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
-    } else {
-      assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
-    }
+    firstPartitionCommit2FileSlices.removeAll(firstPartitionRollBack1FileSlices);
+    assertEquals(1, firstPartitionCommit2FileSlices.size());
+    assertEquals(firstPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+        this.fs.getScheme() + ":" + rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().get(0));
+
 
     // assert the second partition file group and file slice
     List secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
@@ -204,15 +202,10 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
     assertEquals(1, secondPartitionRollBack1FileSlices.size());
 
     // assert the second partition rollback file is equals rollBack1SecondPartitionStat
-    if (!isUsingMarkers) {
-      secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
-      assertEquals(1, secondPartitionCommit2FileSlices.size());
-      assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
-    } else {
-      assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
-          String.format("%s:%s/%s", this.fs.getScheme(), basePath, rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0)));
-    }
+    secondPartitionCommit2FileSlices.removeAll(secondPartitionRollBack1FileSlices);
+    assertEquals(1, secondPartitionCommit2FileSlices.size());
+    assertEquals(secondPartitionCommit2FileSlices.get(0).getBaseFile().get().getPath(),
+        this.fs.getScheme() + ":" + rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().get(0));
 
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, commitInstant.getTimestamp()).doesMarkerDirExist());
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 5d269cf6a..af77dc753 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -61,7 +61,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
     initPath();
     initSparkContexts();
     //just generate tow partitions
-    dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
+    dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
     initFileSystem();
     initMetaClient();
   }
@@ -89,6 +89,9 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
 
     //2. rollback
     HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
+    BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false);
+    mergeOnReadRollbackPlanActionExecutor.execute().get();
     MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
         context,
         cfg,
@@ -96,13 +99,6 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
         "003",
         rollBackInstant,
         true);
-    // assert is filelist mode
-    if (!isUsingMarkers) {
-      assertFalse(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    } else {
-      assertTrue(mergeOnReadRollbackActionExecutor.getRollbackStrategy() instanceof MarkerBasedRollbackStrategy);
-    }
-
     //3. assert the rollback stat
     Map rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
     assertEquals(2, rollbackMetadata.size());
@@ -145,15 +141,13 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
   public void testFailForCompletedInstants() {
     Assertions.assertThrows(IllegalArgumentException.class, () -> {
       HoodieInstant rollBackInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
-      new MergeOnReadRollbackActionExecutor(
-          context,
-          getConfigBuilder().build(),
+      new MergeOnReadRollbackActionExecutor(context, getConfigBuilder().build(),
           getHoodieTable(metaClient, getConfigBuilder().build()),
           "003",
           rollBackInstant,
           true,
           true,
-          true);
+          true).execute();
     });
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 94fa6974d..8b23cf257 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.functional;
 
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -32,6 +33,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
 import org.apache.hudi.table.action.rollback.MarkerBasedRollbackStrategy;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
@@ -93,8 +96,13 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
         .withMarkerFile("partA", f2, IOType.CREATE);
 
     // when
-    List stats = new MarkerBasedRollbackStrategy(HoodieSparkTable.create(getConfig(), context, metaClient), context, getConfig(), "002")
-        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
+    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
+    List rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+        "002").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"));
+
+    List stats = new BaseRollbackHelper(hoodieTable.getMetaClient(), getConfig()).performRollback(context,
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
+        rollbackRequests);
 
     // then: ensure files are deleted correctly, non-existent files reported as failed deletes
     assertEquals(2, stats.size());
@@ -175,9 +183,14 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     writeStatuses = writeClient.upsert(jsc.parallelize(records, 1), newCommitTime);
     writeStatuses.collect();
 
+    HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context, metaClient);
+    List rollbackRequests = new MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+        "003").getRollbackRequests(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
+
     // rollback 2nd commit and ensure stats reflect the info.
-    return new MarkerBasedRollbackStrategy(HoodieSparkTable.create(writeConfig, context, metaClient), context, writeConfig, "003")
-        .execute(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"));
+    return new BaseRollbackHelper(hoodieTable.getMetaClient(), getConfig()).performRollback(context,
+        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002"),
+        rollbackRequests);
   }
 
 }
diff --git a/hudi-common/src/main/avro/HoodieRollbackPlan.avsc b/hudi-common/src/main/avro/HoodieRollbackPlan.avsc
new file mode 100644
index 000000000..99e0755bd
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieRollbackPlan.avsc
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieRollbackPlan",
+  "fields": [
+     {
+           "name": "instantToRollback",
+           "doc": "Hoodie instant that needs to be rolled back",
+           "type": ["null", "HoodieInstantInfo"],
+           "default": null
+    },
+    {
+      "name": "RollbackRequests",
+      "type":["null", {
+                "type":"array",
+                "items":{
+                 "type": "record",
+                         "name": "HoodieRollbackRequest",
+                         "fields": [
+                            {"name": "partitionPath", "type": "string"},
+                            {"name": "fileId",
+                              "type":["null", "string"],
+                              "default": null
+                             },
+                            {"name": "latestBaseInstant",
+                              "type":["null", "string"],
+                              "default": null
+                            },
+                            {"name": "filesToBeDeleted",
+                             "default": [],
+                             "type": {
+                                       "type": "array",
+                                       "default": [],
+                                       "items": "string"
+                                    }
+                            },
+                            {"name": "logBlocksToBeDeleted",
+                             "type": ["null", {
+                               "type": "map",
+                               "doc": "Log blocks that need to be deleted as part of the rollback",
+                               "values": {
+                                   "type": "long",
+                                   "doc": "Size of this file/block in bytes"
+                               }
+                             }],
+                             "default":null
+                            }
+                         ]
+                }
+                }],
+       "default" : null
+    },
+    {
+       "name":"version",
+       "type":["int", "null"],
+       "default": 1
+    }
+  ]
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index 8ea6a43e0..10c7ced07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.function.SerializableConsumer;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
 import java.util.Map;
@@ -60,6 +61,9 @@ public abstract class HoodieEngineContext {
   public abstract  List mapToPairAndReduceByKey(
       List data, SerializablePairFunction mapToPairFunc, SerializableBiFunction reduceFunc, int parallelism);
 
+  public abstract  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism);
+
   public abstract  List flatMap(List data, SerializableFunction> func, int parallelism);
 
   public abstract  void foreach(List data, SerializableConsumer consumer, int parallelism);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 0aeb9d8c0..1c935ff06 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -67,6 +68,16 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
         .collect(Collectors.toList());
   }
 
+  @Override
+  public  List reduceByKey(
+      List> data, SerializableBiFunction reduceFunc, int parallelism) {
+    return data.stream().parallel()
+        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+        .map(list -> list.stream().map(e -> e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
   @Override
   public  List flatMap(List data, SerializableFunction> func, int parallelism) {
     return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index e6abed677..5b60b033c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -67,7 +67,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
       CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
       INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
       INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
-      ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
+      ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
       REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
   private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
@@ -229,6 +229,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
   }
 
+  public Option readRollbackInfoAsBytes(HoodieInstant instant) {
+    // Rollback metadata are always stored only in timeline .hoodie
+    return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
+  }
+
   //-----------------------------------------------------------------
   //      BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
   //-----------------------------------------------------------------
@@ -339,6 +344,37 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return inflight;
   }
 
+  /**
+   * Transition Rollback State from inflight to Committed.
+   *
+   * @param inflightInstant Inflight instant
+   * @param data Extra Metadata
+   * @return commit instant
+   */
+  public HoodieInstant transitionRollbackInflightToComplete(HoodieInstant inflightInstant, Option data) {
+    ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
+    ValidationUtils.checkArgument(inflightInstant.isInflight());
+    HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, inflightInstant.getTimestamp());
+    // Then write to timeline
+    transitionState(inflightInstant, commitInstant, data);
+    return commitInstant;
+  }
+
+  /**
+   * Transition Rollback State from requested to inflight.
+   *
+   * @param requestedInstant requested instant
+   * @param data Optional data to be stored
+   * @return commit instant
+   */
+  public HoodieInstant transitionRollbackRequestedToInflight(HoodieInstant requestedInstant, Option data) {
+    ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
+    ValidationUtils.checkArgument(requestedInstant.isRequested());
+    HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, ROLLBACK_ACTION, requestedInstant.getTimestamp());
+    transitionState(requestedInstant, inflight, data);
+    return inflight;
+  }
+
   /**
    * Transition replace requested file to replace inflight.
    *
@@ -497,6 +533,13 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     createFileInMetaPath(instant.getFileName(), content, false);
   }
 
+  public void saveToRollbackRequested(HoodieInstant instant, Option content) {
+    ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
+    ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
+    // Plan is stored in meta path
+    createFileInMetaPath(instant.getFileName(), content, false);
+  }
+
   private void createFileInMetaPath(String filename, Option content, boolean allowOverwrite) {
     Path fullPath = new Path(metaClient.getMetaPath(), filename);
     if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 65376b48e..a8df62c64 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -147,7 +147,8 @@ public class HoodieInstant implements Serializable, Comparable {
               : HoodieTimeline.makeCleanerFileName(timestamp);
     } else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) {
       return isInflight() ? HoodieTimeline.makeInflightRollbackFileName(timestamp)
-          : HoodieTimeline.makeRollbackFileName(timestamp);
+          : isRequested() ? HoodieTimeline.makeRequestedRollbackFileName(timestamp)
+              : HoodieTimeline.makeRollbackFileName(timestamp);
     } else if (HoodieTimeline.SAVEPOINT_ACTION.equals(action)) {
       return isInflight() ? HoodieTimeline.makeInflightSavePointFileName(timestamp)
           : HoodieTimeline.makeSavePointFileName(timestamp);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 1e366147a..b473c7b1f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -73,6 +73,7 @@ public interface HoodieTimeline extends Serializable {
   String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION;
   String REQUESTED_CLEAN_EXTENSION = "." + CLEAN_ACTION + REQUESTED_EXTENSION;
   String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION;
+  String REQUESTED_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + REQUESTED_EXTENSION;
   String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
   String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
   String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
@@ -363,6 +364,10 @@ public interface HoodieTimeline extends Serializable {
     return StringUtils.join(instant, HoodieTimeline.ROLLBACK_EXTENSION);
   }
 
+  static String makeRequestedRollbackFileName(String instant) {
+    return StringUtils.join(instant, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION);
+  }
+
   static String makeInflightRollbackFileName(String instant) {
     return StringUtils.join(instant, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index a50c2998a..32e42ee58 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
 import org.apache.hudi.common.HoodieRollbackStat;
@@ -109,6 +110,10 @@ public class TimelineMetadataUtils {
     return serializeAvroMetadata(cleanPlan, HoodieCleanerPlan.class);
   }
 
+  public static Option serializeRollbackPlan(HoodieRollbackPlan rollbackPlan) throws IOException {
+    return serializeAvroMetadata(rollbackPlan, HoodieRollbackPlan.class);
+  }
+
   public static Option serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException {
     return serializeAvroMetadata(metadata, HoodieCleanMetadata.class);
   }