[MINOR] Add javadoc to SchedulerConfGenerator and code clean (#1340)
This commit is contained in:
@@ -44,6 +44,7 @@ public class SchedulerConfGenerator {
|
|||||||
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
|
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
|
||||||
public static final String COMPACT_POOL_NAME = "hoodiecompact";
|
public static final String COMPACT_POOL_NAME = "hoodiecompact";
|
||||||
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
|
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
|
||||||
|
public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR";
|
||||||
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
|
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
|
||||||
|
|
||||||
private static final String SPARK_SCHEDULING_PATTERN =
|
private static final String SPARK_SCHEDULING_PATTERN =
|
||||||
@@ -52,36 +53,55 @@ public class SchedulerConfGenerator {
|
|||||||
+ " </pool>\n <pool name=\"%s\">\n <schedulingMode>%s</schedulingMode>\n"
|
+ " </pool>\n <pool name=\"%s\">\n <schedulingMode>%s</schedulingMode>\n"
|
||||||
+ " <weight>%s</weight>\n <minShare>%s</minShare>\n </pool>\n</allocations>";
|
+ " <weight>%s</weight>\n <minShare>%s</minShare>\n </pool>\n</allocations>";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper to generate spark scheduling configs in XML format with input params.
|
||||||
|
*
|
||||||
|
* @param deltaSyncWeight Scheduling weight for delta sync
|
||||||
|
* @param compactionWeight Scheduling weight for compaction
|
||||||
|
* @param deltaSyncMinShare Minshare for delta sync
|
||||||
|
* @param compactionMinShare Minshare for compaction
|
||||||
|
* @return Spark scheduling configs
|
||||||
|
*/
|
||||||
private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
|
private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
|
||||||
Integer compactionMinShare) {
|
Integer compactionMinShare) {
|
||||||
return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, "FAIR", deltaSyncWeight.toString(),
|
return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
|
||||||
deltaSyncMinShare.toString(), COMPACT_POOL_NAME, "FAIR", compactionWeight.toString(),
|
deltaSyncWeight.toString(), deltaSyncMinShare.toString(), COMPACT_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
|
||||||
compactionMinShare.toString());
|
compactionWeight.toString(), compactionMinShare.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper to set Spark Scheduling Configs dynamically.
|
* Helper to set Spark Scheduling Configs dynamically.
|
||||||
*
|
*
|
||||||
* @param cfg Config
|
* @param cfg Config for HoodieDeltaStreamer
|
||||||
*/
|
*/
|
||||||
public static Map<String, String> getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception {
|
public static Map<String, String> getSparkSchedulingConfigs(HoodieDeltaStreamer.Config cfg) throws Exception {
|
||||||
scala.Option<String> scheduleModeKeyOption = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
|
scala.Option<String> scheduleModeKeyOption = new SparkConf().getOption(SPARK_SCHEDULER_MODE_KEY);
|
||||||
final Option<String> sparkSchedulerMode =
|
final Option<String> sparkSchedulerMode =
|
||||||
scheduleModeKeyOption.isDefined() ? Option.of(scheduleModeKeyOption.get()) : Option.empty();
|
scheduleModeKeyOption.isDefined() ? Option.of(scheduleModeKeyOption.get()) : Option.empty();
|
||||||
|
|
||||||
Map<String, String> additionalSparkConfigs = new HashMap<>();
|
Map<String, String> additionalSparkConfigs = new HashMap<>(1);
|
||||||
if (sparkSchedulerMode.isPresent() && "FAIR".equals(sparkSchedulerMode.get()) && cfg.continuousMode
|
if (sparkSchedulerMode.isPresent() && SPARK_SCHEDULER_FAIR_MODE.equals(sparkSchedulerMode.get())
|
||||||
&& cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
|
&& cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
|
||||||
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
|
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
|
||||||
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
|
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
|
||||||
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
|
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
|
LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
|
||||||
+ "is not set to FAIR at instatiation time. Continuing without scheduling configs");
|
+ "is not set to FAIR at instantiation time. Continuing without scheduling configs");
|
||||||
}
|
}
|
||||||
return additionalSparkConfigs;
|
return additionalSparkConfigs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate spark scheduling configs and store it to a randomly generated tmp file.
|
||||||
|
*
|
||||||
|
* @param deltaSyncWeight Scheduling weight for delta sync
|
||||||
|
* @param compactionWeight Scheduling weight for compaction
|
||||||
|
* @param deltaSyncMinShare Minshare for delta sync
|
||||||
|
* @param compactionMinShare Minshare for compaction
|
||||||
|
* @return Return the absolute path of the tmp file which stores the spark schedule configs
|
||||||
|
* @throws IOException Throws an IOException when write configs to file failed
|
||||||
|
*/
|
||||||
private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
|
private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
|
||||||
Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
|
Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
|
||||||
File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
|
File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
|
||||||
|
|||||||
Reference in New Issue
Block a user