From 8c6138cb01997e0ef35c396b2048cf956b53d28f Mon Sep 17 00:00:00 2001
From: Mathieu <49835526+wangxianghu@users.noreply.github.com>
Date: Wed, 19 Feb 2020 03:15:02 +0800
Subject: [PATCH] [MINOR] Add javadoc to SchedulerConfGenerator and code clean
(#1340)
---
.../deltastreamer/SchedulerConfGenerator.java | 36 ++++++++++++++-----
1 file changed, 28 insertions(+), 8 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
index e98b86763..54fcf689a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
@@ -44,6 +44,7 @@ public class SchedulerConfGenerator {
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
public static final String COMPACT_POOL_NAME = "hoodiecompact";
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
+ public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR";
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
private static final String SPARK_SCHEDULING_PATTERN =
@@ -52,36 +53,55 @@ public class SchedulerConfGenerator {
+ " \n \n %s\n"
+ " %s\n %s\n \n";
+ /**
+ * Helper to generate spark scheduling configs in XML format with input params.
+ *
+ * @param deltaSyncWeight Scheduling weight for delta sync
+ * @param compactionWeight Scheduling weight for compaction
+ * @param deltaSyncMinShare Minshare for delta sync
+ * @param compactionMinShare Minshare for compaction
+ * @return Spark scheduling configs
+ */
private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
Integer compactionMinShare) {
- return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, "FAIR", deltaSyncWeight.toString(),
- deltaSyncMinShare.toString(), COMPACT_POOL_NAME, "FAIR", compactionWeight.toString(),
- compactionMinShare.toString());
+ return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
+ deltaSyncWeight.toString(), deltaSyncMinShare.toString(), COMPACT_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
+ compactionWeight.toString(), compactionMinShare.toString());
}
/**
* Helper to set Spark Scheduling Configs dynamically.
*
- * @param cfg Config
+ * @param cfg Config for HoodieDeltaStreamer
*/
public static Map getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception {
scala.Option scheduleModeKeyOption = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
final Option sparkSchedulerMode =
scheduleModeKeyOption.isDefined() ? Option.of(scheduleModeKeyOption.get()) : Option.empty();
- Map additionalSparkConfigs = new HashMap<>();
- if (sparkSchedulerMode.isPresent() && "FAIR".equals(sparkSchedulerMode.get()) && cfg.continuousMode
- && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ Map additionalSparkConfigs = new HashMap<>(1);
+ if (sparkSchedulerMode.isPresent() && SPARK_SCHEDULER_FAIR_MODE.equals(sparkSchedulerMode.get())
+ && cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
} else {
LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
- + "is not set to FAIR at instatiation time. Continuing without scheduling configs");
+ + "is not set to FAIR at instantiation time. Continuing without scheduling configs");
}
return additionalSparkConfigs;
}
+ /**
+ * Generate spark scheduling configs and store it to a randomly generated tmp file.
+ *
+ * @param deltaSyncWeight Scheduling weight for delta sync
+ * @param compactionWeight Scheduling weight for compaction
+ * @param deltaSyncMinShare Minshare for delta sync
+ * @param compactionMinShare Minshare for compaction
+ * @return Return the absolute path of the tmp file which stores the spark schedule configs
+ * @throws IOException Throws an IOException when write configs to file failed
+ */
private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");