1
0

[HUDI-2850] Fixing Clustering CLI - schedule and run command fixes to avoid NumberFormatException (#4101)

This commit is contained in:
Manoj Govindassamy
2021-11-26 04:17:23 -08:00
committed by GitHub
parent e9efbdb63c
commit 3d75aca40d
4 changed files with 62 additions and 46 deletions

View File

@@ -40,14 +40,21 @@ public class ClusteringCommand implements CommandMarker {
private static final Logger LOG = LogManager.getLogger(ClusteringCommand.class);
/**
* Schedule clustering table service.
* <p>
* Example:
* > connect --path {path to hudi table}
* > clustering schedule --sparkMaster local --sparkMemory 2g
*/
@CliCommand(value = "clustering schedule", help = "Schedule Clustering")
public String scheduleClustering(
@CliOption(key = "sparkMemory", help = "Spark executor memory",
unspecifiedDefaultValue = "1G") final String sparkMemory,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for clustering",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs) throws Exception {
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1g", help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations "
+ "for hoodie client for clustering", unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can "
+ "be passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
@@ -59,8 +66,8 @@ public class ClusteringCommand implements CommandMarker {
// First get a clustering instant time and pass it to spark launcher for scheduling clustering
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), client.getBasePath(),
client.getTableConfig().getTableName(), clusteringInstantTime, sparkMemory, propsFilePath);
sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), master, sparkMemory,
client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -71,21 +78,25 @@ public class ClusteringCommand implements CommandMarker {
return "Succeeded to schedule clustering for " + clusteringInstantTime;
}
/**
* Run clustering table service.
* <p>
* Example:
* > connect --path {path to hudi table}
* > clustering schedule --sparkMaster local --sparkMemory 2g
* > clustering run --sparkMaster local --sparkMemory 2g --clusteringInstant 20211124005208
*/
@CliCommand(value = "clustering run", help = "Run Clustering")
public String runClustering(
@CliOption(key = "parallelism", help = "Parallelism for hoodie clustering",
unspecifiedDefaultValue = "1") final String parallelism,
@CliOption(key = "sparkMemory", help = "Spark executor memory",
unspecifiedDefaultValue = "4G") final String sparkMemory,
@CliOption(key = "retry", help = "Number of retries",
unspecifiedDefaultValue = "1") final String retry,
@CliOption(key = "clusteringInstant", help = "Clustering instant time",
mandatory = true) final String clusteringInstantTime,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs
) throws Exception {
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
@CliOption(key = "sparkMemory", help = "Spark executor memory", unspecifiedDefaultValue = "4g") final String sparkMemory,
@CliOption(key = "parallelism", help = "Parallelism for hoodie clustering", unspecifiedDefaultValue = "1") final String parallelism,
@CliOption(key = "retry", help = "Number of retries", unspecifiedDefaultValue = "1") final String retry,
@CliOption(key = "clusteringInstant", help = "Clustering instant time", mandatory = true) final String clusteringInstantTime,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for "
+ "hoodie client for compacting", unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be "
+ "passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
@@ -93,8 +104,9 @@ public class ClusteringCommand implements CommandMarker {
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), client.getBasePath(),
client.getTableConfig().getTableName(), clusteringInstantTime, parallelism, sparkMemory, retry, propsFilePath);
sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), master, sparkMemory,
client.getBasePath(), client.getTableConfig().getTableName(), clusteringInstantTime,
parallelism, retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -79,12 +80,14 @@ public class SparkMain {
}
public static void main(String[] args) throws Exception {
String command = args[0];
LOG.info("Invoking SparkMain:" + command);
ValidationUtils.checkArgument(args.length >= 4);
final String commandString = args[0];
LOG.info("Invoking SparkMain: " + commandString);
final SparkCommand cmd = SparkCommand.valueOf(commandString);
SparkCommand cmd = SparkCommand.valueOf(command);
JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + commandString,
Option.of(args[1]), Option.of(args[2]));
JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]));
int returnCode = 0;
try {
switch (cmd) {
@@ -111,8 +114,8 @@ public class SparkMain {
if (args.length > 13) {
configs.addAll(Arrays.asList(args).subList(13, args.length));
}
returnCode = dataLoad(jsc, command, args[3], args[4], args[5], args[6], args[7], args[8],
Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
returnCode = dataLoad(jsc, commandString, args[3], args[4], args[5], args[6], args[7], args[8],
Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
break;
case COMPACT_RUN:
assert (args.length >= 10);
@@ -159,33 +162,34 @@ public class SparkMain {
case COMPACT_UNSCHEDULE_PLAN:
assert (args.length == 9);
doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case CLUSTERING_RUN:
assert (args.length >= 8);
assert (args.length >= 9);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[7])) {
propsFilePath = args[7];
if (!StringUtils.isNullOrEmpty(args[8])) {
propsFilePath = args[8];
}
configs = new ArrayList<>();
if (args.length > 8) {
configs.addAll(Arrays.asList(args).subList(8, args.length));
if (args.length > 9) {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
returnCode = cluster(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5],
Integer.parseInt(args[6]), false, propsFilePath, configs);
returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
Integer.parseInt(args[7]), false, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
assert (args.length >= 6);
assert (args.length >= 7);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[5])) {
propsFilePath = args[5];
if (!StringUtils.isNullOrEmpty(args[6])) {
propsFilePath = args[6];
}
configs = new ArrayList<>();
if (args.length > 6) {
configs.addAll(Arrays.asList(args).subList(6, args.length));
if (args.length > 7) {
configs.addAll(Arrays.asList(args).subList(7, args.length));
}
returnCode = cluster(jsc, args[1], args[2], args[3], 1, args[4], 0, true, propsFilePath, configs);
returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
0, true, propsFilePath, configs);
break;
case CLEAN:
assert (args.length >= 5);
@@ -229,7 +233,7 @@ public class SparkMain {
break;
}
} catch (Throwable throwable) {
LOG.error("Fail to execute command", throwable);
LOG.error("Fail to execute commandString", throwable);
returnCode = -1;
} finally {
jsc.stop();

View File

@@ -40,7 +40,7 @@ import java.util.Objects;
*/
public class SparkUtil {
private static final String DEFAULT_SPARK_MASTER = "yarn";
public static final String DEFAULT_SPARK_MASTER = "yarn";
/**
* TODO: Need to fix a bunch of hardcoded stuff here, e.g. history server, spark distro.