diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 9811db536..e02d96ee8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -478,6 +478,32 @@ object DataSourceWriteOptions { + "Use this when you are in the process of migrating from " + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format") + // spark data source write pool name. Incase of streaming sink, users might be interested to set custom scheduling configs + // for regular writes and async compaction. In such cases, this pool name will be used for spark datasource writes. + val SPARK_DATASOURCE_WRITER_POOL_NAME = "sparkdatasourcewrite" + + /* + When async compaction is enabled (deltastreamer or streaming sink), users might be interested to set custom + scheduling configs for regular writes and async compaction. This is the property used to set custom scheduler config + file with spark. In Deltastreamer, the file is generated within hudi and set if necessary. Where as in case of streaming + sink, users have to set this property when they invoke spark shell. + Sample format of the file contents. + + + + FAIR + 4 + 2 + + + FAIR + 3 + 1 + + + */ + val SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file" + /** @deprecated Use {@link HIVE_SYNC_MODE} instead of this config from 0.9.0 */ @Deprecated val HIVE_USE_JDBC: ConfigProperty[String] = ConfigProperty diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 5dfefe1ac..62002adf5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -117,6 +117,11 @@ object HoodieSparkSqlWriter { } val jsc = new JavaSparkContext(sparkContext) + if (asyncCompactionTriggerFn.isDefined) { + if (jsc.getConf.getOption(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY).isDefined) { + jsc.setLocalProperty("spark.scheduler.pool", DataSourceWriteOptions.SPARK_DATASOURCE_WRITER_POOL_NAME) + } + } val instantTime = HoodieActiveTimeline.createNewInstantTime() val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps)) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java index f5f1f3847..ed288673a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java @@ -18,6 +18,7 @@ package org.apache.hudi.utilities.deltastreamer; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.async.AsyncCompactService; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.util.Option; @@ -46,7 +47,6 @@ public class SchedulerConfGenerator { public static final String COMPACT_POOL_NAME = AsyncCompactService.COMPACT_POOL_NAME; public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode"; public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR"; - public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"; private static final String SPARK_SCHEDULING_PATTERN = "\n\n \n" @@ -85,7 +85,7 @@ public class SchedulerConfGenerator { && cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); - additionalSparkConfigs.put(SPARK_SCHEDULER_ALLOCATION_FILE_KEY, sparkSchedulingConfFile); + additionalSparkConfigs.put(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY(), sparkSchedulingConfFile); } else { LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " + "is not set to FAIR at instantiation time. Continuing without scheduling configs"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java index 77fd04fb4..034284599 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSchedulerConfGenerator.java @@ -18,6 +18,7 @@ package org.apache.hudi.utilities.deltastreamer; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.model.HoodieTableType; import org.junit.jupiter.api.Test; @@ -33,21 +34,21 @@ public class TestSchedulerConfGenerator { public void testGenerateSparkSchedulingConf() throws Exception { HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config(); Map configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); - assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "spark.scheduler.mode not set"); + assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "spark.scheduler.mode not set"); System.setProperty(SchedulerConfGenerator.SPARK_SCHEDULER_MODE_KEY, "FAIR"); cfg.continuousMode = false; configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); - assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "continuousMode is false"); + assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "continuousMode is false"); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); - assertNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), + assertNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "table type is not MERGE_ON_READ"); cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); configs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg); - assertNotNull(configs.get(SchedulerConfGenerator.SPARK_SCHEDULER_ALLOCATION_FILE_KEY), "all satisfies"); + assertNotNull(configs.get(DataSourceWriteOptions.SPARK_SCHEDULER_ALLOCATION_FILE_KEY()), "all satisfies"); } }