1
0

Added preemptive check for 'spark.scheduler.mode'

When running the docker demo, a NoSuchElementException was thrown because spark.scheduler.mode is not set.
We also want to perform this check before initializing the Spark Context, to avoid polluting the SparkConf
with unused config.
This commit is contained in:
Ho Tien Vu
2019-06-25 00:42:31 +08:00
committed by Balaji Varadarajan
parent 17e878f721
commit e48e35385a
3 changed files with 68 additions and 25 deletions

View File

@@ -44,7 +44,6 @@ import com.uber.hoodie.utilities.sources.JsonDFSSource;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -257,21 +256,6 @@ public class HoodieDeltaStreamer implements Serializable {
public Boolean help = false;
}
/**
 * Helper to set Spark Scheduling Configs dynamically.
 *
 * <p>Produces the "spark.scheduler.allocation.file" entry only when the job runs in
 * continuous mode on a MERGE_ON_READ table (the only case where delta-sync and
 * compaction run concurrently and need separate scheduling pools); otherwise the
 * returned map is empty.
 *
 * @param cfg Config carrying continuousMode, storageType and the scheduling weights/min-shares
 * @return additional Spark configs to apply when building the SparkContext (possibly empty)
 * @throws Exception if generating and storing the scheduling allocation file fails
 */
public static Map<String, String> getSparkSchedulingConfigs(Config cfg) throws Exception {
Map<String, String> additionalSparkConfigs = new HashMap<>();
if (cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
// Write the fair-scheduler pool definitions to a temp XML file and point Spark at it.
String sparkSchedulingConfFile = SchedulerConfGenerator.generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
additionalSparkConfigs.put("spark.scheduler.allocation.file", sparkSchedulingConfFile);
}
return additionalSparkConfigs;
}
public static void main(String[] args) throws Exception {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, args);
@@ -280,18 +264,12 @@ public class HoodieDeltaStreamer implements Serializable {
System.exit(1);
}
Map<String, String> additionalSparkConfigs = getSparkSchedulingConfigs(cfg);
Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName,
cfg.sparkMaster, additionalSparkConfigs);
if (!("FAIR".equals(jssc.getConf().get("spark.scheduler.mode")))
&& cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
+ "is not set to FAIR at instatiation time. Continuing without scheduling configs");
}
new HoodieDeltaStreamer(cfg, jssc).sync();
}
/**
* Syncs data either in single-run or in continuous mode.
*/

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.utilities.deltastreamer;
import com.uber.hoodie.common.model.HoodieTableType;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
@@ -27,6 +28,8 @@ import java.util.UUID;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import scala.Option;
/**
* Utility Class to generate Spark Scheduling allocation file. This kicks in only when user
@@ -38,6 +41,8 @@ public class SchedulerConfGenerator {
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
public static final String COMPACT_POOL_NAME = "hoodiecompact";
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
private static final String DELTASYNC_POOL_KEY = "deltasync_pool";
@@ -82,8 +87,32 @@ public class SchedulerConfGenerator {
return xmlString;
}
public static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
/**
 * Helper to set Spark Scheduling Configs dynamically.
 *
 * <p>Builds the extra configs needed for fair scheduling of delta-sync and compaction.
 * The allocation file is generated only when the job runs in continuous mode on a
 * MERGE_ON_READ table AND spark.scheduler.mode is already set to FAIR; checking the
 * SparkConf preemptively (before the SparkContext exists) avoids a
 * NoSuchElementException when the key is unset and avoids polluting the SparkConf
 * with unused config.
 *
 * @param cfg delta-streamer config carrying continuousMode, storageType and the
 *            scheduling weights/min-shares
 * @return map with spark.scheduler.allocation.file set when fair scheduling applies;
 *         empty otherwise
 * @throws Exception if generating and storing the temporary allocation file fails
 */
public static Map<String, String> getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception {
Map<String, String> additionalSparkConfigs = new HashMap<>();
// Scheduling pools only matter when delta-sync and compaction run concurrently,
// i.e. continuous mode on a MERGE_ON_READ table.
if (cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
// Read spark.scheduler.mode from a fresh SparkConf (system properties / spark-submit
// conf) instead of a live context, so an unset key yields an empty Option rather
// than a NoSuchElementException.
final Option<String> sparkSchedulerMode = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
if (sparkSchedulerMode.isDefined() && "FAIR".equals(sparkSchedulerMode.get())) {
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
} else {
// Warn only when scheduling configs were actually requested but cannot take
// effect; previously this warning also fired (with a misleading reason) for
// non-continuous or COPY_ON_WRITE jobs. Also fixes the "instatiation" typo.
log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
+ "is not set to FAIR at instantiation time. Continuing without scheduling configs");
}
}
return additionalSparkConfigs;
}
private static String generateAndStoreConfig(Integer deltaSyncWeight,
Integer compactionWeight,
Integer deltaSyncMinShare,
Integer compactionMinShare) throws IOException {
File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare));

View File

@@ -0,0 +1,36 @@
package com.uber.hoodie.utilities;
import static com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY;
import static com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
import com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator;
import java.util.Map;
import org.junit.Test;
/**
 * Tests that {@link SchedulerConfGenerator#getSparkSchedulingConfigs} only emits the
 * scheduler allocation-file config when all preconditions hold: spark.scheduler.mode
 * is FAIR, continuous mode is on, and the table type is MERGE_ON_READ.
 */
public class SchedulerConfGeneratorTest {
@Test
public void testGenerateSparkSchedulingConf() throws Exception {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
// No scheduler mode set at all -> no allocation file.
Map<String, String> configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull("spark.scheduler.mode not set", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
// Enable FAIR mode for the remaining cases; restore in finally so the JVM-global
// system property does not leak into other tests running in the same process.
System.setProperty(SPARK_SCHEDULER_MODE_KEY, "FAIR");
try {
cfg.continuousMode = false;
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull("continuousMode is false", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
cfg.continuousMode = true;
cfg.storageType = HoodieTableType.COPY_ON_WRITE.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull("storageType is not MERGE_ON_READ", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
// All preconditions satisfied -> allocation file config must be present.
cfg.storageType = HoodieTableType.MERGE_ON_READ.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNotNull("all satisfies", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY));
} finally {
System.clearProperty(SPARK_SCHEDULER_MODE_KEY);
}
}
}