[HUDI-387] Fix NPE when create savepoint via hudi-cli (#1085)
This commit is contained in:
committed by
vinoth chandar
parent
8df4b83017
commit
24a09c775f
@@ -84,8 +84,8 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
|
|
||||||
@CliCommand(value = "savepoint create", help = "Savepoint a commit")
|
@CliCommand(value = "savepoint create", help = "Savepoint a commit")
|
||||||
public String savepoint(@CliOption(key = {"commit"}, help = "Commit to savepoint") final String commitTime,
|
public String savepoint(@CliOption(key = {"commit"}, help = "Commit to savepoint") final String commitTime,
|
||||||
@CliOption(key = {"user"}, help = "User who is creating the savepoint") final String user,
|
@CliOption(key = {"user"}, unspecifiedDefaultValue = "default", help = "User who is creating the savepoint") final String user,
|
||||||
@CliOption(key = {"comments"}, help = "Comments for creating the savepoint") final String comments)
|
@CliOption(key = {"comments"}, unspecifiedDefaultValue = "default", help = "Comments for creating the savepoint") final String comments)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
|
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
|
||||||
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
|
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
|
||||||
@@ -95,13 +95,18 @@ public class SavepointsCommand implements CommandMarker {
|
|||||||
return "Commit " + commitTime + " not found in Commits " + timeline;
|
return "Commit " + commitTime + " not found in Commits " + timeline;
|
||||||
}
|
}
|
||||||
|
|
||||||
HoodieWriteClient client = createHoodieClient(null, HoodieCLI.tableMetadata.getBasePath());
|
JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create Savepoint");
|
||||||
|
HoodieWriteClient client = createHoodieClient(jsc, HoodieCLI.tableMetadata.getBasePath());
|
||||||
|
String result;
|
||||||
if (client.savepoint(commitTime, user, comments)) {
|
if (client.savepoint(commitTime, user, comments)) {
|
||||||
// Refresh the current
|
// Refresh the current
|
||||||
refreshMetaClient();
|
refreshMetaClient();
|
||||||
return String.format("The commit \"%s\" has been savepointed.", commitTime);
|
result = String.format("The commit \"%s\" has been savepointed.", commitTime);
|
||||||
|
} else {
|
||||||
|
result = String.format("Failed: Could not savepoint commit \"%s\".", commitTime);
|
||||||
}
|
}
|
||||||
return String.format("Failed: Could not savepoint commit \"%s\".", commitTime);
|
jsc.close();
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
|
@CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
|
||||||
|
|||||||
@@ -61,10 +61,14 @@ public class SparkUtil {
|
|||||||
|
|
||||||
public static JavaSparkContext initJavaSparkConf(String name) {
|
public static JavaSparkContext initJavaSparkConf(String name) {
|
||||||
SparkConf sparkConf = new SparkConf().setAppName(name);
|
SparkConf sparkConf = new SparkConf().setAppName(name);
|
||||||
String defMasterFromEnv = sparkConf.get("spark.master");
|
|
||||||
|
String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
|
||||||
if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
|
if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
|
||||||
sparkConf.setMaster(DEFUALT_SPARK_MASTER);
|
sparkConf.setMaster(DEFUALT_SPARK_MASTER);
|
||||||
|
} else {
|
||||||
|
sparkConf.setMaster(defMasterFromEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
sparkConf.set("spark.driver.maxResultSize", "2g");
|
sparkConf.set("spark.driver.maxResultSize", "2g");
|
||||||
sparkConf.set("spark.eventLog.overwrite", "true");
|
sparkConf.set("spark.eventLog.overwrite", "true");
|
||||||
|
|||||||
Reference in New Issue
Block a user