diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 444eae62b..d98da346e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -599,7 +599,7 @@ public abstract class AbstractHoodieWriteClient rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, - commitInstantOpt.get(), false); + commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()); if (rollbackPlanOption.isPresent()) { // execute rollback HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, @@ -1024,7 +1024,7 @@ public abstract class AbstractHoodieWriteClient table) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); - table.scheduleRollback(context, commitTime, inflightInstant, false); + table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); table.rollback(context, commitTime, inflightInstant, false, false); table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 6046374ba..747470f1d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -442,12 +442,13 @@ public abstract class HoodieTable implem * @param context HoodieEngineContext * @param instantTime Instant Time for scheduling rollback * @param instantToRollback instant to be rolled back + * @param shouldRollbackUsingMarkers uses marker based rollback strategy when set to true. uses list based rollback when false. * @return HoodieRollbackPlan containing info on rollback. */ public abstract Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers); /** * Rollback the (inflight/committed) record changes with the given commit time. @@ -490,7 +491,7 @@ public abstract class HoodieTable implem */ public void rollbackInflightCompaction(HoodieInstant inflightInstant) { String commitTime = HoodieActiveTimeline.createNewInstantTime(); - scheduleRollback(context, commitTime, inflightInstant, false); + scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers()); rollback(context, commitTime, inflightInstant, false, false); getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java index 1116ef9a4..facab71c6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java @@ -48,7 +48,7 @@ public class CopyOnWriteRestoreActionExecutor table, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { + boolean skipTimelinePublish, + boolean shouldRollbackUsingMarkers) { super(context, config, table, instantTime); this.instantToRollback = instantToRollback; this.skipTimelinePublish = skipTimelinePublish; + this.shouldRollbackUsingMarkers = shouldRollbackUsingMarkers; } /** @@ -84,7 +87,7 @@ public class BaseRollbackPlanActionExecutor @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index 5ad87e083..a65e03da7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -120,8 +120,9 @@ public class HoodieFlinkMergeOnReadTable @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index a9e582110..4107adb8f 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -193,8 +193,9 @@ public class HoodieJavaCopyOnWriteTable extends H @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback, - boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 0971b87c4..74d4718a9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -230,8 +230,9 @@ public class HoodieSparkCopyOnWriteTable @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, - HoodieInstant instantToRollback, boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index 9e053aaa0..75af5d0f6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -156,8 +156,9 @@ public class HoodieSparkMergeOnReadTable extends @Override public Option scheduleRollback(HoodieEngineContext context, String instantTime, - HoodieInstant instantToRollback, boolean skipTimelinePublish) { - return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish).execute(); + HoodieInstant instantToRollback, boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) { + return new BaseRollbackPlanActionExecutor<>(context, config, this, instantTime, instantToRollback, skipTimelinePublish, + shouldRollbackUsingMarkers).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index caffb476b..86d18fe28 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -495,7 +495,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testUpserts(boolean populateMetaFields) throws Exception { - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsert, false); } @@ -506,7 +506,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testUpsertsPrepped(boolean populateMetaFields) throws Exception { - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(); + HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withRollbackUsingMarkers(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); testUpsertsInternal(cfgBuilder.build(), SparkRDDWriteClient::upsertPreppedRecords, true); } @@ -523,6 +523,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { throws Exception { // Force using older timeline layout HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) + .withRollbackUsingMarkers(true) .withProps(config.getProps()).withTimelineLayoutVersion( VERSION_0).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index cb468e903..2305d7bde 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1307,7 +1307,7 @@ public class TestCleaner extends HoodieClientTestBase { new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); metaClient.reloadActiveTimeline(); HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"); - table.scheduleRollback(context, "001", rollbackInstant, false); + table.scheduleRollback(context, "001", rollbackInstant, false, config.shouldRollbackUsingMarkers()); table.rollback(context, "001", rollbackInstant, true, false); final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length; assertEquals(0, numTempFilesAfter, "All temp files are deleted."); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 3225dcd04..1bee6ac0a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -86,7 +86,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT // execute CopyOnWriteRollbackActionExecutor with filelisting mode BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = - new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false); + new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false, + table.getConfig().shouldRollbackUsingMarkers()); HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true, false); @@ -168,7 +169,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT } BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = - new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false); + new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false, + table.getConfig().shouldRollbackUsingMarkers()); HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get(); CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false, false); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java index 5a829e294..1c6015f70 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java @@ -91,7 +91,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT //2. rollback HoodieInstant rollBackInstant = new HoodieInstant(isUsingMarkers, HoodieTimeline.DELTA_COMMIT_ACTION, "002"); BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor = - new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false); + new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false, + cfg.shouldRollbackUsingMarkers()); mergeOnReadRollbackPlanActionExecutor.execute().get(); MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor( context,