[HUDI-3395] Allow pass rollbackUsingMarkers to Hudi CLI rollback command (#4557)
Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
@@ -232,7 +232,9 @@ public class CommitsCommand implements CommandMarker {
|
|||||||
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
|
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
|
||||||
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
|
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
|
||||||
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
|
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
|
||||||
help = "Spark executor memory") final String sparkMemory)
|
help = "Spark executor memory") final String sparkMemory,
|
||||||
|
@CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "true",
|
||||||
|
help = "Enabling marker based rollback") final String rollbackUsingMarkers)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
||||||
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
|
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
|
||||||
@@ -243,7 +245,7 @@ public class CommitsCommand implements CommandMarker {
|
|||||||
|
|
||||||
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
||||||
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime,
|
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime,
|
||||||
HoodieCLI.getTableMetaClient().getBasePath());
|
HoodieCLI.getTableMetaClient().getBasePath(), rollbackUsingMarkers);
|
||||||
Process process = sparkLauncher.launch();
|
Process process = sparkLauncher.launch();
|
||||||
InputStreamConsumer.captureOutput(process);
|
InputStreamConsumer.captureOutput(process);
|
||||||
int exitCode = process.waitFor();
|
int exitCode = process.waitFor();
|
||||||
|
|||||||
@@ -92,8 +92,8 @@ public class SparkMain {
|
|||||||
try {
|
try {
|
||||||
switch (cmd) {
|
switch (cmd) {
|
||||||
case ROLLBACK:
|
case ROLLBACK:
|
||||||
assert (args.length == 5);
|
assert (args.length == 6);
|
||||||
returnCode = rollback(jsc, args[3], args[4]);
|
returnCode = rollback(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
|
||||||
break;
|
break;
|
||||||
case DEDUPLICATE:
|
case DEDUPLICATE:
|
||||||
assert (args.length == 8);
|
assert (args.length == 8);
|
||||||
@@ -408,8 +408,8 @@ public class SparkMain {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int rollback(JavaSparkContext jsc, String instantTime, String basePath) throws Exception {
|
private static int rollback(JavaSparkContext jsc, String instantTime, String basePath, Boolean rollbackUsingMarkers) throws Exception {
|
||||||
SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
|
SparkRDDWriteClient client = createHoodieClient(jsc, basePath, rollbackUsingMarkers);
|
||||||
if (client.rollback(instantTime)) {
|
if (client.rollback(instantTime)) {
|
||||||
LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
|
LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
|
||||||
return 0;
|
return 0;
|
||||||
@@ -466,7 +466,7 @@ public class SparkMain {
|
|||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
|
protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
|
||||||
HoodieWriteConfig config = getWriteConfig(basePath);
|
HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()));
|
||||||
HoodieTableMetaClient metaClient =
|
HoodieTableMetaClient metaClient =
|
||||||
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
|
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
|
||||||
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||||
@@ -482,13 +482,18 @@ public class SparkMain {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
|
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers) throws Exception {
|
||||||
HoodieWriteConfig config = getWriteConfig(basePath);
|
HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers);
|
||||||
return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
|
return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HoodieWriteConfig getWriteConfig(String basePath) {
|
private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
|
||||||
|
return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers) {
|
||||||
return HoodieWriteConfig.newBuilder().withPath(basePath)
|
return HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||||
|
.withRollbackUsingMarkers(rollbackUsingMarkers)
|
||||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -100,5 +100,18 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest {
|
|||||||
|
|
||||||
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
|
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
|
||||||
assertEquals(2, timeline.getCommitsTimeline().countInstants(), "There should have 2 instants.");
|
assertEquals(2, timeline.getCommitsTimeline().countInstants(), "There should have 2 instants.");
|
||||||
|
|
||||||
|
// rollback complete commit
|
||||||
|
CommandResult cr2 = getShell().executeCommand(String.format("commit rollback --commit %s --sparkMaster %s --sparkMemory %s",
|
||||||
|
"101", "local", "4G"));
|
||||||
|
assertTrue(cr2.isSuccess());
|
||||||
|
|
||||||
|
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
|
||||||
|
|
||||||
|
HoodieActiveTimeline rollbackTimeline2 = new RollbacksCommand.RollbackTimeline(metaClient);
|
||||||
|
assertEquals(1, rollbackTimeline2.getRollbackTimeline().countInstants(), "There should have 2 rollback instant.");
|
||||||
|
|
||||||
|
HoodieActiveTimeline timeline2 = metaClient.reloadActiveTimeline();
|
||||||
|
assertEquals(2, timeline2.getCommitsTimeline().countInstants(), "There should have 1 instants.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user