1
0

[HUDI-740] Fix inability to specify the sparkMaster, and clean up code in SparkUtil (#1452)

This commit is contained in:
hongdd
2020-04-08 21:33:15 +08:00
committed by GitHub
parent d610252d6b
commit 4e5c8671ef
5 changed files with 103 additions and 59 deletions

View File

@@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hudi.cli;

/**
 * Constants for the configuration keys the Hudi CLI uses when initializing a
 * Spark application (see {@code SparkUtil#initJavaSparkConf}).
 *
 * <p>This is a pure constants holder and is never instantiated.
 */
public class HoodieCliSparkConfig {

  /**
   * Configs to start spark application.
   */
  // Environment variable (not a spark.* property) consulted for the default master.
  public static final String CLI_SPARK_MASTER = "SPARK_MASTER";
  public static final String CLI_SERIALIZER = "spark.serializer";
  public static final String CLI_DRIVER_MAX_RESULT_SIZE = "spark.driver.maxResultSize";
  public static final String CLI_EVENT_LOG_OVERWRITE = "spark.eventLog.overwrite";
  public static final String CLI_EVENT_LOG_ENABLED = "spark.eventLog.enabled";
  public static final String CLI_EXECUTOR_MEMORY = "spark.executor.memory";

  /**
   * Hadoop output config.
   */
  public static final String CLI_MAPRED_OUTPUT_COMPRESS = "spark.hadoop.mapred.output.compress";
  public static final String CLI_MAPRED_OUTPUT_COMPRESSION_CODEC = "spark.hadoop.mapred.output.compression.codec";
  public static final String CLI_MAPRED_OUTPUT_COMPRESSION_TYPE = "spark.hadoop.mapred.output.compression.type";

  /**
   * Parquet file config.
   */
  public static final String CLI_PARQUET_ENABLE_SUMMARY_METADATA = "parquet.enable.summary-metadata";

  // Utility/constants class: suppress the implicit public constructor.
  private HoodieCliSparkConfig() {
  }
}

View File

@@ -139,7 +139,7 @@ public class CleansCommand implements CommandMarker {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
String cmd = SparkMain.SparkCommand.CLEAN.toString();
sparkLauncher.addAppArgs(cmd, metaClient.getBasePath(), master, propsFilePath, sparkMemory);
sparkLauncher.addAppArgs(cmd, master, sparkMemory, metaClient.getBasePath(), propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);

View File

@@ -423,8 +423,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), client.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), master, sparkMemory, client.getBasePath(),
compactionInstant, outputPathStr, parallelism);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
@@ -484,8 +484,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), client.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(skipV).toString(),
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), master, sparkMemory, client.getBasePath(),
compactionInstant, outputPathStr, parallelism, Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -528,8 +528,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), client.getBasePath(),
fileId, outputPathStr, "1", master, sparkMemory, Boolean.valueOf(skipV).toString(),
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(),
fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -574,8 +574,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), client.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(dryRun).toString());
sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), master, sparkMemory, client.getBasePath(),
compactionInstant, outputPathStr, parallelism, Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.cli.DedupeSparkJob;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -62,7 +63,9 @@ public class SparkMain {
SparkCommand cmd = SparkCommand.valueOf(command);
JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
JavaSparkContext jsc = sparkMasterContained(cmd)
? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]))
: SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
int returnCode = 0;
switch (cmd) {
case ROLLBACK:
@@ -118,38 +121,38 @@ public class SparkMain {
break;
case COMPACT_VALIDATE:
assert (args.length == 7);
doCompactValidate(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6]);
doCompactValidate(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]));
returnCode = 0;
break;
case COMPACT_REPAIR:
assert (args.length == 8);
doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
doCompactRepair(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
Boolean.parseBoolean(args[7]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_FILE:
assert (args.length == 9);
doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_PLAN:
assert (args.length == 9);
doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
doCompactUnschedule(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case CLEAN:
assert (args.length >= 5);
propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[3])) {
propsFilePath = args[3];
if (!StringUtils.isNullOrEmpty(args[4])) {
propsFilePath = args[4];
}
configs = new ArrayList<>();
if (args.length > 5) {
configs.addAll(Arrays.asList(args).subList(5, args.length));
}
clean(jsc, args[1], args[2], propsFilePath, args[4], configs);
clean(jsc, args[3], propsFilePath, configs);
break;
default:
break;
@@ -157,14 +160,16 @@ public class SparkMain {
System.exit(returnCode);
}
private static void clean(JavaSparkContext jsc, String basePath, String sparkMaster, String propsFilePath,
String sparkMemory, List<String> configs) throws Exception {
private static boolean sparkMasterContained(SparkCommand command) {
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN);
return masterContained.contains(command);
}
private static void clean(JavaSparkContext jsc, String basePath, String propsFilePath,
List<String> configs) {
HoodieCleaner.Config cfg = new HoodieCleaner.Config();
cfg.basePath = basePath;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
new HoodieCleaner(cfg, jsc).run();
@@ -172,7 +177,7 @@ public class SparkMain {
private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory,
int retry, String propsFilePath, List<String> configs) {
int retry, String propsFilePath, List<String> configs) {
Config cfg = new Config();
cfg.command = command;
cfg.srcPath = srcPath;
@@ -190,22 +195,18 @@ public class SparkMain {
}
private static void doCompactValidate(JavaSparkContext jsc, String basePath, String compactionInstant,
String outputPath, int parallelism, String sparkMaster, String sparkMemory) throws Exception {
String outputPath, int parallelism) throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.VALIDATE;
cfg.outputPath = outputPath;
cfg.compactionInstantTime = compactionInstant;
cfg.parallelism = parallelism;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactRepair(JavaSparkContext jsc, String basePath, String compactionInstant,
String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean dryRun) throws Exception {
String outputPath, int parallelism, boolean dryRun) throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.REPAIR;
@@ -213,16 +214,11 @@ public class SparkMain {
cfg.compactionInstantTime = compactionInstant;
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactUnschedule(JavaSparkContext jsc, String basePath, String compactionInstant,
String outputPath, int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation,
boolean dryRun) throws Exception {
String outputPath, int parallelism, boolean skipValidation, boolean dryRun) throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.UNSCHEDULE_PLAN;
@@ -231,15 +227,11 @@ public class SparkMain {
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
cfg.skipValidation = skipValidation;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String outputPath,
int parallelism, String sparkMaster, String sparkMemory, boolean skipValidation, boolean dryRun)
int parallelism, boolean skipValidation, boolean dryRun)
throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
@@ -249,16 +241,12 @@ public class SparkMain {
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
cfg.skipValidation = skipValidation;
if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
jsc.getConf().setMaster(sparkMaster);
}
jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath,
List<String> configs) {
List<String> configs) {
HoodieCompactor.Config cfg = new HoodieCompactor.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;

View File

@@ -18,10 +18,12 @@
package org.apache.hudi.cli.utils;
import org.apache.hudi.cli.HoodieCliSparkConfig;
import org.apache.hudi.cli.commands.SparkEnvCommand;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.SparkConf;
@@ -38,7 +40,7 @@ import java.util.Objects;
*/
public class SparkUtil {
public static final String DEFAULT_SPARK_MASTER = "yarn-client";
private static final String DEFAULT_SPARK_MASTER = "yarn-client";
/**
* TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
@@ -61,29 +63,37 @@ public class SparkUtil {
}
public static JavaSparkContext initJavaSparkConf(String name) {
return initJavaSparkConf(name, Option.empty(), Option.empty());
}
public static JavaSparkContext initJavaSparkConf(String name, Option<String> master,
Option<String> executorMemory) {
SparkConf sparkConf = new SparkConf().setAppName(name);
String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
String defMaster = master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
if ((null == defMaster) || (defMaster.isEmpty())) {
sparkConf.setMaster(DEFAULT_SPARK_MASTER);
} else {
sparkConf.setMaster(defMasterFromEnv);
sparkConf.setMaster(defMaster);
}
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.eventLog.overwrite", "true");
sparkConf.set("spark.eventLog.enabled", "true");
sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER, "org.apache.spark.serializer.KryoSerializer");
sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
if (executorMemory.isPresent()) {
sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY, executorMemory.get());
}
// Configure hadoop conf
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "true");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE, "BLOCK");
HoodieWriteClient.registerClasses(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
return jsc;
}