diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 3342bffd7..61594143e 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -44,7 +44,6 @@ import com.uber.hoodie.utilities.sources.JsonDFSSource; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -257,21 +256,6 @@ public class HoodieDeltaStreamer implements Serializable { public Boolean help = false; } - /** - * Helper to set Spark Scheduling Configs dynamically - * - * @param cfg Config - */ - public static Map getSparkSchedulingConfigs(Config cfg) throws Exception { - Map additionalSparkConfigs = new HashMap<>(); - if (cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { - String sparkSchedulingConfFile = SchedulerConfGenerator.generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, - cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); - additionalSparkConfigs.put("spark.scheduler.allocation.file", sparkSchedulingConfFile); - } - return additionalSparkConfigs; - } - public static void main(String[] args) throws Exception { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, args); @@ -280,18 +264,12 @@ public class HoodieDeltaStreamer implements Serializable { System.exit(1); } - Map additionalSparkConfigs = getSparkSchedulingConfigs(cfg); + Map additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs); - if 
(!("FAIR".equals(jssc.getConf().get("spark.scheduler.mode"))) - && cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { - log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " - + "is not set to FAIR at instatiation time. Continuing without scheduling configs"); - } new HoodieDeltaStreamer(cfg, jssc).sync(); } - /** * Syncs data either in single-run or in continuous mode. */ diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java index fdc35e408..592e1de82 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java @@ -17,6 +17,7 @@ package com.uber.hoodie.utilities.deltastreamer; +import com.uber.hoodie.common.model.HoodieTableType; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; @@ -27,6 +28,8 @@ import java.util.UUID; import org.apache.commons.lang.text.StrSubstitutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import scala.Option; /** * Utility Class to generate Spark Scheduling allocation file. 
This kicks in only when user @@ -38,6 +41,8 @@ public class SchedulerConfGenerator { public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync"; public static final String COMPACT_POOL_NAME = "hoodiecompact"; + public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode"; + public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"; private static final String DELTASYNC_POOL_KEY = "deltasync_pool"; @@ -82,8 +87,32 @@ public class SchedulerConfGenerator { return xmlString; } - public static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight, - Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException { + + /** + * Helper to set Spark Scheduling Configs dynamically + * + * @param cfg Config + */ + public static Map getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception { + final Option sparkSchedulerMode = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY); + + Map additionalSparkConfigs = new HashMap<>(); + if (sparkSchedulerMode.isDefined() && "FAIR".equals(sparkSchedulerMode.get()) + && cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) { + String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, + cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); + additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile); + } else { + log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " + + "is not set to FAIR at instantiation time. 
Continuing without scheduling configs"); + } + return additionalSparkConfigs; + } + + private static String generateAndStoreConfig(Integer deltaSyncWeight, + Integer compactionWeight, + Integer deltaSyncMinShare, + Integer compactionMinShare) throws IOException { File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml"); BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile)); bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare)); diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/SchedulerConfGeneratorTest.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/SchedulerConfGeneratorTest.java new file mode 100644 index 000000000..2e88d897e --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/SchedulerConfGeneratorTest.java @@ -0,0 +1,36 @@ +package com.uber.hoodie.utilities; + +import static com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY; +import static com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer; +import com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator; +import java.util.Map; +import org.junit.Test; + +public class SchedulerConfGeneratorTest { + + @Test + public void testGenerateSparkSchedulingConf() throws Exception { + HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); + Map configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + assertNull("spark.scheduler.mode not set", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + + System.setProperty(SPARK_SCHEDULER_MODE_KEY, "FAIR"); + cfg.continuousMode = false; + configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + 
assertNull("continuousMode is false", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + + cfg.continuousMode = true; + cfg.storageType = HoodieTableType.COPY_ON_WRITE.name(); + configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + assertNull("storageType is not MERGE_ON_READ", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + + cfg.storageType = HoodieTableType.MERGE_ON_READ.name(); + configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); + assertNotNull("all conditions satisfied", configs.get(SPARK_SCHEDULER_ALLOCATION_FILE_KEY)); + } +}