[HUDI-706] Add unit test for SavepointsCommand (#1624)
This commit is contained in:
@@ -83,6 +83,11 @@ public class HoodieTableHeaderFields {
|
|||||||
public static final String HEADER_OLD_VALUE = "Old Value";
|
public static final String HEADER_OLD_VALUE = "Old Value";
|
||||||
public static final String HEADER_NEW_VALUE = "New Value";
|
public static final String HEADER_NEW_VALUE = "New Value";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fields of Savepoints.
|
||||||
|
*/
|
||||||
|
public static final String HEADER_SAVEPOINT_TIME = "SavepointTime";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fields of Rollback.
|
* Fields of Rollback.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
|
|||||||
|
|
||||||
import org.apache.hudi.cli.HoodieCLI;
|
import org.apache.hudi.cli.HoodieCLI;
|
||||||
import org.apache.hudi.cli.HoodiePrintHelper;
|
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||||
|
import org.apache.hudi.cli.HoodieTableHeaderFields;
|
||||||
import org.apache.hudi.cli.utils.InputStreamConsumer;
|
import org.apache.hudi.cli.utils.InputStreamConsumer;
|
||||||
import org.apache.hudi.cli.utils.SparkUtil;
|
import org.apache.hudi.cli.utils.SparkUtil;
|
||||||
import org.apache.hudi.client.HoodieWriteClient;
|
import org.apache.hudi.client.HoodieWriteClient;
|
||||||
@@ -30,7 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
|||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieSavepointException;
|
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
@@ -40,7 +40,6 @@ import org.springframework.shell.core.annotation.CliCommand;
|
|||||||
import org.springframework.shell.core.annotation.CliOption;
|
import org.springframework.shell.core.annotation.CliOption;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -51,7 +50,7 @@ import java.util.stream.Collectors;
|
|||||||
public class SavepointsCommand implements CommandMarker {
|
public class SavepointsCommand implements CommandMarker {
|
||||||
|
|
||||||
@CliCommand(value = "savepoints show", help = "Show the savepoints")
|
@CliCommand(value = "savepoints show", help = "Show the savepoints")
|
||||||
public String showSavepoints() throws IOException {
|
public String showSavepoints() {
|
||||||
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
||||||
HoodieTimeline timeline = activeTimeline.getSavePointTimeline().filterCompletedInstants();
|
HoodieTimeline timeline = activeTimeline.getSavePointTimeline().filterCompletedInstants();
|
||||||
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
|
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
|
||||||
@@ -60,13 +59,19 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
HoodieInstant commit = commits.get(i);
|
HoodieInstant commit = commits.get(i);
|
||||||
rows[i] = new String[] {commit.getTimestamp()};
|
rows[i] = new String[] {commit.getTimestamp()};
|
||||||
}
|
}
|
||||||
return HoodiePrintHelper.print(new String[] {"SavepointTime"}, rows);
|
return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_SAVEPOINT_TIME}, rows);
|
||||||
}
|
}
|
||||||
|
|
||||||
@CliCommand(value = "savepoint create", help = "Savepoint a commit")
|
@CliCommand(value = "savepoint create", help = "Savepoint a commit")
|
||||||
public String savepoint(@CliOption(key = {"commit"}, help = "Commit to savepoint") final String commitTime,
|
public String savepoint(@CliOption(key = {"commit"}, help = "Commit to savepoint") final String commitTime,
|
||||||
@CliOption(key = {"user"}, unspecifiedDefaultValue = "default", help = "User who is creating the savepoint") final String user,
|
@CliOption(key = {"user"}, unspecifiedDefaultValue = "default",
|
||||||
@CliOption(key = {"comments"}, unspecifiedDefaultValue = "default", help = "Comments for creating the savepoint") final String comments)
|
help = "User who is creating the savepoint") final String user,
|
||||||
|
@CliOption(key = {"comments"}, unspecifiedDefaultValue = "default",
|
||||||
|
help = "Comments for creating the savepoint") final String comments,
|
||||||
|
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
|
||||||
|
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
|
||||||
|
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
|
||||||
|
help = "Spark executor memory") final String sparkMemory)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
|
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
|
||||||
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
|
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
|
||||||
@@ -77,25 +82,27 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
return "Commit " + commitTime + " not found in Commits " + timeline;
|
return "Commit " + commitTime + " not found in Commits " + timeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
String result;
|
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
||||||
try (JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create Savepoint")) {
|
sparkLauncher.addAppArgs(SparkMain.SparkCommand.SAVEPOINT.toString(), master, sparkMemory, commitTime,
|
||||||
HoodieWriteClient client = createHoodieClient(jsc, metaClient.getBasePath());
|
user, comments, metaClient.getBasePath());
|
||||||
try {
|
Process process = sparkLauncher.launch();
|
||||||
client.savepoint(commitTime, user, comments);
|
InputStreamConsumer.captureOutput(process);
|
||||||
// Refresh the current
|
int exitCode = process.waitFor();
|
||||||
refreshMetaClient();
|
// Refresh the current
|
||||||
result = String.format("The commit \"%s\" has been savepointed.", commitTime);
|
refreshMetaClient();
|
||||||
} catch (HoodieSavepointException se) {
|
if (exitCode != 0) {
|
||||||
result = String.format("Failed: Could not create savepoint \"%s\".", commitTime);
|
return String.format("Failed: Could not create savepoint \"%s\".", commitTime);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return result;
|
return String.format("The commit \"%s\" has been savepointed.", commitTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
|
@CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
|
||||||
public String rollbackToSavepoint(
|
public String rollbackToSavepoint(
|
||||||
@CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String instantTime,
|
@CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String instantTime,
|
||||||
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
|
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
|
||||||
|
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
|
||||||
|
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
|
||||||
|
help = "Spark executor memory") final String sparkMemory)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
|
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
|
||||||
if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) {
|
if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) {
|
||||||
@@ -110,17 +117,17 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
||||||
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), instantTime,
|
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), master, sparkMemory,
|
||||||
metaClient.getBasePath());
|
instantTime, metaClient.getBasePath());
|
||||||
Process process = sparkLauncher.launch();
|
Process process = sparkLauncher.launch();
|
||||||
InputStreamConsumer.captureOutput(process);
|
InputStreamConsumer.captureOutput(process);
|
||||||
int exitCode = process.waitFor();
|
int exitCode = process.waitFor();
|
||||||
// Refresh the current
|
// Refresh the current
|
||||||
refreshMetaClient();
|
refreshMetaClient();
|
||||||
if (exitCode != 0) {
|
if (exitCode != 0) {
|
||||||
return "Savepoint " + instantTime + " failed to roll back";
|
return String.format("Savepoint \"%s\" failed to roll back", instantTime);
|
||||||
}
|
}
|
||||||
return "Savepoint " + instantTime + " rolled back";
|
return String.format("Savepoint \"%s\" rolled back", instantTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
|
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
|
||||||
@@ -130,7 +137,12 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@CliCommand(value = "savepoint delete", help = "Delete the savepoint")
|
@CliCommand(value = "savepoint delete", help = "Delete the savepoint")
|
||||||
public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a savepoint") final String instantTime) throws Exception {
|
public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a savepoint") final String instantTime,
|
||||||
|
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
|
||||||
|
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
|
||||||
|
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
|
||||||
|
help = "Spark executor memory") final String sparkMemory)
|
||||||
|
throws Exception {
|
||||||
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
|
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
|
||||||
HoodieTimeline completedInstants = metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
|
HoodieTimeline completedInstants = metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
|
||||||
if (completedInstants.empty()) {
|
if (completedInstants.empty()) {
|
||||||
@@ -142,12 +154,18 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
return "Commit " + instantTime + " not found in Commits " + completedInstants;
|
return "Commit " + instantTime + " not found in Commits " + completedInstants;
|
||||||
}
|
}
|
||||||
|
|
||||||
try (JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Delete Savepoint")) {
|
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
|
||||||
HoodieWriteClient client = createHoodieClient(jsc, metaClient.getBasePath());
|
sparkLauncher.addAppArgs(SparkMain.SparkCommand.DELETE_SAVEPOINT.toString(), master, sparkMemory, instantTime,
|
||||||
client.deleteSavepoint(instantTime);
|
metaClient.getBasePath());
|
||||||
refreshMetaClient();
|
Process process = sparkLauncher.launch();
|
||||||
|
InputStreamConsumer.captureOutput(process);
|
||||||
|
int exitCode = process.waitFor();
|
||||||
|
// Refresh the current
|
||||||
|
refreshMetaClient();
|
||||||
|
if (exitCode != 0) {
|
||||||
|
return String.format("Failed: Could not delete savepoint \"%s\".", instantTime);
|
||||||
}
|
}
|
||||||
return "Savepoint " + instantTime + " deleted";
|
return String.format("Savepoint \"%s\" deleted.", instantTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
|
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.config.HoodieIndexConfig;
|
import org.apache.hudi.config.HoodieIndexConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
|
import org.apache.hudi.exception.HoodieSavepointException;
|
||||||
import org.apache.hudi.index.HoodieIndex;
|
import org.apache.hudi.index.HoodieIndex;
|
||||||
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
||||||
import org.apache.hudi.utilities.HDFSParquetImporter;
|
import org.apache.hudi.utilities.HDFSParquetImporter;
|
||||||
@@ -54,7 +55,8 @@ public class SparkMain {
|
|||||||
* Commands.
|
* Commands.
|
||||||
*/
|
*/
|
||||||
enum SparkCommand {
|
enum SparkCommand {
|
||||||
ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN
|
ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
|
||||||
|
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
@@ -77,8 +79,8 @@ public class SparkMain {
|
|||||||
returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]);
|
returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5], args[6]);
|
||||||
break;
|
break;
|
||||||
case ROLLBACK_TO_SAVEPOINT:
|
case ROLLBACK_TO_SAVEPOINT:
|
||||||
assert (args.length == 3);
|
assert (args.length == 5);
|
||||||
returnCode = rollbackToSavepoint(jsc, args[1], args[2]);
|
returnCode = rollbackToSavepoint(jsc, args[3], args[4]);
|
||||||
break;
|
break;
|
||||||
case IMPORT:
|
case IMPORT:
|
||||||
case UPSERT:
|
case UPSERT:
|
||||||
@@ -154,6 +156,14 @@ public class SparkMain {
|
|||||||
}
|
}
|
||||||
clean(jsc, args[3], propsFilePath, configs);
|
clean(jsc, args[3], propsFilePath, configs);
|
||||||
break;
|
break;
|
||||||
|
case SAVEPOINT:
|
||||||
|
assert (args.length == 7);
|
||||||
|
returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
|
||||||
|
break;
|
||||||
|
case DELETE_SAVEPOINT:
|
||||||
|
assert (args.length == 5);
|
||||||
|
returnCode = deleteSavepoint(jsc, args[3], args[4]);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -163,7 +173,8 @@ public class SparkMain {
|
|||||||
private static boolean sparkMasterContained(SparkCommand command) {
|
private static boolean sparkMasterContained(SparkCommand command) {
|
||||||
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
|
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
|
||||||
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
|
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
|
||||||
SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE);
|
SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT,
|
||||||
|
SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT);
|
||||||
return masterContained.contains(command);
|
return masterContained.contains(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -276,7 +287,20 @@ public class SparkMain {
|
|||||||
LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
|
LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
LOG.info(String.format("The commit \"%s\" failed to roll back.", instantTime));
|
LOG.warn(String.format("The commit \"%s\" failed to roll back.", instantTime));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int createSavepoint(JavaSparkContext jsc, String commitTime, String user,
|
||||||
|
String comments, String basePath) throws Exception {
|
||||||
|
HoodieWriteClient client = createHoodieClient(jsc, basePath);
|
||||||
|
try {
|
||||||
|
client.savepoint(commitTime, user, comments);
|
||||||
|
LOG.info(String.format("The commit \"%s\" has been savepointed.", commitTime));
|
||||||
|
return 0;
|
||||||
|
} catch (HoodieSavepointException se) {
|
||||||
|
LOG.warn(String.format("Failed: Could not create savepoint \"%s\".", commitTime));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -288,7 +312,19 @@ public class SparkMain {
|
|||||||
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
|
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
|
||||||
return 0;
|
return 0;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.info(String.format("The commit \"%s\" failed to roll back.", savepointTime));
|
LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
|
||||||
|
HoodieWriteClient client = createHoodieClient(jsc, basePath);
|
||||||
|
try {
|
||||||
|
client.deleteSavepoint(savepointTime);
|
||||||
|
LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
|
||||||
|
return 0;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,110 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.cli.commands;
|
||||||
|
|
||||||
|
import org.apache.hudi.cli.AbstractShellIntegrationTest;
|
||||||
|
import org.apache.hudi.cli.HoodieCLI;
|
||||||
|
import org.apache.hudi.cli.HoodiePrintHelper;
|
||||||
|
import org.apache.hudi.cli.HoodieTableHeaderFields;
|
||||||
|
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.springframework.shell.core.CommandResult;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Comparator;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for {@link org.apache.hudi.cli.commands.SavepointsCommand}.
|
||||||
|
*/
|
||||||
|
public class TestSavepointsCommand extends AbstractShellIntegrationTest {
|
||||||
|
|
||||||
|
private String tablePath;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void init() throws IOException {
|
||||||
|
String tableName = "test_table";
|
||||||
|
tablePath = basePath + File.separator + tableName;
|
||||||
|
|
||||||
|
// Create table and connect
|
||||||
|
new TableCommand().createTable(
|
||||||
|
tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
|
||||||
|
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case of command 'savepoints show'.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testShowSavepoints() throws IOException {
|
||||||
|
// generate four savepoints
|
||||||
|
for (int i = 100; i < 104; i++) {
|
||||||
|
String instantTime = String.valueOf(i);
|
||||||
|
HoodieTestDataGenerator.createSavepointFile(tablePath, instantTime, jsc.hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
CommandResult cr = getShell().executeCommand("savepoints show");
|
||||||
|
assertTrue(cr.isSuccess());
|
||||||
|
|
||||||
|
// generate expect result
|
||||||
|
String[][] rows = Arrays.asList("100", "101", "102", "103").stream().sorted(Comparator.reverseOrder())
|
||||||
|
.map(instant -> new String[]{instant}).toArray(String[][]::new);
|
||||||
|
String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_SAVEPOINT_TIME}, rows);
|
||||||
|
|
||||||
|
assertEquals(expected, cr.getResult().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case of command 'savepoints refresh'.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRefreshMetaClient() throws IOException {
|
||||||
|
HoodieTimeline timeline =
|
||||||
|
HoodieCLI.getTableMetaClient().getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
|
||||||
|
assertEquals(0, timeline.countInstants(), "There should have no instant at first");
|
||||||
|
|
||||||
|
// generate four savepoints
|
||||||
|
for (int i = 100; i < 104; i++) {
|
||||||
|
String instantTime = String.valueOf(i);
|
||||||
|
HoodieTestDataGenerator.createSavepointFile(tablePath, instantTime, jsc.hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Before refresh, no instant
|
||||||
|
timeline =
|
||||||
|
HoodieCLI.getTableMetaClient().getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
|
||||||
|
assertEquals(0, timeline.countInstants(), "there should have no instant");
|
||||||
|
|
||||||
|
CommandResult cr = getShell().executeCommand("savepoints refresh");
|
||||||
|
assertTrue(cr.isSuccess());
|
||||||
|
|
||||||
|
timeline =
|
||||||
|
HoodieCLI.getTableMetaClient().getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
|
||||||
|
|
||||||
|
// After refresh, there are 4 instants
|
||||||
|
assertEquals(4, timeline.countInstants(), "there should have 4 instants");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,157 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.cli.integ;
|
||||||
|
|
||||||
|
import org.apache.hudi.cli.AbstractShellIntegrationTest;
|
||||||
|
import org.apache.hudi.cli.HoodieCLI;
|
||||||
|
import org.apache.hudi.cli.commands.TableCommand;
|
||||||
|
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import org.springframework.shell.core.CommandResult;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertAll;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration test class for {@link org.apache.hudi.cli.commands.SavepointsCommand}.
|
||||||
|
* <p/>
|
||||||
|
* A command use SparkLauncher need load jars under lib which generate during mvn package.
|
||||||
|
* Use integration test instead of unit test.
|
||||||
|
*/
|
||||||
|
public class ITTestSavepointsCommand extends AbstractShellIntegrationTest {
|
||||||
|
|
||||||
|
private String tablePath;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void init() throws IOException {
|
||||||
|
String tableName = "test_table";
|
||||||
|
tablePath = basePath + File.separator + tableName;
|
||||||
|
|
||||||
|
// Create table and connect
|
||||||
|
new TableCommand().createTable(
|
||||||
|
tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
|
||||||
|
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case of command 'savepoint create'.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSavepoint() {
|
||||||
|
// generate four savepoints
|
||||||
|
for (int i = 100; i < 104; i++) {
|
||||||
|
String instantTime = String.valueOf(i);
|
||||||
|
HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, jsc.hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
String savepoint = "102";
|
||||||
|
CommandResult cr = getShell().executeCommand(
|
||||||
|
String.format("savepoint create --commit %s --sparkMaster %s", savepoint, "local"));
|
||||||
|
|
||||||
|
assertAll("Command run failed",
|
||||||
|
() -> assertTrue(cr.isSuccess()),
|
||||||
|
() -> assertEquals(
|
||||||
|
String.format("The commit \"%s\" has been savepointed.", savepoint), cr.getResult().toString()));
|
||||||
|
|
||||||
|
// there is 1 savepoint instant
|
||||||
|
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
||||||
|
assertEquals(1, timeline.getSavePointTimeline().countInstants());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case of command 'savepoint rollback'.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRollbackToSavepoint() throws IOException {
|
||||||
|
// generate four savepoints
|
||||||
|
for (int i = 100; i < 104; i++) {
|
||||||
|
String instantTime = String.valueOf(i);
|
||||||
|
HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, jsc.hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
// generate one savepoint
|
||||||
|
String savepoint = "102";
|
||||||
|
HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
CommandResult cr = getShell().executeCommand(
|
||||||
|
String.format("savepoint rollback --savepoint %s --sparkMaster %s", savepoint, "local"));
|
||||||
|
|
||||||
|
assertAll("Command run failed",
|
||||||
|
() -> assertTrue(cr.isSuccess()),
|
||||||
|
() -> assertEquals(
|
||||||
|
String.format("Savepoint \"%s\" rolled back", savepoint), cr.getResult().toString()));
|
||||||
|
|
||||||
|
// there is 1 restore instant
|
||||||
|
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
||||||
|
assertEquals(1, timeline.getRestoreTimeline().countInstants());
|
||||||
|
|
||||||
|
// 103 instant had rollback
|
||||||
|
assertFalse(timeline.getCommitTimeline().containsInstant(
|
||||||
|
new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test case of command 'savepoint delete'.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeleteSavepoint() throws IOException {
|
||||||
|
// generate four savepoints
|
||||||
|
for (int i = 100; i < 104; i++) {
|
||||||
|
String instantTime = String.valueOf(i);
|
||||||
|
HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, jsc.hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
// generate two savepoint
|
||||||
|
String savepoint1 = "100";
|
||||||
|
String savepoint2 = "102";
|
||||||
|
HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint1, jsc.hadoopConfiguration());
|
||||||
|
HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint2, jsc.hadoopConfiguration());
|
||||||
|
|
||||||
|
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
|
||||||
|
assertEquals(2, timeline.getSavePointTimeline().countInstants(), "There should 2 instants.");
|
||||||
|
|
||||||
|
CommandResult cr = getShell().executeCommand(
|
||||||
|
String.format("savepoint delete --commit %s --sparkMaster %s", savepoint1, "local"));
|
||||||
|
|
||||||
|
assertAll("Command run failed",
|
||||||
|
() -> assertTrue(cr.isSuccess()),
|
||||||
|
() -> assertEquals(
|
||||||
|
String.format("Savepoint \"%s\" deleted.", savepoint1), cr.getResult().toString()));
|
||||||
|
|
||||||
|
// reload timeline
|
||||||
|
timeline = timeline.reload();
|
||||||
|
assertEquals(1, timeline.getSavePointTimeline().countInstants(), "There should 1 instants.");
|
||||||
|
|
||||||
|
// after delete, 100 instant should not exist.
|
||||||
|
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepoint1)));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user