diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java index 3c6f1cf8a..b60ba5fdc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java @@ -18,6 +18,7 @@ package com.uber.hoodie.config; +import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator; import java.io.File; import java.io.FileReader; import java.io.IOException; @@ -35,6 +36,12 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { * be honored for HBase Puts */ public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size"; + + /** + * Property to set which implementation of HBase QPS resource allocator to be used + */ + public static final String HBASE_INDEX_QPS_ALLOCATOR_CLASS = "hoodie.index.hbase.qps.allocator.class"; + public static final String DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS = DefaultHBaseQPSResourceAllocator.class.getName(); /** * Property to set to enable auto computation of put batch size */ @@ -69,6 +76,34 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { */ public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f; + /** + * Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on volume + */ + public static final String HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = "hoodie.index.hbase.dynamic_qps"; + public static final boolean DEFAULT_HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = false; + /** + * Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads + */ + public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction"; + public static final String DEFAULT_HBASE_MIN_QPS_FRACTION_PROP = "0.002"; + + public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction"; + public static final 
String DEFAULT_HBASE_MAX_QPS_FRACTION_PROP = "0.06"; + /** + * Hoodie index desired puts operation time in seconds + */ + public static final String HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = "hoodie.index.hbase.desired_puts_time_in_secs"; + public static final int DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = 600; + public static final String HBASE_SLEEP_MS_PUT_BATCH_PROP = "hoodie.index.hbase.sleep.ms.for.put.batch"; + public static final String HBASE_SLEEP_MS_GET_BATCH_PROP = "hoodie.index.hbase.sleep.ms.for.get.batch"; + public static final String HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS = "hoodie.index.hbase.zk.session_timeout_ms"; + public static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000; + public static final String HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS = + "hoodie.index.hbase.zk.connection_timeout_ms"; + public static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000; + public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root"; + public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT"; + public HoodieHBaseIndexConfig(final Properties props) { super(props); } @@ -111,26 +146,68 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { return this; } - public HoodieHBaseIndexConfig.Builder hbaseIndexGetBatchSize(int getBatchSize) { + public Builder hbaseZkZnodeQPSPath(String zkZnodeQPSPath) { + props.setProperty(HBASE_ZK_PATH_QPS_ROOT, zkZnodeQPSPath); + return this; + } + + public Builder hbaseIndexGetBatchSize(int getBatchSize) { props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize)); return this; } - public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSize(int putBatchSize) { + public Builder hbaseIndexPutBatchSize(int putBatchSize) { props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize)); return this; } - public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSizeAutoCompute( - boolean putBatchSizeAutoCompute) { - 
props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, - String.valueOf(putBatchSizeAutoCompute)); + public Builder hbaseIndexPutBatchSizeAutoCompute(boolean putBatchSizeAutoCompute) { + props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(putBatchSizeAutoCompute)); return this; } - public HoodieHBaseIndexConfig.Builder hbaseIndexQPSFraction(float qpsFraction) { - props.setProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP, - String.valueOf(qpsFraction)); + public Builder hbaseIndexDesiredPutsTime(int desiredPutsTime) { + props.setProperty(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(desiredPutsTime)); + return this; + } + + public Builder hbaseIndexShouldComputeQPSDynamically(boolean shouldComputeQPsDynamically) { + props.setProperty(HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY, String.valueOf(shouldComputeQPsDynamically)); + return this; + } + + public Builder hbaseIndexQPSFraction(float qpsFraction) { + props.setProperty(HBASE_QPS_FRACTION_PROP, String.valueOf(qpsFraction)); + return this; + } + + public Builder hbaseIndexMinQPSFraction(float minQPSFraction) { + props.setProperty(HBASE_MIN_QPS_FRACTION_PROP, String.valueOf(minQPSFraction)); + return this; + } + + public Builder hbaseIndexMaxQPSFraction(float maxQPSFraction) { + props.setProperty(HBASE_MAX_QPS_FRACTION_PROP, String.valueOf(maxQPSFraction)); + return this; + } + + public Builder hbaseIndexSleepMsBetweenPutBatch(int sleepMsBetweenPutBatch) { + props.setProperty(HBASE_SLEEP_MS_PUT_BATCH_PROP, String.valueOf(sleepMsBetweenPutBatch)); + return this; + } + + public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) { + props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass); + return this; + } + + public Builder hbaseIndexZkSessionTimeout(int zkSessionTimeout) { + props.setProperty(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(zkSessionTimeout)); + return this; + } + + public Builder hbaseIndexZkConnectionTimeout(int 
zkConnectionTimeout) { + props.setProperty(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(zkConnectionTimeout)); return this; } @@ -166,14 +243,25 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP), - HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, - String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE)); + HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE)); setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP), HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION)); - setDefaultOnCondition(props, - !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP), - HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf( - DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER)); + setDefaultOnCondition(props, !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP), + HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf(DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER)); + setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY), + HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY, String.valueOf(DEFAULT_HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY)); + setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS), + HBASE_INDEX_QPS_ALLOCATOR_CLASS, String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS)); + setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS), + HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); + setDefaultOnCondition(props, !props.containsKey(HBASE_ZK_PATH_QPS_ROOT), + HBASE_ZK_PATH_QPS_ROOT, String.valueOf(DEFAULT_HBASE_ZK_PATH_QPS_ROOT)); + setDefaultOnCondition(props, 
!props.containsKey(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS), + HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_SESSION_TIMEOUT_MS)); + setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS), + HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS)); + setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS), + HBASE_INDEX_QPS_ALLOCATOR_CLASS, String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS)); return config; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 3c69fa9ae..657d68f4b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -315,6 +315,30 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP)); } + public String getHBaseQPSResourceAllocatorClass() { + return props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_QPS_ALLOCATOR_CLASS); + } + + public String getHBaseQPSZKnodePath() { + return props.getProperty(HoodieHBaseIndexConfig.HBASE_ZK_PATH_QPS_ROOT); + } + + public String getHBaseZkZnodeSessionTimeout() { + return props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS); + } + + public String getHBaseZkZnodeConnectionTimeout() { + return props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS); + } + + public boolean getHBaseIndexShouldComputeQPSDynamically() { + return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY)); + } + + public int getHBaseIndexDesiredPutsTime() { + return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); + } + /** * 
Fraction of the global share of QPS that should be allocated to this job. * Let's say there are 3 jobs which have input size in terms of number of rows @@ -325,6 +349,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP)); } + public float getHBaseIndexMinQPSFraction() { + return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_MIN_QPS_FRACTION_PROP)); + } + + public float getHBaseIndexMaxQPSFraction() { + return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_FRACTION_PROP)); + } + /** * This should be same across various jobs. This is intended to limit the aggregate * QPS generated across various Hoodie jobs to an Hbase Region Server diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/DefaultHBaseQPSResourceAllocator.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/DefaultHBaseQPSResourceAllocator.java new file mode 100644 index 000000000..1f74dd241 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/DefaultHBaseQPSResourceAllocator.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.index.hbase; + +import com.uber.hoodie.config.HoodieWriteConfig; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAllocator { + private HoodieWriteConfig hoodieWriteConfig; + private static Logger logger = LogManager.getLogger(DefaultHBaseQPSResourceAllocator.class); + + public DefaultHBaseQPSResourceAllocator(HoodieWriteConfig hoodieWriteConfig) { + this.hoodieWriteConfig = hoodieWriteConfig; + } + + @Override + public float calculateQPSFractionForPutsTime(final long numPuts, final int numRegionServers) { + // Just return the configured qps_fraction without calculating it runtime + return hoodieWriteConfig.getHbaseIndexQPSFraction(); + } + + @Override + public float acquireQPSResources(final float desiredQPSFraction, final long numPuts) { + // Return the requested QPSFraction in this default implementation + return desiredQPSFraction; + } + + @Override + public void releaseQPSResources() { + // Do nothing, as there are no resources locked in default implementation + logger.info(String.format("Release QPS resources called for %s with default implementation, do nothing", + this.hoodieWriteConfig.getHbaseTableName())); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java index c70685882..e275b0344 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java @@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieIndexConfig; import 
com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException; @@ -60,6 +61,8 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; +import scala.Tuple2; + /** * Hoodie Index implementation backed by HBase */ @@ -99,10 +102,24 @@ public class HBaseIndex extends HoodieIndex { } private void init(HoodieWriteConfig config) { - multiPutBatchSize = config.getHbaseIndexGetBatchSize(); - qpsFraction = config.getHbaseIndexQPSFraction(); - maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer(); - putBatchSizeCalculator = new HbasePutBatchSizeCalculator(); + this.multiPutBatchSize = config.getHbaseIndexGetBatchSize(); + this.qpsFraction = config.getHbaseIndexQPSFraction(); + this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer(); + this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator(); + } + + @VisibleForTesting + public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) { + try { + logger.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass()); + final HBaseIndexQPSResourceAllocator resourceAllocator = + (HBaseIndexQPSResourceAllocator) ReflectionUtils.loadClass( + config.getHBaseQPSResourceAllocatorClass(), config); + return resourceAllocator; + } catch (Exception e) { + logger.warn("error while instantiating HBaseIndexQPSResourceAllocator", e); + } + return new DefaultHBaseQPSResourceAllocator(config); } @Override @@ -351,12 +368,27 @@ public class HBaseIndex extends HoodieIndex { @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { - setPutBatchSize(writeStatusRDD, jsc); - return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); + final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); + 
JavaRDD writeStatusResultRDD; + setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); + logger.info("multiPutBatchSize: before puts" + multiPutBatchSize); + JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex( + updateLocationFunction(), true); + // Forcing a spark action so HBase puts are triggered before releasing resources + if (this.config.getHBaseIndexShouldComputeQPSDynamically()) { + logger.info("writestatus count: " + writeStatusJavaRDD.count()); + writeStatusResultRDD = writeStatusRDD; + } else { + writeStatusResultRDD = writeStatusJavaRDD; + } + // Release QPS resources as HBase puts are done at this point + hBaseIndexQPSResourceAllocator.releaseQPSResources(); + return writeStatusResultRDD; + } + private void setPutBatchSize(JavaRDD writeStatusRDD, - final JavaSparkContext jsc) { + HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator, + final JavaSparkContext jsc) { if (config.getHbaseIndexPutBatchSizeAutoCompute()) { SparkConf conf = jsc.getConf(); int maxExecutors = conf.getInt(DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME, 1); @@ -370,22 +402,36 @@ If a writeStatus has any insert, it implies that the corresponding task contacts HBase for doing puts, since we only do puts for inserts from HBaseIndex. 
*/ - int hbasePutAccessParallelism = getHBasePutAccessParallelism(writeStatusRDD); + final Tuple2 numPutsParallelismTuple = getHBasePutAccessParallelism(writeStatusRDD); + final long numPuts = numPutsParallelismTuple._1; + final int hbasePutsParallelism = numPutsParallelismTuple._2; + this.numRegionServersForTable = getNumRegionServersAliveForTable(); + final float desiredQPSFraction = hBaseIndexQPSResourceAllocator + .calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable); + logger.info("Desired QPSFraction :" + desiredQPSFraction); + logger.info("Number HBase puts :" + numPuts); + logger.info("Hbase Puts Parallelism :" + hbasePutsParallelism); + final float availableQpsFraction = hBaseIndexQPSResourceAllocator + .acquireQPSResources(desiredQPSFraction, numPuts); + logger.info("Allocated QPS Fraction :" + availableQpsFraction); multiPutBatchSize = putBatchSizeCalculator .getBatchSize( - getNumRegionServersAliveForTable(), + numRegionServersForTable, maxQpsPerRegionServer, - hbasePutAccessParallelism, + hbasePutsParallelism, maxExecutors, SLEEP_TIME_MILLISECONDS, - qpsFraction); + availableQpsFraction); + logger.info("multiPutBatchSize :" + multiPutBatchSize); } } @VisibleForTesting - public int getHBasePutAccessParallelism(final JavaRDD writeStatusRDD) { - return Math.toIntExact(Math.max(writeStatusRDD - .filter(w -> w.getStat().getNumInserts() > 0).count(), 1)); + public Tuple2 getHBasePutAccessParallelism(final JavaRDD writeStatusRDD) { + final JavaPairRDD insertOnlyWriteStatusRDD = + writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) + .mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1)); + return insertOnlyWriteStatusRDD.reduce((w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2)); } public static class HbasePutBatchSizeCalculator implements Serializable { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndexQPSResourceAllocator.java 
b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndexQPSResourceAllocator.java new file mode 100644 index 000000000..eccdb970f --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndexQPSResourceAllocator.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.hbase; + +import java.io.Serializable; + +/** + * HBaseIndexQPSResourceAllocator defines methods to manage resource allocation for HBase index operations + */ +public interface HBaseIndexQPSResourceAllocator extends Serializable { + + /** + * This method returns the QPS Fraction value that needs to be acquired such that the respective + * HBase index operation can be completed in the configured desired puts time. + * + * @param numPuts Number of inserts to be written to HBase index + * @param numRegionServers Number of region servers serving the HBase table + * @return QPS fraction that needs to be acquired. + */ + float calculateQPSFractionForPutsTime(final long numPuts, final int numRegionServers); + + /** + * This method acquires the requested QPS Fraction against HBase cluster for index operation. + * + * @param desiredQPSFraction QPS fraction that needs to be requested and acquired + * @param numPuts Number of inserts to be written to HBase index + * @return value of the acquired QPS Fraction. 
+ */ + float acquireQPSResources(final float desiredQPSFraction, final long numPuts); + + /** + * This method releases the acquired QPS Fraction + */ + void releaseQPSResources(); +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHBaseQPSResourceAllocator.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHBaseQPSResourceAllocator.java new file mode 100644 index 000000000..54af10b36 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHBaseQPSResourceAllocator.java @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.index; + +import com.uber.hoodie.common.HoodieClientTestUtils; +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.config.HoodieHBaseIndexConfig; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieStorageConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator; +import com.uber.hoodie.index.hbase.HBaseIndex; +import com.uber.hoodie.index.hbase.HBaseIndexQPSResourceAllocator; +import java.io.File; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestHBaseQPSResourceAllocator { + private static JavaSparkContext jsc = null; + private static String tableName = "test_table"; + private String basePath = null; + private static HBaseTestingUtility utility; + private static Configuration hbaseConfig; + private static String QPS_TEST_SUFFIX_PATH = "qps_test_suffix"; + + @AfterClass + public static void clean() { + if (jsc != null) { + jsc.stop(); + } + } + + @BeforeClass + public static void init() throws Exception { + utility = new HBaseTestingUtility(); + utility.startMiniCluster(); + hbaseConfig = utility.getConnection().getConfiguration(); + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestQPSResourceAllocator")); + } + + @After + public void clear() { + if (basePath != null) { + new File(basePath).delete(); + } + } + + @Before + public void before() throws Exception { + // Create a temp folder as the base path + TemporaryFolder 
folder = new TemporaryFolder(); + folder.create(); + basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH; + // Initialize table + HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); + } + + @Test + public void testsDefaultQPSResourceAllocator() { + HoodieWriteConfig config = getConfig(Optional.empty()); + HBaseIndex index = new HBaseIndex(config); + HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config); + Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(), + DefaultHBaseQPSResourceAllocator.class.getName()); + Assert.assertEquals(config.getHbaseIndexQPSFraction(), + hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f); + } + + @Test + public void testsExplicitDefaultQPSResourceAllocator() { + HoodieWriteConfig config = getConfig(Optional.of(HoodieHBaseIndexConfig.DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS)); + HBaseIndex index = new HBaseIndex(config); + HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config); + Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(), + DefaultHBaseQPSResourceAllocator.class.getName()); + Assert.assertEquals(config.getHbaseIndexQPSFraction(), + hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f); + } + + @Test + public void testsInvalidQPSResourceAllocator() { + HoodieWriteConfig config = getConfig(Optional.of("InvalidResourceAllocatorClassName")); + HBaseIndex index = new HBaseIndex(config); + HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config); + Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(), + DefaultHBaseQPSResourceAllocator.class.getName()); + Assert.assertEquals(config.getHbaseIndexQPSFraction(), + hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f); + } + + 
private HoodieWriteConfig getConfig(Optional resourceAllocatorClass) { + HoodieHBaseIndexConfig hoodieHBaseIndexConfig = getConfigWithResourceAllocator(resourceAllocatorClass); + return getConfigBuilder(hoodieHBaseIndexConfig).build(); + } + + private HoodieWriteConfig.Builder getConfigBuilder(HoodieHBaseIndexConfig hoodieHBaseIndexConfig) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(1, 1).withCompactionConfig( + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).withInlineCompaction(false) + .build()).withAutoCommit(false) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .forTable("test-trip-table").withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE) + .withHBaseIndexConfig(hoodieHBaseIndexConfig) + .build()); + } + + private HoodieHBaseIndexConfig getConfigWithResourceAllocator(Optional resourceAllocatorClass) { + HoodieHBaseIndexConfig.Builder builder = + new HoodieHBaseIndexConfig.Builder() + .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort"))) + .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName) + .hbaseIndexGetBatchSize(100); + if (resourceAllocatorClass.isPresent()) { + builder.withQPSResourceAllocatorType(resourceAllocatorClass.get()); + } + return builder.build(); + } +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java index 16e67c403..5f6a56aef 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java @@ -35,8 +35,10 @@ import com.uber.hoodie.config.HoodieHBaseIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import 
com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator; import com.uber.hoodie.index.hbase.HBaseIndex; import com.uber.hoodie.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator; +import com.uber.hoodie.index.hbase.HBaseIndexQPSResourceAllocator; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.util.Arrays; @@ -64,6 +66,8 @@ import org.junit.rules.TemporaryFolder; import org.junit.runners.MethodSorters; import org.mockito.Mockito; +import scala.Tuple2; + /** * Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests, * (see one problem here : https://issues.apache .org/jira/browse/HBASE-15835). Hence, the need to use @@ -331,9 +335,23 @@ public class TestHbaseIndex { getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10); - final int hbasePutAccessParallelism = index.getHBasePutAccessParallelism(writeStatusRDD); + final Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD); + final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString()); + final int hbaseNumPuts = Integer.parseInt(tuple._1.toString()); Assert.assertEquals(10, writeStatusRDD.getNumPartitions()); Assert.assertEquals(2, hbasePutAccessParallelism); + Assert.assertEquals(11, hbaseNumPuts); + } + + @Test + public void testsHBaseIndexDefaultQPSResourceAllocator() { + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config); + HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config); + Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(), + DefaultHBaseQPSResourceAllocator.class.getName()); + Assert.assertEquals(config.getHbaseIndexQPSFraction(), + hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f); } private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {