From e48e35385a91cf6d03a0a91d71aabfe78fe82928 Mon Sep 17 00:00:00 2001 From: Ho Tien Vu Date: Tue, 25 Jun 2019 00:42:31 +0800 Subject: [PATCH] Added preemptive check for 'spark.scheduler.mode' When running docker demo, NoSuchElementException was thrown because spark.scheduler.mode is not set. Also we want to check before initializing the Spark Context to avoid polluting the SparkConf with unused config. --- .../deltastreamer/HoodieDeltaStreamer.java | 24 +------------ .../deltastreamer/SchedulerConfGenerator.java | 33 +++++++++++++++-- .../utilities/SchedulerConfGeneratorTest.java | 36 +++++++++++++++++++ 3 files changed, 68 insertions(+), 25 deletions(-) create mode 100644 hoodie-utilities/src/test/java/com/uber/hoodie/utilities/SchedulerConfGeneratorTest.java diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 3342bffd7..61594143e 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -44,7 +44,6 @@ import com.uber.hoodie.utilities.sources.JsonDFSSource; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -257,21 +256,6 @@ public class HoodieDeltaStreamer implements Serializable { public Boolean help = false; } - /** - * Helper to set Spark Scheduling Configs dynamically - * - * @param cfg Config - */ - public static Map getSparkSchedulingConfigs(Config cfg) throws Exception { - Map additionalSparkConfigs = new HashMap<>(); - if (cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { - String sparkSchedulingConfFile = 
SchedulerConfGenerator.generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, - cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); - additionalSparkConfigs.put("spark.scheduler.allocation.file", sparkSchedulingConfFile); - } - return additionalSparkConfigs; - } - public static void main(String[] args) throws Exception { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, args); @@ -280,18 +264,12 @@ public class HoodieDeltaStreamer implements Serializable { System.exit(1); } - Map additionalSparkConfigs = getSparkSchedulingConfigs(cfg); + Map additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); - if (!("FAIR".equals(jssc.getConf().get("spark.scheduler.mode"))) - && cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { - log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " - + "is not set to FAIR at instatiation time. Continuing without scheduling configs"); - } new HoodieDeltaStreamer(cfg, jssc).sync(); } - /** * Syncs data either in single-run or in continuous mode. 
*/ diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java index fdc35e408..592e1de82 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java @@ -17,6 +17,7 @@ package com.uber.hoodie.utilities.deltastreamer; +import com.uber.hoodie.common.model.HoodieTableType; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -27,6 +28,8 @@ import java.util.UUID; import org.apache.commons.lang.text.StrSubstitutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import scala.Option; /** * Utility Class to generate Spark Scheduling allocation file. This kicks in only when user @@ -38,6 +41,8 @@ public class SchedulerConfGenerator { public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync"; public static final String COMPACT_POOL_NAME = "hoodiecompact"; + public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode"; + public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"; private static final String DELTASYNC_POOL_KEY = "deltasync_pool"; @@ -82,8 +87,32 @@ public class SchedulerConfGenerator { return xmlString; } - public static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight, - Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException { + + /** + * Helper to set Spark Scheduling Configs dynamically + * + * @param cfg Config + */ + public static Map getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception { + final Option sparkSchedulerMode = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY); + + Map additionalSparkConfigs = new HashMap<>(); + if 
(sparkSchedulerMode.isDefined() && "FAIR".equals(sparkSchedulerMode.get()) + && cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { + String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, + cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); + additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile); + } else { + log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " + + "is not set to FAIR at instatiation time. Continuing without scheduling configs"); + } + return additionalSparkConfigs; + } + + private static String generateAndStoreConfig(Integer deltaSyncWeight, + Integer compactionWeight, + Integer deltaSyncMinShare, + Integer compactionMinShare) throws IOException { File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml"); BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile)); bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare)); diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/SchedulerConfGeneratorTest.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/SchedulerConfGeneratorTest.java new file mode 100644 index 000000000..2e88d897e --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/SchedulerConfGeneratorTest.java @@ -0,0 +1,36 @@ +package com.uber.hoodie.utilities; + +import static com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY; +import static com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer; +import 
com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator; +import java.util.Map; +import org.junit.Test; + +public class SchedulerConfGeneratorTest { + + @Test + public void testGenerateSparkSchedulingConf() throws Exception { + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + Map configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + assertNull("spark.scheduler.mode not set", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + + System.setProperty(SPARK_SCHEDULER_MODE_KEY, "FAIR"); + cfg.continuousMode = false; + configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + assertNull("continuousMode is false", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + + cfg.continuousMode = true; + cfg.storageType = HoodieTableType.COPY_ON_WRITE.name(); + configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + assertNull("storageType is not MERGE_ON_READ", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + + cfg.storageType = HoodieTableType.MERGE_ON_READ.name(); + configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + assertNotNull("all satisfies", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + } +}