[MINOR] Fix a potential NPE and some finer points of hudi cli (#5656)
This commit is contained in:
@@ -364,7 +364,7 @@ public class MetadataCommand implements CommandMarker {
|
|||||||
|
|
||||||
private void initJavaSparkContext(Option<String> userDefinedMaster) {
|
private void initJavaSparkContext(Option<String> userDefinedMaster) {
|
||||||
if (jsc == null) {
|
if (jsc == null) {
|
||||||
jsc = SparkUtil.initJavaSparkConf(SparkUtil.getDefaultConf("HoodieCLI", userDefinedMaster));
|
jsc = SparkUtil.initJavaSparkContext(SparkUtil.getDefaultConf("HoodieCLI", userDefinedMaster));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -95,7 +95,7 @@ public class SparkMain {
|
|||||||
LOG.info("Invoking SparkMain: " + commandString);
|
LOG.info("Invoking SparkMain: " + commandString);
|
||||||
final SparkCommand cmd = SparkCommand.valueOf(commandString);
|
final SparkCommand cmd = SparkCommand.valueOf(commandString);
|
||||||
|
|
||||||
JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + commandString,
|
JavaSparkContext jsc = SparkUtil.initJavaSparkContext("hoodie-cli-" + commandString,
|
||||||
Option.of(args[1]), Option.of(args[2]));
|
Option.of(args[1]), Option.of(args[2]));
|
||||||
|
|
||||||
int returnCode = 0;
|
int returnCode = 0;
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ public class UpgradeOrDowngradeCommand implements CommandMarker {
|
|||||||
if (exitCode != 0) {
|
if (exitCode != 0) {
|
||||||
return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
|
return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
|
||||||
}
|
}
|
||||||
return String.format("Hoodie table upgraded/downgraded to ", toVersion);
|
return String.format("Hoodie table upgraded/downgraded to %s", toVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
@CliCommand(value = "downgrade table", help = "Downgrades a table")
|
@CliCommand(value = "downgrade table", help = "Downgrades a table")
|
||||||
@@ -78,6 +78,6 @@ public class UpgradeOrDowngradeCommand implements CommandMarker {
|
|||||||
if (exitCode != 0) {
|
if (exitCode != 0) {
|
||||||
return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
|
return String.format("Failed: Could not Upgrade/Downgrade Hoodie table to \"%s\".", toVersion);
|
||||||
}
|
}
|
||||||
return String.format("Hoodie table upgraded/downgraded to ", toVersion);
|
return String.format("Hoodie table upgraded/downgraded to %s", toVersion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -32,8 +32,8 @@ import org.apache.spark.launcher.SparkLauncher;
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -56,9 +56,12 @@ public class SparkUtil {
|
|||||||
if (!StringUtils.isNullOrEmpty(propertiesFile)) {
|
if (!StringUtils.isNullOrEmpty(propertiesFile)) {
|
||||||
sparkLauncher.setPropertiesFile(propertiesFile);
|
sparkLauncher.setPropertiesFile(propertiesFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
File libDirectory = new File(new File(currentJar).getParent(), "lib");
|
File libDirectory = new File(new File(currentJar).getParent(), "lib");
|
||||||
for (String library : Objects.requireNonNull(libDirectory.list())) {
|
// This lib directory may be not required, such as providing libraries through a bundle jar
|
||||||
sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath());
|
if (libDirectory.exists()) {
|
||||||
|
Arrays.stream(libDirectory.list()).forEach(library ->
|
||||||
|
sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath()));
|
||||||
}
|
}
|
||||||
return sparkLauncher;
|
return sparkLauncher;
|
||||||
}
|
}
|
||||||
@@ -99,20 +102,20 @@ public class SparkUtil {
|
|||||||
return sparkConf;
|
return sparkConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static JavaSparkContext initJavaSparkConf(String name) {
|
public static JavaSparkContext initJavaSparkContext(String name) {
|
||||||
return initJavaSparkConf(name, Option.empty(), Option.empty());
|
return initJavaSparkContext(name, Option.empty(), Option.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static JavaSparkContext initJavaSparkConf(String name, Option<String> master, Option<String> executorMemory) {
|
public static JavaSparkContext initJavaSparkContext(String name, Option<String> master, Option<String> executorMemory) {
|
||||||
SparkConf sparkConf = getDefaultConf(name, master);
|
SparkConf sparkConf = getDefaultConf(name, master);
|
||||||
if (executorMemory.isPresent()) {
|
if (executorMemory.isPresent()) {
|
||||||
sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
|
sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
return initJavaSparkConf(sparkConf);
|
return initJavaSparkContext(sparkConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static JavaSparkContext initJavaSparkConf(SparkConf sparkConf) {
|
public static JavaSparkContext initJavaSparkContext(SparkConf sparkConf) {
|
||||||
SparkRDDWriteClient.registerClasses(sparkConf);
|
SparkRDDWriteClient.registerClasses(sparkConf);
|
||||||
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
|
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
|
||||||
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
|
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
|
||||||
|
|||||||
@@ -22,11 +22,22 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.cli.utils.SparkUtil;
|
import org.apache.hudi.cli.utils.SparkUtil;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
|
||||||
|
import org.apache.spark.launcher.SparkLauncher;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
|
||||||
public class SparkUtilTest {
|
public class SparkUtilTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInitSparkLauncher() throws URISyntaxException {
|
||||||
|
SparkLauncher sparkLauncher = SparkUtil.initLauncher(null);
|
||||||
|
assertNotNull(sparkLauncher);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetDefaultSparkConf() {
|
public void testGetDefaultSparkConf() {
|
||||||
SparkConf sparkConf = SparkUtil.getDefaultConf("test-spark-app", Option.of(""));
|
SparkConf sparkConf = SparkUtil.getDefaultConf("test-spark-app", Option.of(""));
|
||||||
|
|||||||
Reference in New Issue
Block a user