1
0

[HUDI-3485] Adding scheduler pool configs for async clustering (#5043)

This commit is contained in:
Sivabalan Narayanan
2022-03-29 18:27:45 -07:00
committed by GitHub
parent 5c1b482a1b
commit 4fed8dd319
10 changed files with 72 additions and 19 deletions

View File

@@ -21,6 +21,8 @@ package org.apache.hudi.async;
import org.apache.hudi.client.BaseClusterer;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
@@ -42,18 +44,21 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
public static final String CLUSTERING_POOL_NAME = "hoodiecluster";
private final int maxConcurrentClustering;
private transient BaseClusterer clusteringClient;
protected transient HoodieEngineContext context;
public AsyncClusteringService(BaseHoodieWriteClient writeClient) {
this(writeClient, false);
public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) {
this(context, writeClient, false);
}
public AsyncClusteringService(BaseHoodieWriteClient writeClient, boolean runInDaemonMode) {
public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient, boolean runInDaemonMode) {
super(writeClient.getConfig(), runInDaemonMode);
this.clusteringClient = createClusteringClient(writeClient);
this.maxConcurrentClustering = 1;
this.context = context;
}
protected abstract BaseClusterer createClusteringClient(BaseHoodieWriteClient client);
@@ -72,6 +77,9 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService {
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set clustering pool name for allowing users to prioritize clustering
LOG.info("Setting pool name for clustering to " + CLUSTERING_POOL_NAME);
context.setProperty(EngineProperty.CLUSTERING_POOL_NAME, CLUSTERING_POOL_NAME);
while (!isShutdownRequested()) {
final HoodieInstant instant = fetchNextAsyncServiceInstant();
if (null != instant) {

View File

@@ -22,14 +22,15 @@ package org.apache.hudi.async;
import org.apache.hudi.client.BaseClusterer;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkClusteringClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
/**
* Async clustering service for Spark datasource.
*/
public class SparkAsyncClusteringService extends AsyncClusteringService {
public SparkAsyncClusteringService(BaseHoodieWriteClient writeClient) {
super(writeClient);
public SparkAsyncClusteringService(HoodieEngineContext engineContext, BaseHoodieWriteClient writeClient) {
super(engineContext, writeClient);
}
@Override

View File

@@ -36,6 +36,8 @@ import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.data.HoodieSparkLongAccumulator;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SQLContext;
@@ -53,6 +55,7 @@ import java.util.stream.Stream;
*/
public class HoodieSparkEngineContext extends HoodieEngineContext {
private static final Logger LOG = LogManager.getLogger(HoodieSparkEngineContext.class);
private final JavaSparkContext javaSparkContext;
private SQLContext sqlContext;
@@ -158,6 +161,8 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
public void setProperty(EngineProperty key, String value) {
if (key == EngineProperty.COMPACTION_POOL_NAME) {
javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
} else if (key == EngineProperty.CLUSTERING_POOL_NAME) {
javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
} else {
throw new HoodieException("Unknown engine property :" + key);
}

View File

@@ -26,6 +26,7 @@ public enum EngineProperty {
EMBEDDED_SERVER_HOST,
// Pool/queue to use to run compaction.
COMPACTION_POOL_NAME,
CLUSTERING_POOL_NAME,
TOTAL_CORES_PER_EXECUTOR,
// Amount of total memory available to each engine executor
TOTAL_MEMORY_AVAILABLE,

View File

@@ -22,6 +22,7 @@ package org.apache.hudi.async;
import org.apache.hudi.client.BaseClusterer;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkClusteringClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
/**
* Async clustering service for Spark structured streaming.
@@ -31,8 +32,8 @@ public class SparkStreamingAsyncClusteringService extends AsyncClusteringService
private static final long serialVersionUID = 1L;
public SparkStreamingAsyncClusteringService(BaseHoodieWriteClient writeClient) {
super(writeClient, true);
public SparkStreamingAsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) {
super(context, writeClient, true);
}
@Override

View File

@@ -205,7 +205,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
if (null == asyncClusteringService) {
log.info("Triggering async clustering!")
asyncClusteringService = new SparkStreamingAsyncClusteringService(client)
asyncClusteringService = new SparkStreamingAsyncClusteringService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
client)
asyncClusteringService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
override def apply(errored: lang.Boolean): lang.Boolean = {
log.info(s"Async clustering service shutdown. Errored ? $errored")

View File

@@ -27,9 +27,9 @@ object SparkConfigs {
/*
When async compaction is enabled (deltastreamer or streaming sink), users might be interested to set custom
scheduling configs for regular writes and async compaction. This is the property used to set custom scheduler config
file with spark. In Deltastreamer, the file is generated within hudi and set if necessary. Where as in case of streaming
sink, users have to set this property when they invoke spark shell.
scheduling configs for regular writes and async table services like compaction and clustering. This is the property
used to set a custom scheduler config file with Spark. In Deltastreamer, the file is generated within Hudi and set if
necessary. Whereas in the case of the streaming sink, users have to set this property when they invoke the Spark shell.
Sample format of the file contents.
<?xml version="1.0"?>
<allocations>
@@ -43,6 +43,11 @@ object SparkConfigs {
<weight>3</weight>
<minShare>1</minShare>
</pool>
<pool name="hoodiecluster">
<schedulingMode>FAIR</schedulingMode>
<weight>2</weight>
<minShare>1</minShare>
</pool>
</allocations>
*/
val SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"

View File

@@ -388,6 +388,14 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.")
public Boolean retryLastPendingInlineClusteringJob = false;
@Parameter(names = {"--cluster-scheduling-weight"}, description = "Scheduling weight for clustering as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer clusterSchedulingWeight = 1;
@Parameter(names = {"--cluster-scheduling-minshare"}, description = "Minshare for clustering as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer clusterSchedulingMinShare = 0;
public boolean isAsyncCompactionEnabled() {
return continuousMode && !forceDisableCompaction
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
@@ -431,8 +439,10 @@ public class HoodieDeltaStreamer implements Serializable {
&& Objects.equals(commitOnErrors, config.commitOnErrors)
&& Objects.equals(deltaSyncSchedulingWeight, config.deltaSyncSchedulingWeight)
&& Objects.equals(compactSchedulingWeight, config.compactSchedulingWeight)
&& Objects.equals(clusterSchedulingWeight, config.clusterSchedulingWeight)
&& Objects.equals(deltaSyncSchedulingMinShare, config.deltaSyncSchedulingMinShare)
&& Objects.equals(compactSchedulingMinShare, config.compactSchedulingMinShare)
&& Objects.equals(clusterSchedulingMinShare, config.clusterSchedulingMinShare)
&& Objects.equals(forceDisableCompaction, config.forceDisableCompaction)
&& Objects.equals(checkpoint, config.checkpoint)
&& Objects.equals(initialCheckpointProvider, config.initialCheckpointProvider)
@@ -447,8 +457,8 @@ public class HoodieDeltaStreamer implements Serializable {
transformerClassNames, sourceLimit, operation, filterDupes,
enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
minSyncIntervalSeconds, sparkMaster, commitOnErrors,
deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, forceDisableCompaction, checkpoint,
deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
initialCheckpointProvider, help);
}
@@ -478,8 +488,10 @@ public class HoodieDeltaStreamer implements Serializable {
+ ", commitOnErrors=" + commitOnErrors
+ ", deltaSyncSchedulingWeight=" + deltaSyncSchedulingWeight
+ ", compactSchedulingWeight=" + compactSchedulingWeight
+ ", clusterSchedulingWeight=" + clusterSchedulingWeight
+ ", deltaSyncSchedulingMinShare=" + deltaSyncSchedulingMinShare
+ ", compactSchedulingMinShare=" + compactSchedulingMinShare
+ ", clusterSchedulingMinShare=" + clusterSchedulingMinShare
+ ", forceDisableCompaction=" + forceDisableCompaction
+ ", checkpoint='" + checkpoint + '\''
+ ", initialCheckpointProvider='" + initialCheckpointProvider + '\''
@@ -762,7 +774,7 @@ public class HoodieDeltaStreamer implements Serializable {
if (asyncClusteringService.isPresent()) {
asyncClusteringService.get().updateWriteClient(writeClient);
} else {
asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(writeClient));
asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(new HoodieSparkEngineContext(jssc), writeClient));
HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
.setConf(new Configuration(jssc.hadoopConfiguration()))
.setBasePath(cfg.targetBasePath)

View File

@@ -224,6 +224,8 @@ public class HoodieMultiTableDeltaStreamer {
tableConfig.compactSchedulingWeight = globalConfig.compactSchedulingWeight;
tableConfig.deltaSyncSchedulingMinShare = globalConfig.deltaSyncSchedulingMinShare;
tableConfig.deltaSyncSchedulingWeight = globalConfig.deltaSyncSchedulingWeight;
tableConfig.clusterSchedulingWeight = globalConfig.clusterSchedulingWeight;
tableConfig.clusterSchedulingMinShare = globalConfig.clusterSchedulingMinShare;
tableConfig.sparkMaster = globalConfig.sparkMaster;
}
}
@@ -377,6 +379,14 @@ public class HoodieMultiTableDeltaStreamer {
@Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer from this checkpoint.")
public String checkpoint = null;
@Parameter(names = {"--cluster-scheduling-weight"}, description = "Scheduling weight for clustering as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer clusterSchedulingWeight = 1;
@Parameter(names = {"--cluster-scheduling-minshare"}, description = "Minshare for clustering as defined in "
+ "https://spark.apache.org/docs/latest/job-scheduling.html")
public Integer clusterSchedulingMinShare = 0;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}

View File

@@ -35,6 +35,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static org.apache.hudi.async.AsyncClusteringService.CLUSTERING_POOL_NAME;
/**
* Utility Class to generate Spark Scheduling allocation file. This kicks in only when user sets
* spark.scheduler.mode=FAIR at spark-submit time
@@ -61,13 +63,16 @@ public class SchedulerConfGenerator {
* @param compactionWeight Scheduling weight for compaction
* @param deltaSyncMinShare Minshare for delta sync
* @param compactionMinShare Minshare for compaction
* @param clusteringWeight Scheduling weight for clustering
* @param clusteringMinShare Minshare for clustering
* @return Spark scheduling configs
*/
private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
Integer compactionMinShare) {
Integer compactionMinShare, Integer clusteringWeight, Integer clusteringMinShare) {
return String.format(SPARK_SCHEDULING_PATTERN, DELTASYNC_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
deltaSyncWeight.toString(), deltaSyncMinShare.toString(), COMPACT_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
compactionWeight.toString(), compactionMinShare.toString());
compactionWeight.toString(), compactionMinShare.toString(), CLUSTERING_POOL_NAME, SPARK_SCHEDULER_FAIR_MODE,
clusteringWeight.toString(), clusteringMinShare.toString());
}
/**
@@ -84,7 +89,9 @@ public class SchedulerConfGenerator {
if (sparkSchedulerMode.isPresent() && SPARK_SCHEDULER_FAIR_MODE.equals(sparkSchedulerMode.get())
&& cfg.continuousMode && cfg.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
String sparkSchedulingConfFile = generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare,
cfg.clusterSchedulingWeight, cfg.clusterSchedulingMinShare);
LOG.warn("Spark scheduling config file " + sparkSchedulingConfFile);
additionalSparkConfigs.put(SparkConfigs.SPARK_SCHEDULER_ALLOCATION_FILE_KEY(), sparkSchedulingConfFile);
} else {
LOG.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
@@ -100,14 +107,16 @@ public class SchedulerConfGenerator {
* @param compactionWeight Scheduling weight for compaction
* @param deltaSyncMinShare Minshare for delta sync
* @param compactionMinShare Minshare for compaction
* @param clusteringWeight Scheduling weight for clustering
* @param clusteringMinShare Minshare for clustering
* @return Return the absolute path of the tmp file which stores the spark schedule configs
* @throws IOException Throws an IOException when write configs to file failed
*/
private static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
Integer deltaSyncMinShare, Integer compactionMinShare, Integer clusteringWeight, Integer clusteringMinShare) throws IOException {
File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare));
bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare, clusteringWeight, clusteringMinShare));
bw.close();
LOG.info("Configs written to file" + tempConfigFile.getAbsolutePath());
return tempConfigFile.getAbsolutePath();