From 4fed8dd31996bdd3e16e4921237628588195653f Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 29 Mar 2022 18:27:45 -0700 Subject: [PATCH] [HUDI-3485] Adding scheduler pool configs for async clustering (#5043) --- .../hudi/async/AsyncClusteringService.java | 14 +++++++++++--- .../async/SparkAsyncClusteringService.java | 5 +++-- .../common/HoodieSparkEngineContext.java | 5 +++++ .../hudi/common/engine/EngineProperty.java | 1 + .../SparkStreamingAsyncClusteringService.java | 5 +++-- .../org/apache/hudi/HoodieStreamingSink.scala | 3 ++- .../scala/org/apache/hudi/SparkConfigs.scala | 11 ++++++++--- .../deltastreamer/HoodieDeltaStreamer.java | 18 +++++++++++++++--- .../HoodieMultiTableDeltaStreamer.java | 10 ++++++++++ .../deltastreamer/SchedulerConfGenerator.java | 19 ++++++++++++++----- 10 files changed, 72 insertions(+), 19 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java index 1c1cf2bb9..7fece5c88 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java @@ -21,6 +21,8 @@ package org.apache.hudi.async; import org.apache.hudi.client.BaseClusterer; import org.apache.hudi.client.BaseHoodieWriteClient; +import org.apache.hudi.common.engine.EngineProperty; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; @@ -42,18 +44,21 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService { private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class); + public static final String 
CLUSTERING_POOL_NAME = "hoodiecluster"; private final int maxConcurrentClustering; private transient BaseClusterer clusteringClient; + protected transient HoodieEngineContext context; - public AsyncClusteringService(BaseHoodieWriteClient writeClient) { - this(writeClient, false); + public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) { + this(context, writeClient, false); } - public AsyncClusteringService(BaseHoodieWriteClient writeClient, boolean runInDaemonMode) { + public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient, boolean runInDaemonMode) { super(writeClient.getConfig(), runInDaemonMode); this.clusteringClient = createClusteringClient(writeClient); this.maxConcurrentClustering = 1; + this.context = context; } protected abstract BaseClusterer createClusteringClient(BaseHoodieWriteClient client); @@ -72,6 +77,9 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService { return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> { try { + // Set Clustering Pool Name for allowing users to prioritize clustering + LOG.info("Setting pool name for clustering to " + CLUSTERING_POOL_NAME); + context.setProperty(EngineProperty.CLUSTERING_POOL_NAME, CLUSTERING_POOL_NAME); while (!isShutdownRequested()) { final HoodieInstant instant = fetchNextAsyncServiceInstant(); if (null != instant) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java index 8f6535b11..dd2ac9193 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java @@ -22,14 +22,15 @@ package org.apache.hudi.async; import 
org.apache.hudi.client.BaseClusterer; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.HoodieSparkClusteringClient; +import org.apache.hudi.common.engine.HoodieEngineContext; /** * Async clustering service for Spark datasource. */ public class SparkAsyncClusteringService extends AsyncClusteringService { - public SparkAsyncClusteringService(BaseHoodieWriteClient writeClient) { - super(writeClient); + public SparkAsyncClusteringService(HoodieEngineContext engineContext, BaseHoodieWriteClient writeClient) { + super(engineContext, writeClient); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java index cc29ef70f..d8281d1a1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java @@ -36,6 +36,8 @@ import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.data.HoodieSparkLongAccumulator; import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.sql.SQLContext; @@ -53,6 +55,7 @@ import java.util.stream.Stream; */ public class HoodieSparkEngineContext extends HoodieEngineContext { + private static final Logger LOG = LogManager.getLogger(HoodieSparkEngineContext.class); private final JavaSparkContext javaSparkContext; private SQLContext sqlContext; @@ -158,6 +161,8 @@ public class HoodieSparkEngineContext extends HoodieEngineContext { public void setProperty(EngineProperty key, String value) { if (key == EngineProperty.COMPACTION_POOL_NAME) { 
javaSparkContext.setLocalProperty("spark.scheduler.pool", value); + } else if (key == EngineProperty.CLUSTERING_POOL_NAME) { + javaSparkContext.setLocalProperty("spark.scheduler.pool", value); } else { throw new HoodieException("Unknown engine property :" + key); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java index 5e9a516ec..36e759493 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java @@ -26,6 +26,7 @@ public enum EngineProperty { EMBEDDED_SERVER_HOST, // Pool/queue to use to run compaction. COMPACTION_POOL_NAME, + CLUSTERING_POOL_NAME, TOTAL_CORES_PER_EXECUTOR, // Amount of total memory available to each engine executor TOTAL_MEMORY_AVAILABLE, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java index f87e16a65..077b102a4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java @@ -22,6 +22,7 @@ package org.apache.hudi.async; import org.apache.hudi.client.BaseClusterer; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.client.HoodieSparkClusteringClient; +import org.apache.hudi.common.engine.HoodieEngineContext; /** * Async clustering service for Spark structured streaming. 
@@ -31,8 +32,8 @@ public class SparkStreamingAsyncClusteringService extends AsyncClusteringService private static final long serialVersionUID = 1L; - public SparkStreamingAsyncClusteringService(BaseHoodieWriteClient writeClient) { - super(writeClient, true); + public SparkStreamingAsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) { + super(context, writeClient, true); } @Override diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 8d8ebfa7e..93580dee9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -205,7 +205,8 @@ class HoodieStreamingSink(sqlContext: SQLContext, protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = { if (null == asyncClusteringService) { log.info("Triggering async clustering!") - asyncClusteringService = new SparkStreamingAsyncClusteringService(client) + asyncClusteringService = new SparkStreamingAsyncClusteringService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), + client) asyncClusteringService.start(new Function[java.lang.Boolean, java.lang.Boolean] { override def apply(errored: lang.Boolean): lang.Boolean = { log.info(s"Async clustering service shutdown. Errored ? 
$errored") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkConfigs.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkConfigs.scala index 75dee2108..73e1f8694 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkConfigs.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkConfigs.scala @@ -27,9 +27,9 @@ object SparkConfigs { /* When async compaction is enabled (deltastreamer or streaming sink), users might be interested to set custom - scheduling configs for regular writes and async compaction. This is the property used to set custom scheduler config - file with spark. In Deltastreamer, the file is generated within hudi and set if necessary. Where as in case of streaming - sink, users have to set this property when they invoke spark shell. + scheduling configs for regular writes and async table services like compaction and clustering. This is the property + used to set custom scheduler config file with spark. In Deltastreamer, the file is generated within hudi and set if + necessary. Whereas in case of streaming sink, users have to set this property when they invoke spark shell. Sample format of the file contents. 
@@ -43,6 +43,11 @@ object SparkConfigs { 3 1 + + FAIR + 2 + 1 + */ val SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file" diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index c0c141db1..2e8323329 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -388,6 +388,14 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.") public Boolean retryLastPendingInlineClusteringJob = false; + @Parameter(names = {"--cluster-scheduling-weight"}, description = "Scheduling weight for clustering as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer clusterSchedulingWeight = 1; + + @Parameter(names = {"--cluster-scheduling-minshare"}, description = "Minshare for clustering as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer clusterSchedulingMinShare = 0; + public boolean isAsyncCompactionEnabled() { return continuousMode && !forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); @@ -431,8 +439,10 @@ public class HoodieDeltaStreamer implements Serializable { && Objects.equals(commitOnErrors, config.commitOnErrors) && Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight) && Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight) + && Objects.equals(clusterSchedulingWeight, config.clusterSchedulingWeight) && Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare) && Objects.equals(compactSchedulingMinShare, 
config.compactSchedulingMinShare) + && Objects.equals(clusterSchedulingMinShare, config.clusterSchedulingMinShare) && Objects.equals(forceDisableCompaction, config.forceDisableCompaction) && Objects.equals(checkpoint, config.checkpoint) && Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider) @@ -447,8 +457,8 @@ public class HoodieDeltaStreamer implements Serializable { transformerClassNames, sourceLimit, operation, filterDupes, enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors, - deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare, - compactSchedulingMinShare, forceDisableCompaction, checkpoint, + deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare, + compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint, initialCheckpointProvider, help); } @@ -478,8 +488,10 @@ public class HoodieDeltaStreamer implements Serializable { + ", commitOnErrors=" + commitOnErrors + ", deltaSyncSchedulingWeight=" + deltaSyncSchedulingWeight + ", compactSchedulingWeight=" + compactSchedulingWeight + + ", clusterSchedulingWeight=" + clusterSchedulingWeight + ", deltaSyncSchedulingMinShare=" + deltaSyncSchedulingMinShare + ", compactSchedulingMinShare=" + compactSchedulingMinShare + + ", clusterSchedulingMinShare=" + clusterSchedulingMinShare + ", forceDisableCompaction=" + forceDisableCompaction + ", checkpoint='" + checkpoint + '\'' + ", initialCheckpointProvider='" + initialCheckpointProvider + '\'' @@ -762,7 +774,7 @@ public class HoodieDeltaStreamer implements Serializable { if (asyncClusteringService.isPresent()) { asyncClusteringService.get().updateWriteClient(writeClient); } else { - asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(writeClient)); + asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(new 
HoodieSparkEngineContext(jssc), writeClient)); HoodieTableMetaClient meta = HoodieTableMetaClient.builder() .setConf(new Configuration(jssc.hadoopConfiguration())) .setBasePath(cfg.targetBasePath) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 3c72eb58a..376c9cfae 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -224,6 +224,8 @@ public class HoodieMultiTableDeltaStreamer { tableConfig.compactSchedulingWeight = globalConfig.compactSchedulingWeight; tableConfig.deltaSyncSchedulingMinShare = globalConfig.deltaSyncSchedulingMinShare; tableConfig.deltaSyncSchedulingWeight = globalConfig.deltaSyncSchedulingWeight; + tableConfig.clusterSchedulingWeight = globalConfig.clusterSchedulingWeight; + tableConfig.clusterSchedulingMinShare = globalConfig.clusterSchedulingMinShare; tableConfig.sparkMaster = globalConfig.sparkMaster; } } @@ -377,6 +379,14 @@ public class HoodieMultiTableDeltaStreamer { @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.") public String checkpoint = null; + @Parameter(names = {"--cluster-scheduling-weight"}, description = "Scheduling weight for clustering as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer clusterSchedulingWeight = 1; + + @Parameter(names = {"--cluster-scheduling-minshare"}, description = "Minshare for clustering as defined in " + + "https://spark.apache.org/docs/latest/job-scheduling.html") + public Integer clusterSchedulingMinShare = 0; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; } diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java index b991f9d46..61dc4da9b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java @@ -35,6 +35,8 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import static org.apache.hudi.async.AsyncClusteringService.CLUSTERING_POOL_NAME; + /** * Utility Class to generate Spark Scheduling allocation file. This kicks in only when user sets * spark.scheduler.mode=FAIR at spark-submit time @@ -61,13 +63,16 @@ public class SchedulerConfGenerator { * @param compactionWeight Scheduling weight for compaction * @param deltaSyncMinShare Minshare for delta sync * @param compactionMinShare Minshare for compaction + * @param clusteringWeight Scheduling weight for clustering + * @param clusteringMinShare Minshare for clustering + * @return Spark scheduling configs */ private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare, - Integer compactionMinShare) { + Integer compactionMinShare, Integer clusteringWeight, Integer clusteringMinShare) { return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE, deltaSyncWeight.toString(), deltaSyncMinShare.toString(), COMPACT_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE, - compactionWeight.toString(), compactionMinShare.toString()); + compactionWeight.toString(), compactionMinShare.toString(), CLUSTERING_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE, + clusteringWeight.toString(), clusteringMinShare.toString()); } /** @@ -84,7 +89,9 @@ public class SchedulerConfGenerator { if (sparkSchedulerMode.isPresent() && SPARK_SCHEDULER_FAIR_MODE.equals(sparkSchedulerMode.get()) && cfg.continuousMode && 
cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight, - cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare); + cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare, + cfg.clusterSchedulingWeight, cfg.clusterSchedulingMinShare); + LOG.warn("Spark scheduling config file " + sparkSchedulingConfFile); additionalSparkConfigs.put(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY(), sparkSchedulingConfFile); } else { LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode " @@ -100,14 +107,16 @@ public class SchedulerConfGenerator { * @param compactionWeight Scheduling weight for compaction * @param deltaSyncMinShare Minshare for delta sync * @param compactionMinShare Minshare for compaction + * @param clusteringWeight Scheduling weight for clustering + * @param clusteringMinShare Minshare for clustering + * @return Return the absolute path of the tmp file which stores the spark schedule configs * @throws IOException Throws an IOException when write configs to file failed */ private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight, - Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException { + Integer deltaSyncMinShare, Integer compactionMinShare, Integer clusteringWeight, Integer clusteringMinShare) throws IOException { File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml"); BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile)); - bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare)); + bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare)); bw.close(); LOG.info("Configs written to file" + tempConfigFile.getAbsolutePath()); return 
tempConfigFile.getAbsolutePath();