diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index 9517234a0..db1cd207d 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -232,7 +232,9 @@ public class CommitsCommand implements CommandMarker { @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", - help = "Spark executor memory") final String sparkMemory) + help = "Spark executor memory") final String sparkMemory, + @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "true", + help = "Enabling marker based rollback") final String rollbackUsingMarkers) throws Exception { HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); @@ -243,7 +245,7 @@ public class CommitsCommand implements CommandMarker { SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime, - HoodieCLI.getTableMetaClient().getBasePath()); + HoodieCLI.getTableMetaClient().getBasePath(), rollbackUsingMarkers); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index f2fb9511a..e7866b94a 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -92,8 +92,8 @@ public class SparkMain { try { switch (cmd) { case ROLLBACK: - assert (args.length == 5); - returnCode = rollback(jsc, args[3], args[4]); + assert (args.length == 6); + returnCode = rollback(jsc, args[3], args[4], Boolean.parseBoolean(args[5])); break; case DEDUPLICATE: assert (args.length == 8); @@ -408,8 +408,8 @@ public class SparkMain { return 0; } - private static int rollback(JavaSparkContext jsc, String instantTime, String basePath) throws Exception { - SparkRDDWriteClient client = createHoodieClient(jsc, basePath); + private static int rollback(JavaSparkContext jsc, String instantTime, String basePath, Boolean rollbackUsingMarkers) throws Exception { + SparkRDDWriteClient client = createHoodieClient(jsc, basePath, rollbackUsingMarkers); if (client.rollback(instantTime)) { LOG.info(String.format("The commit \"%s\" rolled back.", instantTime)); return 0; @@ -466,7 +466,7 @@ public class SparkMain { * @throws Exception */ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) { - HoodieWriteConfig config = getWriteConfig(basePath); + HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue())); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) @@ -482,13 +482,18 @@ public class SparkMain { } } - private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { - HoodieWriteConfig config = getWriteConfig(basePath); + private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers) throws Exception { + HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers); return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config); } - private static HoodieWriteConfig getWriteConfig(String basePath) { + private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { + return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue())); + } + + private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers) { return HoodieWriteConfig.newBuilder().withPath(basePath) + .withRollbackUsingMarkers(rollbackUsingMarkers) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); } } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java index b3c5c06be..18f4a387d 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java @@ -100,5 +100,18 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest { HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline(); assertEquals(2, timeline.getCommitsTimeline().countInstants(), "There should have 2 instants."); + + // rollback complete commit + CommandResult cr2 = getShell().executeCommand(String.format("commit rollback --commit %s --sparkMaster %s --sparkMemory %s", + "101", "local", "4G")); + assertTrue(cr2.isSuccess()); + + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + + HoodieActiveTimeline rollbackTimeline2 = new RollbacksCommand.RollbackTimeline(metaClient); + assertEquals(1, rollbackTimeline2.getRollbackTimeline().countInstants(), "There should have 2 rollback instant."); + + HoodieActiveTimeline timeline2 = metaClient.reloadActiveTimeline(); + assertEquals(2, timeline2.getCommitsTimeline().countInstants(), "There should have 1 instants."); } }