diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java new file mode 100644 index 000000000..48e8031c1 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.config; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { + + public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum"; + public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport"; + public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table"; + public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size"; + /** + * Note that if HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP is set to true, this batch size will not + * be honored for HBase Puts + */ + public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size"; + /** + * Property to set to enable auto computation of put batch size + */ + public static final String HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = "hoodie.index.hbase.put.batch.size.autocompute"; + public static final 
String DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE = "false"; + /** + * Property to set the fraction of the global share of QPS that should be allocated to this job. + * Let's say there are 3 jobs which have input size in terms of number of rows required for + * HbaseIndexing as x, 2x, 3x respectively. Then this fraction for the jobs would be (0.17) 1/6, + * 0.33 (2/6) and 0.5 (3/6) respectively. + */ + public static final String HBASE_QPS_FRACTION_PROP = "hoodie.index.hbase.qps.fraction"; + /** + * Property to set maximum QPS allowed per Region Server. This should be same across various + * jobs. This is intended to limit the aggregate QPS generated across various jobs to an Hbase + * Region Server. It is recommended to set this value based on global indexing throughput needs + * and most importantly, how much the HBase installation in use is able to tolerate without + * Region Servers going down. + */ + public static String HBASE_MAX_QPS_PER_REGION_SERVER_PROP = "hoodie.index.hbase.max.qps.per.region.server"; + /** + * Default batch size, used only for Get, but computed for Put + */ + public static final int DEFAULT_HBASE_BATCH_SIZE = 100; + /** + * A low default value. 
+ */ + public static final int DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER = 1000; + /** + * Default is 50%, which means a total of 2 jobs can run using HbaseIndex without overwhelming + * Region Servers + */ + public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f; + + public HoodieHBaseIndexConfig(final Properties props) { + super(props); + } + + public static HoodieHBaseIndexConfig.Builder newBuilder() { + return new HoodieHBaseIndexConfig.Builder(); + } + + public static class Builder { + + private final Properties props = new Properties(); + + public HoodieHBaseIndexConfig.Builder fromFile(File propertiesFile) throws IOException { + FileReader reader = new FileReader(propertiesFile); + try { + this.props.load(reader); + return this; + } finally { + reader.close(); + } + } + + public HoodieHBaseIndexConfig.Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public HoodieHBaseIndexConfig.Builder hbaseZkQuorum(String zkString) { + props.setProperty(HBASE_ZKQUORUM_PROP, zkString); + return this; + } + + public HoodieHBaseIndexConfig.Builder hbaseZkPort(int port) { + props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port)); + return this; + } + + public HoodieHBaseIndexConfig.Builder hbaseTableName(String tableName) { + props.setProperty(HBASE_TABLENAME_PROP, tableName); + return this; + } + + public HoodieHBaseIndexConfig.Builder hbaseIndexGetBatchSize(int getBatchSize) { + props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize)); + return this; + } + + public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSize(int putBatchSize) { + props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize)); + return this; + } + + public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSizeAutoCompute( + boolean putBatchSizeAutoCompute) { + props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, + String.valueOf(putBatchSizeAutoCompute)); + return this; + } + + public HoodieHBaseIndexConfig.Builder 
hbaseIndexQPSFraction(float qpsFraction) { + props.setProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP, + String.valueOf(qpsFraction)); + return this; + } + + /** + *

+ * Method to set maximum QPS allowed per Region Server. This should be same across various
+ * jobs. This is intended to limit the aggregate QPS generated across various jobs to an
+ * Hbase Region Server.
+ *
+ * It is recommended to set this value based on your global indexing throughput needs and
+ * most importantly, how much your HBase installation is able to tolerate without Region
+ * Servers going down.
+ */ + public HoodieHBaseIndexConfig.Builder hbaseIndexMaxQPSPerRegionServer( + int maxQPSPerRegionServer) { + // This should be same across various jobs + props.setProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP, + String.valueOf(maxQPSPerRegionServer)); + return this; + } + + public HoodieHBaseIndexConfig build() { + HoodieHBaseIndexConfig config = new HoodieHBaseIndexConfig(props); + setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP), + HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); + setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), + HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); + setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP), + HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, + String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE)); + setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP), + HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION)); + setDefaultOnCondition(props, + !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP), + HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf( + DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER)); + return config; + } + + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index 5a24cc12f..f34750973 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -49,14 +49,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { "hoodie.bloom.index.input.storage" + ".level"; public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; - // ***** HBase Index Configs ***** - public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum"; - public static final String 
HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport"; - public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table"; - public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size"; - public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size"; - public static final String DEFAULT_HBASE_BATCH_SIZE = "100"; - // ***** Bucketed Index Configs ***** public static final String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets"; @@ -92,6 +84,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder withHBaseIndexConfig(HoodieHBaseIndexConfig hBaseIndexConfig) { + props.putAll(hBaseIndexConfig.getProps()); + return this; + } + public Builder bloomFilterNumEntries(int numEntries) { props.setProperty(BLOOM_FILTER_NUM_ENTRIES, String.valueOf(numEntries)); return this; @@ -102,21 +99,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } - public Builder hbaseZkQuorum(String zkString) { - props.setProperty(HBASE_ZKQUORUM_PROP, zkString); - return this; - } - - public Builder hbaseZkPort(int port) { - props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port)); - return this; - } - - public Builder hbaseTableName(String tableName) { - props.setProperty(HBASE_TABLENAME_PROP, tableName); - return this; - } - public Builder bloomIndexParallelism(int parallelism) { props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism)); return this; @@ -137,15 +119,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } - public Builder hbaseIndexGetBatchSize(int getBatchSize) { - props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize)); - return this; - } - - public Builder hbaseIndexPutBatchSize(int putBatchSize) { - props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize)); - return this; - } public Builder withBloomIndexInputStorageLevel(String level) { 
props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level); @@ -166,10 +139,6 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP), BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING); - setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP), - HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); - setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), - HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL), BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 6c1e394df..2349860b1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -258,23 +258,45 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { } public String getHbaseZkQuorum() { - return props.getProperty(HoodieIndexConfig.HBASE_ZKQUORUM_PROP); + return props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKQUORUM_PROP); } public int getHbaseZkPort() { - return Integer.parseInt(props.getProperty(HoodieIndexConfig.HBASE_ZKPORT_PROP)); + return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKPORT_PROP)); } public String getHbaseTableName() { - return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP); + return props.getProperty(HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP); } public int getHbaseIndexGetBatchSize() { - return 
Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_GET_BATCH_SIZE_PROP)); + return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP)); } public int getHbaseIndexPutBatchSize() { - return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_PROP)); + return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP)); + } + + public Boolean getHbaseIndexPutBatchSizeAutoCompute() { + return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP)); + } + + /** + * Fraction of the global share of QPS that should be allocated to this job. + * Let's say there are 3 jobs which have input size in terms of number of rows + * required for HbaseIndexing as x, 2x, 3x respectively. Then this fraction for + * the jobs would be (0.17) 1/6, 0.33 (2/6) and 0.5 (3/6) respectively. + */ + public float getHbaseIndexQPSFraction() { + return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP)); + } + + /** + * This should be same across various jobs. 
This is intended to limit the aggregate + * QPS generated across various Hoodie jobs to an Hbase Region Server + */ + public int getHbaseIndexMaxQPSPerRegionServer() { + return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP)); } public int getBloomIndexParallelism() { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java index e929b81d0..c70685882 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java @@ -28,12 +28,14 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException; import com.uber.hoodie.exception.HoodieIndexException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -47,10 +49,12 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -61,19 +65,44 @@ import 
org.apache.spark.api.java.function.Function2; */ public class HBaseIndex extends HoodieIndex { + public static final String DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME = + "spark.executor.instances"; + public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME = + "spark.dynamicAllocation.enabled"; + public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME = + "spark.dynamicAllocation.maxExecutors"; + private static final byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s"); private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts"); private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name"); private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path"); + private static final int SLEEP_TIME_MILLISECONDS = 100; private static Logger logger = LogManager.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; + private float qpsFraction; + private int maxQpsPerRegionServer; + /** + * multiPutBatchSize will be computed and re-set in updateLocation if + * {@link HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true + */ + private Integer multiPutBatchSize; + private Integer numRegionServersForTable; private final String tableName; + private HbasePutBatchSizeCalculator putBatchSizeCalculator; public HBaseIndex(HoodieWriteConfig config) { super(config); this.tableName = config.getHbaseTableName(); addShutDownHook(); + init(config); + } + + private void init(HoodieWriteConfig config) { + multiPutBatchSize = config.getHbaseIndexGetBatchSize(); + qpsFraction = config.getHbaseIndexQPSFraction(); + maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer(); + putBatchSizeCalculator = new HbasePutBatchSizeCalculator(); } @Override @@ -162,7 +191,7 @@ public class HBaseIndex extends HoodieIndex { // iterator till we reach batch size if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) { // get results 
for batch from Hbase - Result[] results = hTable.get(statements); + Result[] results = doGet(hTable, statements); // clear statements to be GC'd statements.clear(); for (Result result : results) { @@ -211,6 +240,11 @@ public class HBaseIndex extends HoodieIndex { }; } + private Result[] doGet(HTable hTable, List keys) throws IOException { + sleepForTime(SLEEP_TIME_MILLISECONDS); + return hTable.get(keys); + } + @Override public JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { @@ -218,9 +252,8 @@ public class HBaseIndex extends HoodieIndex { } private Function2, Iterator> updateLocationFunction() { - return (Function2, Iterator>) (partition, - statusIterator) -> { - Integer multiPutBatchSize = config.getHbaseIndexPutBatchSize(); + + return (Function2, Iterator>) (partition, statusIterator) -> { List writeStatusList = new ArrayList<>(); // Grab the global HBase connection @@ -303,14 +336,131 @@ public class HBaseIndex extends HoodieIndex { hTable.flushCommits(); puts.clear(); deletes.clear(); + sleepForTime(SLEEP_TIME_MILLISECONDS); + } + + private static void sleepForTime(int sleepTimeMs) { + try { + Thread.sleep(sleepTimeMs); + } catch (InterruptedException e) { + logger.error("Sleep interrupted during throttling", e); + throw new RuntimeException(e); + } } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { + setPutBatchSize(writeStatusRDD, jsc); return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); } + private void setPutBatchSize(JavaRDD writeStatusRDD, + final JavaSparkContext jsc) { + if (config.getHbaseIndexPutBatchSizeAutoCompute()) { + SparkConf conf = jsc.getConf(); + int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1); + if (conf.getBoolean(DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME, false)) { + maxExecutors = Math.max(maxExecutors, conf.getInt( + 
DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME, 1)); + } + + /* + Each writeStatus represents status information from a write done in one of the IOHandles. + If a writeStatus has any insert, it implies that the corresponding task contacts HBase for + doing puts, since we only do puts for inserts from HBaseIndex. + */ + int hbasePutAccessParallelism = getHBasePutAccessParallelism(writeStatusRDD); + multiPutBatchSize = putBatchSizeCalculator + .getBatchSize( + getNumRegionServersAliveForTable(), + maxQpsPerRegionServer, + hbasePutAccessParallelism, + maxExecutors, + SLEEP_TIME_MILLISECONDS, + qpsFraction); + } + } + + @VisibleForTesting + public int getHBasePutAccessParallelism(final JavaRDD writeStatusRDD) { + return Math.toIntExact(Math.max(writeStatusRDD + .filter(w -> w.getStat().getNumInserts() > 0).count(), 1)); + } + + public static class HbasePutBatchSizeCalculator implements Serializable { + + private static final int MILLI_SECONDS_IN_A_SECOND = 1000; + private static Logger logger = LogManager.getLogger(HbasePutBatchSizeCalculator.class); + + /** + * Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed + * maxQpsPerRegionServer for each Region Server. Multiplying qpsFraction to reduce the aggregate load on common RS + * across topics. Assumption here is that all tables have regions across all RS, which is not necessarily true for + * smaller tables. So, they end up getting a smaller share of QPS than they deserve, but it might be ok. + *

+ * Example: int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f)
+ *
+ * Expected batchSize is 8 because in that case, total request sent to a Region Server in one second is:
+ *
+ * 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) =>
+ * 16000. We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 requests which
+ * happens to be 10% of 16667 (maxQPSPerRegionServer), as expected.
+ *
+ * Assumptions made here:
+ * 1. In a batch, writes get evenly distributed to each RS for that
+ * table. Since we do writes only in the case of inserts and not updates, for this assumption to fail, inserts would
+ * have to be skewed towards few RS, likelihood of which is less if Hbase table is pre-split and rowKeys are UUIDs
+ * (random strings). If this assumption fails, then it is possible for some RS to receive more than
+ * maxQpsPerRegionServer QPS, but for simplicity, we are going ahead with this model, since this is meant to be a
+ * lightweight distributed throttling mechanism without maintaining a global context. So if this assumption breaks,
+ * we are hoping the HBase Master relocates hot-spot regions to new Region Servers.
+ *
+ * 2. For Region Server stability, throttling at a second level granularity is fine.
+ * Although, within a second, the sum of queries might be within maxQpsPerRegionServer, there could be peaks at some
+ * sub second intervals. So, the assumption is that these peaks are tolerated by the Region Server (which at max can
+ * be maxQpsPerRegionServer).
  • + */ + public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, + int numTasksDuringPut, int maxExecutors, int sleepTimeMs, float qpsFraction) { + int numRSAlive = numRegionServersForTable; + int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer); + int numTasks = numTasksDuringPut; + int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors)); + int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs; + int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec)); + logger.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction); + logger.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive); + logger.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec); + logger.info("HbaseIndexThrottling: numTasks :" + numTasks); + logger.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors); + logger.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts); + logger.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec); + logger.info("HbaseIndexThrottling: numRegionServersForTable :" + numRegionServersForTable); + logger.info("HbaseIndexThrottling: multiPutBatchSize :" + multiPutBatchSize); + return multiPutBatchSize; + } + } + + private Integer getNumRegionServersAliveForTable() { + // This is being called in the driver, so there is only one connection + // from the driver, so ok to use a local connection variable. 
+ if (numRegionServersForTable == null) { + try (Connection conn = getHBaseConnection()) { + RegionLocator regionLocator = conn + .getRegionLocator(TableName.valueOf(tableName)); + numRegionServersForTable = Math.toIntExact( + regionLocator.getAllRegionLocations().stream().map(e -> e.getServerName()).distinct() + .count()); + return numRegionServersForTable; + } catch (IOException e) { + logger.error(e); + throw new RuntimeException(e); + } + } + return numRegionServersForTable; + } + @Override public boolean rollbackCommit(String commitTime) { // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java index 6f681564b..bde2bf6ad 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java @@ -27,14 +27,18 @@ import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.config.HoodieHBaseIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.hbase.HBaseIndex; +import com.uber.hoodie.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator; import com.uber.hoodie.table.HoodieTable; import java.io.File; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -51,6 +55,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import 
org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.FixMethodOrder; @@ -281,6 +286,66 @@ public class TestHbaseIndex { Mockito.verify(table, atMost(numberOfDataFileIds)).put((List) anyObject()); } + @Test + public void testPutBatchSizeCalculation() { + HbasePutBatchSizeCalculator batchSizeCalculator = new HbasePutBatchSizeCalculator(); + + // All asserts cases below are derived out of the first + // example below, with change in one parameter at a time. + + int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f); + // Expected batchSize is 8 because in that case, total request sent in one second is below + // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000 + // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request + // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected. + Assert.assertEquals(putBatchSize, 8); + + // Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved + int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f); + Assert.assertEquals(putBatchSize2, 4); + + // If the parallelism is halved, batchSize has to double + int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f); + Assert.assertEquals(putBatchSize3, 16); + + // If the parallelism is halved, batchSize has to double. 
+ // This time parallelism is driven by numTasks rather than numExecutors + int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f); + Assert.assertEquals(putBatchSize4, 16); + + // If sleepTimeMs is halved, batchSize has to halve + int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f); + Assert.assertEquals(putBatchSize5, 4); + + // If maxQPSPerRegionServer is doubled, batchSize also doubles + int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f); + Assert.assertEquals(putBatchSize6, 16); + } + + @Test + public void testsHBasePutAccessParallelism() { + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config); + final JavaRDD writeStatusRDD = jsc.parallelize( + Arrays.asList( + getSampleWriteStatus(1, 2), + getSampleWriteStatus(0, 3), + getSampleWriteStatus(10, 0)), + 10); + final int hbasePutAccessParallelism = index.getHBasePutAccessParallelism(writeStatusRDD); + Assert.assertEquals(10, writeStatusRDD.getNumPartitions()); + Assert.assertEquals(2, hbasePutAccessParallelism); + } + + private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) { + final WriteStatus writeStatus = new WriteStatus(); + HoodieWriteStat hoodieWriteStat = new HoodieWriteStat(); + hoodieWriteStat.setNumInserts(numInserts); + hoodieWriteStat.setNumUpdateWrites(numUpdateWrites); + writeStatus.setStat(hoodieWriteStat); + return writeStatus; + } + private void assertNoWriteErrors(List statuses) { // Verify there are no errors for (WriteStatus status : statuses) { @@ -300,8 +365,11 @@ public class TestHbaseIndex { .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .forTable("test-trip-table").withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE) - .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort"))) - 
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName) - .hbaseIndexGetBatchSize(100).hbaseIndexPutBatchSize(100).build()); + .withHBaseIndexConfig( + new HoodieHBaseIndexConfig.Builder() + .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort"))) + .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName) + .hbaseIndexGetBatchSize(100).build()) + .build()); } -} \ No newline at end of file +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java index 002c1e435..f7ec0d826 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.config.HoodieHBaseIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.bloom.HoodieBloomIndex; @@ -63,7 +64,8 @@ public class TestHoodieIndex { HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder(); // Different types HoodieWriteConfig config = clientConfigBuilder.withPath(basePath).withIndexConfig( - indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE).build()).build(); + indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE) + .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build()).build(); assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HBaseIndex); config = clientConfigBuilder.withPath(basePath) .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();