From e6244802598a44db17a70babdcbe02cc89a271a2 Mon Sep 17 00:00:00 2001
From: Kaushik Devarajaiah
Date: Thu, 19 Jul 2018 13:46:33 -0700
Subject: [PATCH] Throttling to limit QPS from HbaseIndex
---
.../hoodie/config/HoodieHBaseIndexConfig.java | 175 ++++++++++++++++++
.../uber/hoodie/config/HoodieIndexConfig.java | 41 +---
.../uber/hoodie/config/HoodieWriteConfig.java | 32 +++-
.../uber/hoodie/index/hbase/HBaseIndex.java | 158 +++++++++++++++-
.../com/uber/hoodie/index/TestHbaseIndex.java | 76 +++++++-
.../uber/hoodie/index/TestHoodieIndex.java | 4 +-
6 files changed, 436 insertions(+), 50 deletions(-)
create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
new file mode 100644
index 000000000..48e8031c1
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.config;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
+
+ public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
+ public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
+ public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
+ public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
+ /**
+ * Note that if HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP is set to true, this batch size will not
+ * be honored for HBase Puts
+ */
+ public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
+ /**
+ * Property to set to enable auto computation of put batch size
+ */
+ public static final String HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = "hoodie.index.hbase.put.batch.size.autocompute";
+ public static final String DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE = "false";
+ /**
+ * Property to set the fraction of the global share of QPS that should be allocated to this job.
+ * Let's say there are 3 jobs which have input size in terms of number of rows required for
+ * HbaseIndexing as x, 2x, 3x respectively. Then this fraction for the jobs would be 1/6 (0.17),
+ * 2/6 (0.33) and 3/6 (0.5) respectively.
+ */
+ public static final String HBASE_QPS_FRACTION_PROP = "hoodie.index.hbase.qps.fraction";
+ /**
+ * Property to set maximum QPS allowed per Region Server. This should be same across various
+ * jobs. This is intended to limit the aggregate QPS generated across various jobs to an Hbase
+ * Region Server. It is recommended to set this value based on global indexing throughput needs
+ * and most importantly, how much the HBase installation in use is able to tolerate without
+ * Region Servers going down.
+ */
+ public static String HBASE_MAX_QPS_PER_REGION_SERVER_PROP = "hoodie.index.hbase.max.qps.per.region.server";
+ /**
+ * Default batch size, used only for Get, but computed for Put
+ */
+ public static final int DEFAULT_HBASE_BATCH_SIZE = 100;
+ /**
+ * A conservatively low default, to avoid overwhelming Region Servers out of the box.
+ */
+ public static final int DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER = 1000;
+ /**
+ * Default is 50%, which means a total of 2 jobs can run using HbaseIndex without overwhelming
+ * Region Servers
+ */
+ public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f;
+
+ public HoodieHBaseIndexConfig(final Properties props) {
+ super(props);
+ }
+
+ public static HoodieHBaseIndexConfig.Builder newBuilder() {
+ return new HoodieHBaseIndexConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public HoodieHBaseIndexConfig.Builder fromFile(File propertiesFile) throws IOException {
+ FileReader reader = new FileReader(propertiesFile);
+ try {
+ this.props.load(reader);
+ return this;
+ } finally {
+ reader.close();
+ }
+ }
+
+ public HoodieHBaseIndexConfig.Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig.Builder hbaseZkQuorum(String zkString) {
+ props.setProperty(HBASE_ZKQUORUM_PROP, zkString);
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig.Builder hbaseZkPort(int port) {
+ props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port));
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig.Builder hbaseTableName(String tableName) {
+ props.setProperty(HBASE_TABLENAME_PROP, tableName);
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig.Builder hbaseIndexGetBatchSize(int getBatchSize) {
+ props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSize(int putBatchSize) {
+ props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSizeAutoCompute(
+ boolean putBatchSizeAutoCompute) {
+ props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP,
+ String.valueOf(putBatchSizeAutoCompute));
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig.Builder hbaseIndexQPSFraction(float qpsFraction) {
+ props.setProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP,
+ String.valueOf(qpsFraction));
+ return this;
+ }
+
+ /**
+ *
+ * Method to set maximum QPS allowed per Region Server. This should be same across various
+ * jobs. This is intended to limit the aggregate QPS generated across various jobs to an
+ * Hbase Region Server.
+ *
+ *
+ * It is recommended to set this value based on your global indexing throughput needs and
+ * most importantly, how much your HBase installation is able to tolerate without Region
+ * Servers going down.
+ *
+ */
+ public HoodieHBaseIndexConfig.Builder hbaseIndexMaxQPSPerRegionServer(
+ int maxQPSPerRegionServer) {
+ // This should be same across various jobs
+ props.setProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP,
+ String.valueOf(maxQPSPerRegionServer));
+ return this;
+ }
+
+ public HoodieHBaseIndexConfig build() {
+ HoodieHBaseIndexConfig config = new HoodieHBaseIndexConfig(props);
+ setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP),
+ HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
+ setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
+ HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
+ setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP),
+ HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP,
+ String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE));
+ setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP),
+ HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION));
+ setDefaultOnCondition(props,
+ !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP),
+ HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf(
+ DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER));
+ return config;
+ }
+
+ }
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
index 5a24cc12f..f34750973 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
@@ -49,14 +49,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
"hoodie.bloom.index.input.storage" + ".level";
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
- // ***** HBase Index Configs *****
- public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
- public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
- public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
- public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
- public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
- public static final String DEFAULT_HBASE_BATCH_SIZE = "100";
-
// ***** Bucketed Index Configs *****
public static final String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets";
@@ -92,6 +84,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) {
+ props.putAll(hBaseIndexConfig.getProps());
+ return this;
+ }
+
public Builder bloomFilterNumEntries(int numEntries) {
props.setProperty(BLOOM_FILTER_NUM_ENTRIES, String.valueOf(numEntries));
return this;
@@ -102,21 +99,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
- public Builder hbaseZkQuorum(String zkString) {
- props.setProperty(HBASE_ZKQUORUM_PROP, zkString);
- return this;
- }
-
- public Builder hbaseZkPort(int port) {
- props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port));
- return this;
- }
-
- public Builder hbaseTableName(String tableName) {
- props.setProperty(HBASE_TABLENAME_PROP, tableName);
- return this;
- }
-
public Builder bloomIndexParallelism(int parallelism) {
props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
return this;
@@ -137,15 +119,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
- public Builder hbaseIndexGetBatchSize(int getBatchSize) {
- props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
- return this;
- }
-
- public Builder hbaseIndexPutBatchSize(int putBatchSize) {
- props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
- return this;
- }
public Builder withBloomIndexInputStorageLevel(String level) {
props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level);
@@ -166,10 +139,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP),
BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING);
- setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP),
- HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
- setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
- HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL),
BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 6c1e394df..2349860b1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -258,23 +258,45 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public String getHbaseZkQuorum() {
- return props.getProperty(HoodieIndexConfig.HBASE_ZKQUORUM_PROP);
+ return props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKQUORUM_PROP);
}
public int getHbaseZkPort() {
- return Integer.parseInt(props.getProperty(HoodieIndexConfig.HBASE_ZKPORT_PROP));
+ return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKPORT_PROP));
}
public String getHbaseTableName() {
- return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
+ return props.getProperty(HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP);
}
public int getHbaseIndexGetBatchSize() {
- return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
+ return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
}
public int getHbaseIndexPutBatchSize() {
- return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
+ return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
+ }
+
+ public Boolean getHbaseIndexPutBatchSizeAutoCompute() {
+ return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP));
+ }
+
+ /**
+ * Fraction of the global share of QPS that should be allocated to this job.
+ * Let's say there are 3 jobs which have input size in terms of number of rows
+ * required for HbaseIndexing as x, 2x, 3x respectively. Then this fraction for
+ * the jobs would be 1/6 (0.17), 2/6 (0.33) and 3/6 (0.5) respectively.
+ */
+ public float getHbaseIndexQPSFraction() {
+ return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP));
+ }
+
+ /**
+ * This should be same across various jobs. This is intended to limit the aggregate
+ * QPS generated across various Hoodie jobs to an Hbase Region Server
+ */
+ public int getHbaseIndexMaxQPSPerRegionServer() {
+ return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP));
}
public int getBloomIndexParallelism() {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
index e929b81d0..c70685882 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
@@ -28,12 +28,14 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -47,10 +49,12 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -61,19 +65,44 @@ import org.apache.spark.api.java.function.Function2;
*/
public class HBaseIndex extends HoodieIndex {
+ public static final String DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME =
+ "spark.executor.instances";
+ public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME =
+ "spark.dynamicAllocation.enabled";
+ public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME =
+ "spark.dynamicAllocation.maxExecutors";
+
private static final byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s");
private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
+ private static final int SLEEP_TIME_MILLISECONDS = 100;
private static Logger logger = LogManager.getLogger(HBaseIndex.class);
private static Connection hbaseConnection = null;
+ private float qpsFraction;
+ private int maxQpsPerRegionServer;
+ /**
+ * multiPutBatchSize will be computed and re-set in updateLocation if
+ * {@link com.uber.hoodie.config.HoodieHBaseIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true
+ */
+ private Integer multiPutBatchSize;
+ private Integer numRegionServersForTable;
private final String tableName;
+ private HbasePutBatchSizeCalculator putBatchSizeCalculator;
public HBaseIndex(HoodieWriteConfig config) {
super(config);
this.tableName = config.getHbaseTableName();
addShutDownHook();
+ init(config);
+ }
+
+ private void init(HoodieWriteConfig config) {
+ multiPutBatchSize = config.getHbaseIndexGetBatchSize();
+ qpsFraction = config.getHbaseIndexQPSFraction();
+ maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
+ putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
}
@Override
@@ -162,7 +191,7 @@ public class HBaseIndex extends HoodieIndex {
// iterator till we reach batch size
if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
// get results for batch from Hbase
- Result[] results = hTable.get(statements);
+ Result[] results = doGet(hTable, statements);
// clear statements to be GC'd
statements.clear();
for (Result result : results) {
@@ -211,6 +240,11 @@ public class HBaseIndex extends HoodieIndex {
};
}
+ private Result[] doGet(HTable hTable, List keys) throws IOException {
+ sleepForTime(SLEEP_TIME_MILLISECONDS);
+ return hTable.get(keys);
+ }
+
@Override
public JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc,
HoodieTable hoodieTable) {
@@ -218,9 +252,8 @@ public class HBaseIndex extends HoodieIndex {
}
private Function2, Iterator> updateLocationFunction() {
- return (Function2, Iterator>) (partition,
- statusIterator) -> {
- Integer multiPutBatchSize = config.getHbaseIndexPutBatchSize();
+
+ return (Function2, Iterator>) (partition, statusIterator) -> {
List writeStatusList = new ArrayList<>();
// Grab the global HBase connection
@@ -303,14 +336,131 @@ public class HBaseIndex extends HoodieIndex {
hTable.flushCommits();
puts.clear();
deletes.clear();
+ sleepForTime(SLEEP_TIME_MILLISECONDS);
+ }
+
+ private static void sleepForTime(int sleepTimeMs) {
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException e) {
+ logger.error("Sleep interrupted during throttling", e);
+ throw new RuntimeException(e);
+ }
}
@Override
public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc,
HoodieTable hoodieTable) {
+ setPutBatchSize(writeStatusRDD, jsc);
return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
}
+ private void setPutBatchSize(JavaRDD writeStatusRDD,
+ final JavaSparkContext jsc) {
+ if (config.getHbaseIndexPutBatchSizeAutoCompute()) {
+ SparkConf conf = jsc.getConf();
+ int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1);
+ if (conf.getBoolean(DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME, false)) {
+ maxExecutors = Math.max(maxExecutors, conf.getInt(
+ DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME, 1));
+ }
+
+ /*
+ Each writeStatus represents status information from a write done in one of the IOHandles.
+ If a writeStatus has any insert, it implies that the corresponding task contacts HBase for
+ doing puts, since we only do puts for inserts from HBaseIndex.
+ */
+ int hbasePutAccessParallelism = getHBasePutAccessParallelism(writeStatusRDD);
+ multiPutBatchSize = putBatchSizeCalculator
+ .getBatchSize(
+ getNumRegionServersAliveForTable(),
+ maxQpsPerRegionServer,
+ hbasePutAccessParallelism,
+ maxExecutors,
+ SLEEP_TIME_MILLISECONDS,
+ qpsFraction);
+ }
+ }
+
+ @VisibleForTesting
+ public int getHBasePutAccessParallelism(final JavaRDD writeStatusRDD) {
+ return Math.toIntExact(Math.max(writeStatusRDD
+ .filter(w -> w.getStat().getNumInserts() > 0).count(), 1));
+ }
+
+ public static class HbasePutBatchSizeCalculator implements Serializable {
+
+ private static final int MILLI_SECONDS_IN_A_SECOND = 1000;
+ private static Logger logger = LogManager.getLogger(HbasePutBatchSizeCalculator.class);
+
+ /**
+ * Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed
+ * maxQpsPerRegionServer for each Region Server. Multiplying qpsFraction to reduce the aggregate load on common RS
+ * across topics. Assumption here is that all tables have regions across all RS, which is not necessarily true for
+ * smaller tables. So, they end up getting a smaller share of QPS than they deserve, but it might be ok.
+ *
+ * Example: int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f)
+ *
+ *
+ * Expected batchSize is 8 because in that case, total request sent to a Region Server in one second is:
+ *
+ * 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction) =>
+ * 16000. We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 requests which
+ * happens to be 10% of 16667 (maxQPSPerRegionServer), as expected.
+ *
+ * Assumptions made here:
+ * In a batch, writes get evenly distributed to each RS for that
+ * table. Since we do writes only in the case of inserts and not updates, for this assumption to fail, inserts would
+ * have to be skewed towards few RS, likelihood of which is less if Hbase table is pre-split and rowKeys are UUIDs
+ * (random strings). If this assumption fails, then it is possible for some RS to receive more than
+ * maxQpsPerRegionServer QPS, but for simplicity, we are going ahead with this model, since this is meant to be a
+ * lightweight distributed throttling mechanism without maintaining a global context. So if this assumption breaks,
+ * we are hoping the HBase Master relocates hot-spot regions to new Region Servers.
+ *
+ * For Region Server stability, throttling at a second level granularity is fine.
+ * Although, within a second, the sum of queries might be within maxQpsPerRegionServer, there could be peaks at some
+ * sub second intervals. So, the assumption is that these peaks are tolerated by the Region Server (which at max can
+ * be maxQpsPerRegionServer).
+ */
+ public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer,
+ int numTasksDuringPut, int maxExecutors, int sleepTimeMs, float qpsFraction) {
+ int numRSAlive = numRegionServersForTable;
+ int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer);
+ int numTasks = numTasksDuringPut;
+ int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
+ int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
+ int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
+ logger.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
+ logger.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
+ logger.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
+ logger.info("HbaseIndexThrottling: numTasks :" + numTasks);
+ logger.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
+ logger.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
+ logger.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec);
+ logger.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable);
+ logger.info("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize);
+ return multiPutBatchSize;
+ }
+ }
+
+ private Integer getNumRegionServersAliveForTable() {
+ // This is being called in the driver, so there is only one connection
+ // from the driver, so ok to use a local connection variable.
+ if (numRegionServersForTable == null) {
+ try (Connection conn = getHBaseConnection()) {
+ RegionLocator regionLocator = conn
+ .getRegionLocator(TableName.valueOf(tableName));
+ numRegionServersForTable = Math.toIntExact(
+ regionLocator.getAllRegionLocations().stream().map(e -> e.getServerName()).distinct()
+ .count());
+ return numRegionServersForTable;
+ } catch (IOException e) {
+ logger.error(e);
+ throw new RuntimeException(e);
+ }
+ }
+ return numRegionServersForTable;
+ }
+
@Override
public boolean rollbackCommit(String commitTime) {
// Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index 6f681564b..bde2bf6ad 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -27,14 +27,18 @@ import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.config.HoodieCompactionConfig;
+import com.uber.hoodie.config.HoodieHBaseIndexConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.hbase.HBaseIndex;
+import com.uber.hoodie.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator;
import com.uber.hoodie.table.HoodieTable;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -51,6 +55,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
@@ -281,6 +286,66 @@ public class TestHbaseIndex {
Mockito.verify(table, atMost(numberOfDataFileIds)).put((List) anyObject());
}
+ @Test
+ public void testPutBatchSizeCalculation() {
+ HbasePutBatchSizeCalculator batchSizeCalculator = new HbasePutBatchSizeCalculator();
+
+ // All asserts cases below are derived out of the first
+ // example below, with change in one parameter at a time.
+
+ int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
+ // Expected batchSize is 8 because in that case, total request sent in one second is below
+ // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction) => 16000
+ // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request
+ // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
+ Assert.assertEquals(putBatchSize, 8);
+
+ // If the number of Region Servers is halved, total requests sent in a second are also halved, so batchSize is also halved
+ int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
+ Assert.assertEquals(putBatchSize2, 4);
+
+ // If the parallelism is halved, batchSize has to double
+ int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
+ Assert.assertEquals(putBatchSize3, 16);
+
+ // If the parallelism is halved, batchSize has to double.
+ // This time parallelism is driven by numTasks rather than numExecutors
+ int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
+ Assert.assertEquals(putBatchSize4, 16);
+
+ // If qpsFraction is halved, batchSize has to halve
+ int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
+ Assert.assertEquals(putBatchSize5, 4);
+
+ // If maxQPSPerRegionServer is doubled, batchSize also doubles
+ int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
+ Assert.assertEquals(putBatchSize6, 16);
+ }
+
+ @Test
+ public void testsHBasePutAccessParallelism() {
+ HoodieWriteConfig config = getConfig();
+ HBaseIndex index = new HBaseIndex(config);
+ final JavaRDD writeStatusRDD = jsc.parallelize(
+ Arrays.asList(
+ getSampleWriteStatus(1, 2),
+ getSampleWriteStatus(0, 3),
+ getSampleWriteStatus(10, 0)),
+ 10);
+ final int hbasePutAccessParallelism = index.getHBasePutAccessParallelism(writeStatusRDD);
+ Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
+ Assert.assertEquals(2, hbasePutAccessParallelism);
+ }
+
+ private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
+ final WriteStatus writeStatus = new WriteStatus();
+ HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
+ hoodieWriteStat.setNumInserts(numInserts);
+ hoodieWriteStat.setNumUpdateWrites(numUpdateWrites);
+ writeStatus.setStat(hoodieWriteStat);
+ return writeStatus;
+ }
+
private void assertNoWriteErrors(List statuses) {
// Verify there are no errors
for (WriteStatus status : statuses) {
@@ -300,8 +365,11 @@ public class TestHbaseIndex {
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.forTable("test-trip-table").withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
- .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
- .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
- .hbaseIndexGetBatchSize(100).hbaseIndexPutBatchSize(100).build());
+ .withHBaseIndexConfig(
+ new HoodieHBaseIndexConfig.Builder()
+ .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+ .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
+ .hbaseIndexGetBatchSize(100).build())
+ .build());
}
-}
\ No newline at end of file
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java
index 002c1e435..f7ec0d826 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.config.HoodieHBaseIndexConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
@@ -63,7 +64,8 @@ public class TestHoodieIndex {
HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder();
// Different types
HoodieWriteConfig config = clientConfigBuilder.withPath(basePath).withIndexConfig(
- indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE).build()).build();
+ indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE)
+ .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build()).build();
assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HBaseIndex);
config = clientConfigBuilder.withPath(basePath)
.withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();