
[HUDI-118]: Options provided for passing properties to Cleaner, compactor and importer commands

Authored by Pratyaksh Sharma on 2019-12-28 23:46:09 +05:30; committed by n3nash
parent ff1113f3b7
commit 290278fc6c
6 changed files with 138 additions and 25 deletions

CleansCommand.java

@@ -23,17 +23,24 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.AvroUtils;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
+import scala.collection.JavaConverters;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -113,4 +120,32 @@ public class CleansCommand implements CommandMarker {
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
+
+  @CliCommand(value = "cleans run", help = "run clean")
+  public String runClean(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+      help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for cleaning",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master)
+      throws IOException, InterruptedException, URISyntaxException {
+    boolean initialized = HoodieCLI.initConf();
+    HoodieCLI.initFS(initialized);
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    String sparkPropertiesPath =
+        Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    String cmd = SparkMain.SparkCommand.CLEAN.toString();
+    sparkLauncher.addAppArgs(cmd, metaClient.getBasePath(), master, propsFilePath, sparkMemory);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to clean hoodie dataset";
+    }
+    return "Cleaned hoodie dataset";
+  }
 }
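For orientation, a minimal standalone sketch (not part of this commit; the base path, master URL, properties path and override value are hypothetical) of the app-argument vector runClean hands to SparkLauncher, and the positions the new CLEAN case in SparkMain reads them back from:

import java.util.Arrays;
import java.util.List;

public class CleanLauncherArgsSketch {
  public static void main(String[] unused) {
    // Order fixed by runClean's addAppArgs call; any "key=value" strings that
    // UtilHelpers.validateAndAddProperties accepts are appended after sparkMemory.
    String[] appArgs = {
        "CLEAN",                 // SparkMain.SparkCommand
        "/tmp/hoodie/trips",     // metaClient.getBasePath() (hypothetical)
        "local[2]",              // --sparkMaster (hypothetical)
        "/tmp/clean.properties", // --propsFilePath ("" when unset)
        "4G",                    // --sparkMemory
        "hoodie.cleaner.commits.retained=3" // one --hoodieConfigs entry (hypothetical)
    };
    // SparkMain's CLEAN case: args[1]=basePath, args[2]=master, args[3]=propsFilePath,
    // args[4]=sparkMemory; everything from args[5] on becomes the configs list.
    List<String> configs = Arrays.asList(appArgs).subList(5, appArgs.length);
    System.out.println(configs); // [hoodie.cleaner.commits.retained=3]
  }
}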

CompactionCommand.java

@@ -42,6 +42,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
@@ -175,7 +176,11 @@ public class CompactionCommand implements CommandMarker {
@CliCommand(value = "compaction schedule", help = "Schedule Compaction") @CliCommand(value = "compaction schedule", help = "Schedule Compaction")
public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G", public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
help = "Spark executor memory") final String sparkMemory) throws Exception { help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs) throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf(); boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized); HoodieCLI.initFS(initialized);
@@ -187,7 +192,8 @@ public class CompactionCommand implements CommandMarker {
         Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
     sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(),
-        client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory);
+        client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
@@ -207,7 +213,11 @@ public class CompactionCommand implements CommandMarker {
help = "Spark executor memory") final String sparkMemory, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry, @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
@CliOption(key = "compactionInstant", mandatory = false, @CliOption(key = "compactionInstant", mandatory = false,
help = "Base path for the target hoodie dataset") String compactionInstantTime) help = "Base path for the target hoodie dataset") String compactionInstantTime,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs)
throws Exception { throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf(); boolean initialized = HoodieCLI.initConf();
@@ -224,13 +234,13 @@ public class CompactionCommand implements CommandMarker {
       }
       compactionInstantTime = firstPendingInstant.get();
     }
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
     sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(),
         client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
-        sparkMemory, retry);
+        sparkMemory, retry, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
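As a cross-check on the new trailing arguments, a standalone sketch (all values hypothetical) of the vectors the two compaction commands now build, laid out against the positions SparkMain parses further below:

public class CompactionArgsSketch {
  public static void main(String[] unused) {
    // COMPACT_SCHEDULE: args[4]=sparkMemory, args[5]=propsFilePath, args[6..]=configs.
    String[] schedule = {"COMPACT_SCHEDULE", "/tmp/hoodie/trips", "trips",
        "20191228234609", "1G", "/tmp/compact.properties",
        "hoodie.compact.inline.max.delta.commits=5"}; // hypothetical override
    // COMPACT_RUN: args[7]=retry, args[8]=propsFilePath, args[9..]=configs.
    String[] run = {"COMPACT_RUN", "/tmp/hoodie/trips", "trips", "20191228234609",
        "3", "/tmp/schema.avsc", "4G", "1", "/tmp/compact.properties"};
    // Both satisfy the relaxed length checks: schedule.length >= 6, run.length >= 9.
    System.out.println(schedule.length + ", " + run.length); // prints: 7, 9
  }
}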

HDFSParquetImportCommand.java

@@ -24,8 +24,7 @@ import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.util.Utils;
 import org.springframework.shell.core.CommandMarker;
@@ -41,8 +40,6 @@ import scala.collection.JavaConverters;
 @Component
 public class HDFSParquetImportCommand implements CommandMarker {
-  private static final Logger LOG = LogManager.getLogger(HDFSParquetImportCommand.class);
   @CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset")
   public String convert(
       @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false",
@@ -61,7 +58,11 @@ public class HDFSParquetImportCommand implements CommandMarker {
help = "Path for Avro schema file") final String schemaFilePath, help = "Path for Avro schema file") final String schemaFilePath,
@CliOption(key = "format", mandatory = true, help = "Format for the input data") final String format, @CliOption(key = "format", mandatory = true, help = "Format for the input data") final String format,
@CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry) throws Exception { @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for importing",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs) throws Exception {
(new FormatValidator()).validate("format", format); (new FormatValidator()).validate("format", format);
@@ -78,7 +79,8 @@ public class HDFSParquetImportCommand implements CommandMarker {
     }
     sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField, partitionPathField,
-        parallelism, schemaFilePath, sparkMemory, retry);
+        parallelism, schemaFilePath, sparkMemory, retry, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
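The importer follows the same convention; a standalone sketch (all values hypothetical) of the IMPORT/UPSERT vector, where propsFilePath occupies index 11 and any further entries become config overrides:

public class ImportArgsSketch {
  public static void main(String[] unused) {
    String[] appArgs = {"IMPORT",
        "/input/parquet", "/tmp/hoodie/trips", "trips", "COPY_ON_WRITE", // src, target, name, type
        "_row_key", "partition_path", "2", "/tmp/schema.avsc",           // keys, parallelism, schema
        "4G", "1",                                                       // sparkMemory, retry
        "/tmp/import.properties",               // args[11] = propsFilePath
        "hoodie.insert.shuffle.parallelism=2"}; // args[12..] = configs (hypothetical)
    // Matches SparkMain's relaxed check: assert (args.length >= 12).
    System.out.println(appArgs.length); // 13
  }
}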

SparkMain.java

@@ -18,6 +18,7 @@
 package org.apache.hudi.cli.commands;
 
+import com.google.common.base.Strings;
 import org.apache.hudi.HoodieWriteClient;
 import org.apache.hudi.cli.DedupeSparkJob;
 import org.apache.hudi.cli.utils.SparkUtil;
@@ -28,6 +29,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.compact.strategy.UnBoundedCompactionStrategy;
 import org.apache.hudi.utilities.HDFSParquetImporter;
 import org.apache.hudi.utilities.HDFSParquetImporter.Config;
+import org.apache.hudi.utilities.HoodieCleaner;
 import org.apache.hudi.utilities.HoodieCompactionAdminTool;
 import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation;
 import org.apache.hudi.utilities.HoodieCompactor;
@@ -36,6 +38,10 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * This class deals with initializing spark context based on command entered to hudi-cli.
  */
@@ -47,7 +53,7 @@ public class SparkMain {
  * Commands.
  */
 enum SparkCommand {
-  ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR
+  ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN
 }
 
 public static void main(String[] args) throws Exception {
@@ -73,18 +79,42 @@ public class SparkMain {
         break;
       case IMPORT:
       case UPSERT:
-        assert (args.length == 11);
+        assert (args.length >= 12);
+        String propsFilePath = null;
+        if (!Strings.isNullOrEmpty(args[11])) {
+          propsFilePath = args[11];
+        }
+        List<String> configs = new ArrayList<>();
+        if (args.length > 12) {
+          configs.addAll(Arrays.asList(args).subList(12, args.length));
+        }
         returnCode = dataLoad(jsc, command, args[1], args[2], args[3], args[4], args[5], args[6],
-            Integer.parseInt(args[7]), args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], Integer.parseInt(args[10]));
+            Integer.parseInt(args[7]), args[8], args[9], Integer.parseInt(args[10]), propsFilePath, configs);
         break;
       case COMPACT_RUN:
-        assert (args.length == 8);
+        assert (args.length >= 9);
+        propsFilePath = null;
+        if (!Strings.isNullOrEmpty(args[8])) {
+          propsFilePath = args[8];
+        }
+        configs = new ArrayList<>();
+        if (args.length > 9) {
+          configs.addAll(Arrays.asList(args).subList(9, args.length));
+        }
         returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Integer.parseInt(args[7]), false);
+            Integer.parseInt(args[7]), false, propsFilePath, configs);
         break;
       case COMPACT_SCHEDULE:
-        assert (args.length == 5);
-        returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true);
+        assert (args.length >= 6);
+        propsFilePath = null;
+        if (!Strings.isNullOrEmpty(args[5])) {
+          propsFilePath = args[5];
+        }
+        configs = new ArrayList<>();
+        if (args.length > 6) {
+          configs.addAll(Arrays.asList(args).subList(6, args.length));
+        }
+        returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs);
         break;
       case COMPACT_VALIDATE:
         assert (args.length == 7);
@@ -109,15 +139,40 @@ public class SparkMain {
             Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
         returnCode = 0;
         break;
+      case CLEAN:
+        assert (args.length >= 5);
+        propsFilePath = null;
+        if (!Strings.isNullOrEmpty(args[3])) {
+          propsFilePath = args[3];
+        }
+        configs = new ArrayList<>();
+        if (args.length > 5) {
+          configs.addAll(Arrays.asList(args).subList(5, args.length));
+        }
+        clean(jsc, args[1], args[2], propsFilePath, args[4], configs);
+        break;
       default:
         break;
     }
     System.exit(returnCode);
   }
 
+  private static void clean(JavaSparkContext jsc, String basePath, String sparkMaster, String propsFilePath,
+      String sparkMemory, List<String> configs) throws Exception {
+    HoodieCleaner.Config cfg = new HoodieCleaner.Config();
+    cfg.basePath = basePath;
+    if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
+      jsc.getConf().setMaster(sparkMaster);
+    }
+    jsc.getConf().set("spark.executor.memory", sparkMemory);
+    cfg.propsFilePath = propsFilePath;
+    cfg.configs = configs;
+    new HoodieCleaner(cfg, jsc).run();
+  }
+
   private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
-      String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMaster,
-      String sparkMemory, int retry) throws Exception {
+      String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory,
+      int retry, String propsFilePath, List<String> configs) {
     Config cfg = new Config();
     cfg.command = command;
     cfg.srcPath = srcPath;
@@ -128,6 +183,8 @@ public class SparkMain {
     cfg.partitionKey = partitionKey;
     cfg.parallelism = parallelism;
     cfg.schemaFile = schemaFile;
+    cfg.propsFilePath = propsFilePath;
+    cfg.configs = configs;
     jsc.getConf().set("spark.executor.memory", sparkMemory);
     return new HDFSParquetImporter(cfg).dataImport(jsc, retry);
   }
@@ -200,7 +257,8 @@ public class SparkMain {
   }
 
   private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
-      int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule) throws Exception {
+      int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath,
+      List<String> configs) {
     HoodieCompactor.Config cfg = new HoodieCompactor.Config();
     cfg.basePath = basePath;
     cfg.tableName = tableName;
@@ -210,12 +268,14 @@ public class SparkMain {
     cfg.parallelism = parallelism;
     cfg.schemaFile = schemaFile;
     cfg.runSchedule = schedule;
+    cfg.propsFilePath = propsFilePath;
+    cfg.configs = configs;
     jsc.getConf().set("spark.executor.memory", sparkMemory);
     return new HoodieCompactor(cfg).compact(jsc, retry);
   }
 
   private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath,
-      String repairedOutputPath, String basePath) throws Exception {
+      String repairedOutputPath, String basePath) {
     DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc),
         FSUtils.getFs(basePath, jsc.hadoopConfiguration()));
     job.fixDuplicates(true);
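One detail worth calling out: the CLI always fills the propsFilePath slot (the new CliOptions default to ""), and each new case turns that empty marker back into null before handing it to the utility Config objects. A tiny standalone sketch of that normalization:

public class PropsPathNormalizationSketch {
  // Mirrors the pattern: if (!Strings.isNullOrEmpty(args[n])) { propsFilePath = args[n]; }
  static String normalize(String raw) {
    return (raw == null || raw.isEmpty()) ? null : raw;
  }

  public static void main(String[] args) {
    System.out.println(normalize(""));                      // null: no properties file configured
    System.out.println(normalize("/tmp/clean.properties")); // explicit file (hypothetical path)
  }
}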

HoodieCompactor.java

@@ -69,7 +69,7 @@ public class HoodieCompactor {
     public int retry = 0;
     @Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction", required = false)
     public Boolean runSchedule = false;
-    @Parameter(names = {"--strategy", "-st"}, description = "Stratgey Class", required = false)
+    @Parameter(names = {"--strategy", "-st"}, description = "Strategy Class", required = false)
     public String strategyClassName = null;
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;

UtilHelpers.java

@@ -43,6 +43,7 @@ import org.apache.spark.Accumulator;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.sql.SparkSession;
 
 import java.io.BufferedReader;
@@ -50,6 +51,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -114,7 +116,7 @@ public class UtilHelpers {
   public static TypedProperties buildProperties(List<String> props) {
     TypedProperties properties = new TypedProperties();
-    props.stream().forEach(x -> {
+    props.forEach(x -> {
       String[] kv = x.split("=");
       Preconditions.checkArgument(kv.length == 2);
       properties.setProperty(kv[0], kv[1]);
@@ -122,6 +124,10 @@ public class UtilHelpers {
     return properties;
   }
 
+  public static void validateAndAddProperties(String[] configs, SparkLauncher sparkLauncher) {
+    Arrays.stream(configs).filter(config -> config.contains("=") && config.split("=").length == 2)
+        .forEach(sparkLauncher::addAppArgs);
+  }
+
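Putting the two helpers side by side, a standalone sketch of the round trip (java.util.Properties stands in for Hudi's TypedProperties so the snippet is self-contained): validateAndAddProperties forwards only well-formed "key=value" pairs, and buildProperties turns such a list into properties:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class PropsRoundTripSketch {
  public static void main(String[] args) {
    String[] configs = {"hoodie.cleaner.commits.retained=3", "no-equals", "a=b=c"};
    // Same predicate as validateAndAddProperties: keep only single "key=value" pairs.
    List<String> valid = Arrays.stream(configs)
        .filter(c -> c.contains("=") && c.split("=").length == 2)
        .collect(Collectors.toList());
    Properties props = new Properties();
    valid.forEach(kv -> { // mirrors buildProperties' split-and-set
      String[] parts = kv.split("=");
      props.setProperty(parts[0], parts[1]);
    });
    System.out.println(props); // {hoodie.cleaner.commits.retained=3}
  }
}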
   /**
    * Parse Schema from file.
    *