1
0

Adding support for custom scheduler configs with streaming sink (#4762)

This commit is contained in:
Sivabalan Narayanan
2022-02-08 04:14:10 -05:00
committed by GitHub
parent 1636876e8a
commit ab73047958
4 changed files with 38 additions and 6 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
@@ -46,7 +47,6 @@ public class SchedulerConfGenerator {
public static final String COMPACT_POOL_NAME = AsyncCompactService.COMPACT_POOL_NAME;
public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR";
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
private static final String SPARK_SCHEDULING_PATTERN =
"<?xml version=\"1.0\"?>\n<allocations>\n <pool name=\"%s\">\n"
@@ -85,7 +85,7 @@ public class SchedulerConfGenerator {
&& cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile);
additionalSparkConfigs.put(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY(), sparkSchedulingConfFile);
} else {
LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
+ "is not set to FAIR at instantiation time. Continuing without scheduling configs");

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieTableType;
import org.junit.jupiter.api.Test;
@@ -33,21 +34,21 @@ public class TestSchedulerConfGenerator {
public void testGenerateSparkSchedulingConf() throws Exception {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
Map<String, String> configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "spark.scheduler.mode not set");
assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "spark.scheduler.mode not set");
System.setProperty(SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY, "FAIR");
cfg.continuousMode = false;
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "continuousMode is false");
assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "continuousMode is false");
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY),
assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()),
"table type is not MERGE_ON_READ");
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
assertNotNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "all satisfies");
assertNotNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "all satisfies");
}
}