Throttling to limit QPS from HbaseIndex
This commit is contained in:
committed by
vinoth chandar
parent
3746ace76a
commit
e624480259
@@ -28,12 +28,14 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||
import com.uber.hoodie.config.HoodieIndexConfig;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
|
||||
import com.uber.hoodie.exception.HoodieIndexException;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
@@ -47,10 +49,12 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
@@ -61,19 +65,44 @@ import org.apache.spark.api.java.function.Function2;
|
||||
*/
|
||||
public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
|
||||
public static final String DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME =
|
||||
"spark.executor.instances";
|
||||
public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME =
|
||||
"spark.dynamicAllocation.enabled";
|
||||
public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME =
|
||||
"spark.dynamicAllocation.maxExecutors";
|
||||
|
||||
private static final byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s");
|
||||
private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
|
||||
private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
|
||||
private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
|
||||
private static final int SLEEP_TIME_MILLISECONDS = 100;
|
||||
|
||||
private static Logger logger = LogManager.getLogger(HBaseIndex.class);
|
||||
private static Connection hbaseConnection = null;
|
||||
private float qpsFraction;
|
||||
private int maxQpsPerRegionServer;
|
||||
/**
|
||||
* multiPutBatchSize will be computed and re-set in updateLocation if
|
||||
* {@link HoodieIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true
|
||||
*/
|
||||
private Integer multiPutBatchSize;
|
||||
private Integer numRegionServersForTable;
|
||||
private final String tableName;
|
||||
private HbasePutBatchSizeCalculator putBatchSizeCalculator;
|
||||
|
||||
/**
 * Creates an HBase-backed index from the given write configuration.
 *
 * <p>Remembers the HBase table name from the config, calls {@code addShutDownHook()} and then
 * loads the throttling settings through {@code init(config)}.
 *
 * @param config write configuration supplying the HBase table name and index settings
 */
public HBaseIndex(HoodieWriteConfig config) {
  super(config);
  tableName = config.getHbaseTableName();
  addShutDownHook();
  init(config);
}
|
||||
|
||||
private void init(HoodieWriteConfig config) {
|
||||
multiPutBatchSize = config.getHbaseIndexGetBatchSize();
|
||||
qpsFraction = config.getHbaseIndexQPSFraction();
|
||||
maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
|
||||
putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -162,7 +191,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
// iterator till we reach batch size
|
||||
if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
|
||||
// get results for batch from Hbase
|
||||
Result[] results = hTable.get(statements);
|
||||
Result[] results = doGet(hTable, statements);
|
||||
// clear statements to be GC'd
|
||||
statements.clear();
|
||||
for (Result result : results) {
|
||||
@@ -211,6 +240,11 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
};
|
||||
}
|
||||
|
||||
private Result[] doGet(HTable hTable, List<Get> keys) throws IOException {
|
||||
sleepForTime(SLEEP_TIME_MILLISECONDS);
|
||||
return hTable.get(keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
|
||||
HoodieTable<T> hoodieTable) {
|
||||
@@ -218,9 +252,8 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
}
|
||||
|
||||
private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {
|
||||
return (Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>>) (partition,
|
||||
statusIterator) -> {
|
||||
Integer multiPutBatchSize = config.getHbaseIndexPutBatchSize();
|
||||
|
||||
return (Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>>) (partition, statusIterator) -> {
|
||||
|
||||
List<WriteStatus> writeStatusList = new ArrayList<>();
|
||||
// Grab the global HBase connection
|
||||
@@ -303,14 +336,131 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
hTable.flushCommits();
|
||||
puts.clear();
|
||||
deletes.clear();
|
||||
sleepForTime(SLEEP_TIME_MILLISECONDS);
|
||||
}
|
||||
|
||||
private static void sleepForTime(int sleepTimeMs) {
|
||||
try {
|
||||
Thread.sleep(sleepTimeMs);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Sleep interrupted during throttling", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
|
||||
HoodieTable<T> hoodieTable) {
|
||||
setPutBatchSize(writeStatusRDD, jsc);
|
||||
return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
|
||||
}
|
||||
|
||||
private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
|
||||
final JavaSparkContext jsc) {
|
||||
if (config.getHbaseIndexPutBatchSizeAutoCompute()) {
|
||||
SparkConf conf = jsc.getConf();
|
||||
int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1);
|
||||
if (conf.getBoolean(DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME, false)) {
|
||||
maxExecutors = Math.max(maxExecutors, conf.getInt(
|
||||
DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME, 1));
|
||||
}
|
||||
|
||||
/*
|
||||
Each writeStatus represents status information from a write done in one of the IOHandles.
|
||||
If a writeStatus has any insert, it implies that the corresponding task contacts HBase for
|
||||
doing puts, since we only do puts for inserts from HBaseIndex.
|
||||
*/
|
||||
int hbasePutAccessParallelism = getHBasePutAccessParallelism(writeStatusRDD);
|
||||
multiPutBatchSize = putBatchSizeCalculator
|
||||
.getBatchSize(
|
||||
getNumRegionServersAliveForTable(),
|
||||
maxQpsPerRegionServer,
|
||||
hbasePutAccessParallelism,
|
||||
maxExecutors,
|
||||
SLEEP_TIME_MILLISECONDS,
|
||||
qpsFraction);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
|
||||
return Math.toIntExact(Math.max(writeStatusRDD
|
||||
.filter(w -> w.getStat().getNumInserts() > 0).count(), 1));
|
||||
}
|
||||
|
||||
public static class HbasePutBatchSizeCalculator implements Serializable {
|
||||
|
||||
private static final int MILLI_SECONDS_IN_A_SECOND = 1000;
|
||||
private static Logger logger = LogManager.getLogger(HbasePutBatchSizeCalculator.class);
|
||||
|
||||
/**
|
||||
* Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed
|
||||
* maxQpsPerRegionServer for each Region Server. Multiplying qpsFraction to reduce the aggregate load on common RS
|
||||
* across topics. Assumption here is that all tables have regions across all RS, which is not necessarily true for
|
||||
* smaller tables. So, they end up getting a smaller share of QPS than they deserve, but it might be ok.
|
||||
* <p>
|
||||
* Example: int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f)
|
||||
* </p>
|
||||
* <p>
|
||||
* Expected batchSize is 8 because in that case, total request sent to a Region Server in one second is:
|
||||
*
|
||||
* 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) =>
|
||||
* 16000. We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 requests which
|
||||
* happens to be 10% of 16667 (maxQPSPerRegionServer), as expected.
|
||||
* </p>
|
||||
* <p> Assumptions made here <li> In a batch, writes get evenly distributed to each RS for that
|
||||
* table. Since we do writes only in the case of inserts and not updates, for this assumption to fail, inserts would
|
||||
* have to be skewed towards few RS, likelihood of which is less if Hbase table is pre-split and rowKeys are UUIDs
|
||||
* (random strings). If this assumption fails, then it is possible for some RS to receive more than
|
||||
* maxQpsPerRegionServer QPS, but for simplicity, we are going ahead with this model, since this is meant to be a
|
||||
* lightweight distributed throttling mechanism without maintaining a global context. So if this assumption breaks,
|
||||
* we are hoping the HBase Master relocates hot-spot regions to new Region Servers.
|
||||
*
|
||||
* </li> <li> For Region Server stability, throttling at a second level granularity is fine.
|
||||
* Although, within a second, the sum of queries might be within maxQpsPerRegionServer, there could be peaks at some
|
||||
* sub second intervals. So, the assumption is that these peaks are tolerated by the Region Server (which at max can
|
||||
* be maxQpsPerRegionServer). </li> </p>
|
||||
*/
|
||||
public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer,
|
||||
int numTasksDuringPut, int maxExecutors, int sleepTimeMs, float qpsFraction) {
|
||||
int numRSAlive = numRegionServersForTable;
|
||||
int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer);
|
||||
int numTasks = numTasksDuringPut;
|
||||
int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
|
||||
int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
|
||||
int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
|
||||
logger.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
|
||||
logger.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
|
||||
logger.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
|
||||
logger.info("HbaseIndexThrottling: numTasks :" + numTasks);
|
||||
logger.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
|
||||
logger.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
|
||||
logger.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec);
|
||||
logger.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable);
|
||||
logger.info("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize);
|
||||
return multiPutBatchSize;
|
||||
}
|
||||
}
|
||||
|
||||
private Integer getNumRegionServersAliveForTable() {
|
||||
// This is being called in the driver, so there is only one connection
|
||||
// from the driver, so ok to use a local connection variable.
|
||||
if (numRegionServersForTable == null) {
|
||||
try (Connection conn = getHBaseConnection()) {
|
||||
RegionLocator regionLocator = conn
|
||||
.getRegionLocator(TableName.valueOf(tableName));
|
||||
numRegionServersForTable = Math.toIntExact(
|
||||
regionLocator.getAllRegionLocations().stream().map(e -> e.getServerName()).distinct()
|
||||
.count());
|
||||
return numRegionServersForTable;
|
||||
} catch (IOException e) {
|
||||
logger.error(e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return numRegionServersForTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rollbackCommit(String commitTime) {
|
||||
// Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
|
||||
|
||||
Reference in New Issue
Block a user