1
0

default implementation for HBase index qps allocator (#685)

* default implementation and configs for HBase index qps allocator

* Test for QPS allocator and address CR

* fix QPS allocator test
This commit is contained in:
Venkat
2019-05-24 18:43:46 -07:00
committed by vinoth chandar
parent 99b0c72aa6
commit f2d91a455e
7 changed files with 461 additions and 30 deletions

View File

@@ -18,6 +18,7 @@
package com.uber.hoodie.config; package com.uber.hoodie.config;
import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator;
import java.io.File; import java.io.File;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
@@ -35,6 +36,12 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
* be honored for HBase Puts * be honored for HBase Puts
*/ */
public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size"; public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
/**
* Property to set which implementation of HBase QPS resource allocator to be used
*/
public static final String HBASE_INDEX_QPS_ALLOCATOR_CLASS = "hoodie.index.hbase.qps.allocator.class";
public static final String DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS = DefaultHBaseQPSResourceAllocator.class.getName();
/** /**
* Property to set to enable auto computation of put batch size * Property to set to enable auto computation of put batch size
*/ */
@@ -69,6 +76,34 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
*/ */
public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f; public static final float DEFAULT_HBASE_QPS_FRACTION = 0.5f;
/**
* Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on volume
*/
public static final String HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = "hoodie.index.hbase.dynamic_qps";
public static final boolean DEFAULT_HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = false;
/**
* Min and Max for HBASE_QPS_FRACTION_PROP to stabilize skewed volume workloads
*/
public static final String HBASE_MIN_QPS_FRACTION_PROP = "hoodie.index.hbase.min.qps.fraction";
public static final String DEFAULT_HBASE_MIN_QPS_FRACTION_PROP = "0.002";
public static final String HBASE_MAX_QPS_FRACTION_PROP = "hoodie.index.hbase.max.qps.fraction";
public static final String DEFAULT_HBASE_MAX_QPS_FRACTION_PROP = "0.06";
/**
* Hoodie index desired puts operation time in seconds
*/
public static final String HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = "hoodie.index.hbase.desired_puts_time_in_secs";
public static final int DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = 600;
public static final String HBASE_SLEEP_MS_PUT_BATCH_PROP = "hoodie.index.hbase.sleep.ms.for.put.batch";
public static final String HBASE_SLEEP_MS_GET_BATCH_PROP = "hoodie.index.hbase.sleep.ms.for.get.batch";
public static final String HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS = "hoodie.index.hbase.zk.session_timeout_ms";
public static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
public static final String HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS =
"hoodie.index.hbase.zk.connection_timeout_ms";
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root";
public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT";
public HoodieHBaseIndexConfig(final Properties props) { public HoodieHBaseIndexConfig(final Properties props) {
super(props); super(props);
} }
@@ -111,26 +146,68 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
return this; return this;
} }
public HoodieHBaseIndexConfig.Builder hbaseIndexGetBatchSize(int getBatchSize) { public Builder hbaseZkZnodeQPSPath(String zkZnodeQPSPath) {
props.setProperty(HBASE_ZK_PATH_QPS_ROOT, zkZnodeQPSPath);
return this;
}
public Builder hbaseIndexGetBatchSize(int getBatchSize) {
props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize)); props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
return this; return this;
} }
public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSize(int putBatchSize) { public Builder hbaseIndexPutBatchSize(int putBatchSize) {
props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize)); props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
return this; return this;
} }
public HoodieHBaseIndexConfig.Builder hbaseIndexPutBatchSizeAutoCompute( public Builder hbaseIndexPutBatchSizeAutoCompute(boolean putBatchSizeAutoCompute) {
boolean putBatchSizeAutoCompute) { props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(putBatchSizeAutoCompute));
props.setProperty(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP,
String.valueOf(putBatchSizeAutoCompute));
return this; return this;
} }
public HoodieHBaseIndexConfig.Builder hbaseIndexQPSFraction(float qpsFraction) { public Builder hbaseIndexDesiredPutsTime(int desiredPutsTime) {
props.setProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP, props.setProperty(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(desiredPutsTime));
String.valueOf(qpsFraction)); return this;
}
/**
 * Stores the flag deciding whether the HBase QPS fraction is calculated dynamically.
 *
 * @param shouldComputeQPsDynamically value written under {@code HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY}
 * @return this builder, to allow call chaining
 */
public Builder hbaseIndexShouldComputeQPSDynamically(boolean shouldComputeQPsDynamically) {
  final String flagText = Boolean.toString(shouldComputeQPsDynamically);
  props.setProperty(HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY, flagText);
  return this;
}
/**
 * Stores the QPS fraction for this job.
 *
 * @param qpsFraction value written, as text, under {@code HBASE_QPS_FRACTION_PROP}
 * @return this builder, to allow call chaining
 */
public Builder hbaseIndexQPSFraction(float qpsFraction) {
  final String fractionText = Float.toString(qpsFraction);
  props.setProperty(HBASE_QPS_FRACTION_PROP, fractionText);
  return this;
}
/**
 * Stores the lower bound for the QPS fraction.
 *
 * @param minQPSFraction value written, as text, under {@code HBASE_MIN_QPS_FRACTION_PROP}
 * @return this builder, to allow call chaining
 */
public Builder hbaseIndexMinQPSFraction(float minQPSFraction) {
  final String fractionText = Float.toString(minQPSFraction);
  props.setProperty(HBASE_MIN_QPS_FRACTION_PROP, fractionText);
  return this;
}
/**
 * Stores the upper bound for the QPS fraction.
 *
 * @param maxQPSFraction value written, as text, under {@code HBASE_MAX_QPS_FRACTION_PROP}
 * @return this builder, to allow call chaining
 */
public Builder hbaseIndexMaxQPSFraction(float maxQPSFraction) {
  final String fractionText = Float.toString(maxQPSFraction);
  props.setProperty(HBASE_MAX_QPS_FRACTION_PROP, fractionText);
  return this;
}
/**
 * Stores the sleep interval, in milliseconds, associated with put batches.
 *
 * @param sleepMsBetweenPutBatch value written, as text, under {@code HBASE_SLEEP_MS_PUT_BATCH_PROP}
 * @return this builder, to allow call chaining
 */
public Builder hbaseIndexSleepMsBetweenPutBatch(int sleepMsBetweenPutBatch) {
  final String sleepText = Integer.toString(sleepMsBetweenPutBatch);
  props.setProperty(HBASE_SLEEP_MS_PUT_BATCH_PROP, sleepText);
  return this;
}
/**
 * Selects which HBase QPS resource allocator implementation is to be used.
 *
 * @param qpsResourceAllocatorClass class name written under {@code HBASE_INDEX_QPS_ALLOCATOR_CLASS}
 * @return this builder, to allow call chaining
 */
public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
  final String allocatorKey = HBASE_INDEX_QPS_ALLOCATOR_CLASS;
  props.setProperty(allocatorKey, qpsResourceAllocatorClass);
  return this;
}
/**
 * Stores the ZooKeeper session timeout, in milliseconds.
 *
 * @param zkSessionTimeout value written, as text, under {@code HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS}
 * @return this builder, to allow call chaining
 */
public Builder hbaseIndexZkSessionTimeout(int zkSessionTimeout) {
  final String timeoutText = Integer.toString(zkSessionTimeout);
  props.setProperty(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, timeoutText);
  return this;
}
public Builder hbaseIndexZkConnectionTimeout(int zkConnectionTimeout) {
props.setProperty(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(zkConnectionTimeout));
return this; return this;
} }
@@ -166,14 +243,25 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP), setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP),
HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE));
String.valueOf(DEFAULT_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE));
setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP), setDefaultOnCondition(props, !props.containsKey(HBASE_QPS_FRACTION_PROP),
HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION)); HBASE_QPS_FRACTION_PROP, String.valueOf(DEFAULT_HBASE_QPS_FRACTION));
setDefaultOnCondition(props, setDefaultOnCondition(props, !props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP),
!props.containsKey(HBASE_MAX_QPS_PER_REGION_SERVER_PROP), HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf(DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER));
HBASE_MAX_QPS_PER_REGION_SERVER_PROP, String.valueOf( setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY),
DEFAULT_HBASE_MAX_QPS_PER_REGION_SERVER)); HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY, String.valueOf(DEFAULT_HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY));
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS),
HBASE_INDEX_QPS_ALLOCATOR_CLASS, String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS),
HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS, String.valueOf(DEFAULT_HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
setDefaultOnCondition(props, !props.containsKey(HBASE_ZK_PATH_QPS_ROOT),
HBASE_ZK_PATH_QPS_ROOT, String.valueOf(DEFAULT_HBASE_ZK_PATH_QPS_ROOT));
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS),
HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_SESSION_TIMEOUT_MS));
setDefaultOnCondition(props, !props.containsKey(HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS),
HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS),
HBASE_INDEX_QPS_ALLOCATOR_CLASS, String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
return config; return config;
} }

View File

@@ -315,6 +315,30 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP)); return Boolean.valueOf(props.getProperty(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP));
} }
/**
 * @return the configured HBase QPS resource allocator class name, exactly as stored in the properties
 */
public String getHBaseQPSResourceAllocatorClass() {
  final String allocatorClassName = props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_QPS_ALLOCATOR_CLASS);
  return allocatorClassName;
}
/**
 * @return the value stored under {@code HBASE_ZK_PATH_QPS_ROOT}, exactly as held in the properties
 */
public String getHBaseQPSZKnodePath() {
  final String qpsRootPath = props.getProperty(HoodieHBaseIndexConfig.HBASE_ZK_PATH_QPS_ROOT);
  return qpsRootPath;
}
/**
 * Returns the ZooKeeper session timeout in its raw textual form; it is not parsed to a number here.
 *
 * @return the value stored under {@code HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS}
 */
public String getHBaseZkZnodeSessionTimeout() {
  final String sessionTimeoutText =
      props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS);
  return sessionTimeoutText;
}
/**
 * Returns the ZooKeeper connection timeout in its raw textual form; it is not parsed to a number here.
 *
 * @return the value stored under {@code HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS}
 */
public String getHBaseZkZnodeConnectionTimeout() {
  final String connectionTimeoutText =
      props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS);
  return connectionTimeoutText;
}
/**
 * @return {@code true} only when the value stored under {@code HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY}
 *     equals "true" (ignoring case); a missing value yields {@code false}
 */
public boolean getHBaseIndexShouldComputeQPSDynamically() {
  final String rawFlag = props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY);
  return Boolean.parseBoolean(rawFlag);
}
/**
 * @return the desired puts operation time in seconds, parsed from
 *     {@code HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS}
 * @throws NumberFormatException if the stored value is missing or is not a valid integer
 */
public int getHBaseIndexDesiredPutsTime() {
  final String rawSeconds = props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS);
  return Integer.parseInt(rawSeconds);
}
/** /**
* Fraction of the global share of QPS that should be allocated to this job. * Fraction of the global share of QPS that should be allocated to this job.
* Let's say there are 3 jobs which have input size in terms of number of rows * Let's say there are 3 jobs which have input size in terms of number of rows
@@ -325,6 +349,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP)); return Float.parseFloat(props.getProperty(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP));
} }
/**
 * Lower bound applied to the QPS fraction.
 *
 * <p>Falls back to {@code DEFAULT_HBASE_MIN_QPS_FRACTION_PROP} when the key is absent, so that a config
 * built without an explicit minimum does not fail with a {@link NullPointerException} from
 * {@code Float.parseFloat(null)}.
 *
 * @return the configured minimum QPS fraction, or the declared default when none was set
 * @throws NumberFormatException if the stored value is not a parsable float
 */
public float getHBaseIndexMinQPSFraction() {
  final String rawFraction = props.getProperty(HoodieHBaseIndexConfig.HBASE_MIN_QPS_FRACTION_PROP,
      HoodieHBaseIndexConfig.DEFAULT_HBASE_MIN_QPS_FRACTION_PROP);
  return Float.parseFloat(rawFraction);
}
/**
 * Upper bound applied to the QPS fraction.
 *
 * <p>Falls back to {@code DEFAULT_HBASE_MAX_QPS_FRACTION_PROP} when the key is absent, so that a config
 * built without an explicit maximum does not fail with a {@link NullPointerException} from
 * {@code Float.parseFloat(null)}.
 *
 * @return the configured maximum QPS fraction, or the declared default when none was set
 * @throws NumberFormatException if the stored value is not a parsable float
 */
public float getHBaseIndexMaxQPSFraction() {
  final String rawFraction = props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_FRACTION_PROP,
      HoodieHBaseIndexConfig.DEFAULT_HBASE_MAX_QPS_FRACTION_PROP);
  return Float.parseFloat(rawFraction);
}
/** /**
* This should be same across various jobs. This is intended to limit the aggregate * This should be same across various jobs. This is intended to limit the aggregate
* QPS generated across various Hoodie jobs to an Hbase Region Server * QPS generated across various Hoodie jobs to an Hbase Region Server

View File

@@ -0,0 +1,52 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.hbase;
import com.uber.hoodie.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Default {@link HBaseIndexQPSResourceAllocator}: it performs no runtime computation and holds no
 * resources. The QPS fraction comes straight from the write config, and any requested fraction is
 * granted as-is.
 */
public class DefaultHBaseQPSResourceAllocator implements HBaseIndexQPSResourceAllocator {

  // static + final: the logger is shared by all instances and is not part of the serialized state
  private static final Logger logger = LogManager.getLogger(DefaultHBaseQPSResourceAllocator.class);

  // Assigned once in the constructor and never rebound
  private final HoodieWriteConfig hoodieWriteConfig;

  /**
   * @param hoodieWriteConfig write config from which the QPS fraction and HBase table name are read
   */
  public DefaultHBaseQPSResourceAllocator(HoodieWriteConfig hoodieWriteConfig) {
    this.hoodieWriteConfig = hoodieWriteConfig;
  }

  /**
   * Returns the statically configured QPS fraction; both arguments are ignored.
   *
   * @param numPuts unused by this implementation
   * @param numRegionServers unused by this implementation
   * @return {@code hoodieWriteConfig.getHbaseIndexQPSFraction()}
   */
  @Override
  public float calculateQPSFractionForPutsTime(final long numPuts, final int numRegionServers) {
    // Just return the configured qps_fraction without calculating it runtime
    return hoodieWriteConfig.getHbaseIndexQPSFraction();
  }

  /**
   * Grants exactly what was asked for.
   *
   * @param desiredQPSFraction fraction being requested
   * @param numPuts unused by this implementation
   * @return {@code desiredQPSFraction}, unchanged
   */
  @Override
  public float acquireQPSResources(final float desiredQPSFraction, final long numPuts) {
    // Return the requested QPSFraction in this default implementation
    return desiredQPSFraction;
  }

  /**
   * No-op apart from an info-level log line, because nothing was locked by
   * {@link #acquireQPSResources(float, long)}.
   */
  @Override
  public void releaseQPSResources() {
    // Do nothing, as there are no resources locked in default implementation
    logger.info(String.format("Release QPS resources called for %s with default implementation, do nothing",
        this.hoodieWriteConfig.getHbaseTableName()));
  }
}

View File

@@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException; import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
@@ -60,6 +61,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
/** /**
* Hoodie Index implementation backed by HBase * Hoodie Index implementation backed by HBase
*/ */
@@ -99,10 +102,24 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
} }
private void init(HoodieWriteConfig config) { private void init(HoodieWriteConfig config) {
multiPutBatchSize = config.getHbaseIndexGetBatchSize(); this.multiPutBatchSize = config.getHbaseIndexGetBatchSize();
qpsFraction = config.getHbaseIndexQPSFraction(); this.qpsFraction = config.getHbaseIndexQPSFraction();
maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer(); this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
putBatchSizeCalculator = new HbasePutBatchSizeCalculator(); this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
}
@VisibleForTesting
public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {
try {
logger.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass());
final HBaseIndexQPSResourceAllocator resourceAllocator =
(HBaseIndexQPSResourceAllocator) ReflectionUtils.loadClass(
config.getHBaseQPSResourceAllocatorClass(), config);
return resourceAllocator;
} catch (Exception e) {
logger.warn("error while instantiating HBaseIndexQPSResourceAllocator", e);
}
return new DefaultHBaseQPSResourceAllocator(config);
} }
@Override @Override
@@ -351,11 +368,26 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
@Override @Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) { HoodieTable<T> hoodieTable) {
setPutBatchSize(writeStatusRDD, jsc); final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); JavaRDD<WriteStatus> writeStatusResultRDD;
setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
logger.info("multiPutBatchSize: before puts" + multiPutBatchSize);
JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(
updateLocationFunction(), true);
// Forcing a spark action so HBase puts are triggered before releasing resources
if (this.config.getHBaseIndexShouldComputeQPSDynamically()) {
logger.info("writestatus count: " + writeStatusJavaRDD.count());
writeStatusResultRDD = writeStatusRDD;
} else {
writeStatusResultRDD = writeStatusJavaRDD;
}
// Release QPS resources as HBAse puts are done at this point
hBaseIndexQPSResourceAllocator.releaseQPSResources();
return writeStatusResultRDD;
} }
private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD, private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator,
final JavaSparkContext jsc) { final JavaSparkContext jsc) {
if (config.getHbaseIndexPutBatchSizeAutoCompute()) { if (config.getHbaseIndexPutBatchSizeAutoCompute()) {
SparkConf conf = jsc.getConf(); SparkConf conf = jsc.getConf();
@@ -370,22 +402,36 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
If a writeStatus has any insert, it implies that the corresponding task contacts HBase for If a writeStatus has any insert, it implies that the corresponding task contacts HBase for
doing puts, since we only do puts for inserts from HBaseIndex. doing puts, since we only do puts for inserts from HBaseIndex.
*/ */
int hbasePutAccessParallelism = getHBasePutAccessParallelism(writeStatusRDD); final Tuple2<Long, Integer> numPutsParallelismTuple = getHBasePutAccessParallelism(writeStatusRDD);
final long numPuts = numPutsParallelismTuple._1;
final int hbasePutsParallelism = numPutsParallelismTuple._2;
this.numRegionServersForTable = getNumRegionServersAliveForTable();
final float desiredQPSFraction = hBaseIndexQPSResourceAllocator
.calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable);
logger.info("Desired QPSFraction :" + desiredQPSFraction);
logger.info("Number HBase puts :" + numPuts);
logger.info("Hbase Puts Parallelism :" + hbasePutsParallelism);
final float availableQpsFraction = hBaseIndexQPSResourceAllocator
.acquireQPSResources(desiredQPSFraction, numPuts);
logger.info("Allocated QPS Fraction :" + availableQpsFraction);
multiPutBatchSize = putBatchSizeCalculator multiPutBatchSize = putBatchSizeCalculator
.getBatchSize( .getBatchSize(
getNumRegionServersAliveForTable(), numRegionServersForTable,
maxQpsPerRegionServer, maxQpsPerRegionServer,
hbasePutAccessParallelism, hbasePutsParallelism,
maxExecutors, maxExecutors,
SLEEP_TIME_MILLISECONDS, SLEEP_TIME_MILLISECONDS,
qpsFraction); availableQpsFraction);
logger.info("multiPutBatchSize :" + multiPutBatchSize);
} }
} }
@VisibleForTesting @VisibleForTesting
public int getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) { public Tuple2<Long, Integer> getHBasePutAccessParallelism(final JavaRDD<WriteStatus> writeStatusRDD) {
return Math.toIntExact(Math.max(writeStatusRDD final JavaPairRDD<Long, Integer> insertOnlyWriteStatusRDD =
.filter(w -> w.getStat().getNumInserts() > 0).count(), 1)); writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
.mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1));
return insertOnlyWriteStatusRDD.reduce((w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
} }
public static class HbasePutBatchSizeCalculator implements Serializable { public static class HbasePutBatchSizeCalculator implements Serializable {

View File

@@ -0,0 +1,51 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.hbase;
import java.io.Serializable;
/**
 * <code>HBaseIndexQPSResourceAllocator</code> defines methods to manage resource allocation for HBase index operations
 */
public interface HBaseIndexQPSResourceAllocator extends Serializable {

  /**
   * This method returns the QPS Fraction value that needs to be acquired such that the respective
   * HBase index operation can be completed in the desired puts time.
   *
   * <p>The second argument is the number of region servers, not a duration: that is what the HBase
   * index passes at the call site and how the default implementation names it.
   *
   * @param numPuts Number of inserts to be written to HBase index
   * @param numRegionServers Number of region servers serving the HBase index table
   * @return QPS fraction that needs to be acquired.
   */
  float calculateQPSFractionForPutsTime(final long numPuts, final int numRegionServers);

  /**
   * This method acquires the requested QPS Fraction against HBase cluster for index operation.
   *
   * @param desiredQPSFraction QPS fraction that needs to be requested and acquired
   * @param numPuts Number of inserts to be written to HBase index
   * @return value of the acquired QPS Fraction.
   */
  float acquireQPSResources(final float desiredQPSFraction, final long numPuts);

  /**
   * This method releases the acquired QPS Fraction
   */
  void releaseQPSResources();
}

View File

@@ -0,0 +1,144 @@
/*
* Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.index;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieHBaseIndexConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator;
import com.uber.hoodie.index.hbase.HBaseIndex;
import com.uber.hoodie.index.hbase.HBaseIndexQPSResourceAllocator;
import java.io.File;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestHBaseQPSResourceAllocator {
private static JavaSparkContext jsc = null;
private static String tableName = "test_table";
private String basePath = null;
private static HBaseTestingUtility utility;
private static Configuration hbaseConfig;
private static String QPS_TEST_SUFFIX_PATH = "qps_test_suffix";
@AfterClass
public static void clean() {
if (jsc != null) {
jsc.stop();
}
}
@BeforeClass
public static void init() throws Exception {
utility = new HBaseTestingUtility();
utility.startMiniCluster();
hbaseConfig = utility.getConnection().getConfiguration();
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestQPSResourceAllocator"));
}
@After
public void clear() {
if (basePath != null) {
new File(basePath).delete();
}
}
@Before
public void before() throws Exception {
// Create a temp folder as the base path
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
// Initialize table
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
}
@Test
public void testsDefaultQPSResourceAllocator() {
HoodieWriteConfig config = getConfig(Optional.empty());
HBaseIndex index = new HBaseIndex(config);
HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
DefaultHBaseQPSResourceAllocator.class.getName());
Assert.assertEquals(config.getHbaseIndexQPSFraction(),
hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
}
@Test
public void testsExplicitDefaultQPSResourceAllocator() {
HoodieWriteConfig config = getConfig(Optional.of(HoodieHBaseIndexConfig.DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
HBaseIndex index = new HBaseIndex(config);
HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
DefaultHBaseQPSResourceAllocator.class.getName());
Assert.assertEquals(config.getHbaseIndexQPSFraction(),
hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
}
@Test
public void testsInvalidQPSResourceAllocator() {
HoodieWriteConfig config = getConfig(Optional.of("InvalidResourceAllocatorClassName"));
HBaseIndex index = new HBaseIndex(config);
HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
DefaultHBaseQPSResourceAllocator.class.getName());
Assert.assertEquals(config.getHbaseIndexQPSFraction(),
hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
}
private HoodieWriteConfig getConfig(Optional<String> resourceAllocatorClass) {
HoodieHBaseIndexConfig hoodieHBaseIndexConfig = getConfigWithResourceAllocator(resourceAllocatorClass);
return getConfigBuilder(hoodieHBaseIndexConfig).build();
}
private HoodieWriteConfig.Builder getConfigBuilder(HoodieHBaseIndexConfig hoodieHBaseIndexConfig) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(1, 1).withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).withInlineCompaction(false)
.build()).withAutoCommit(false)
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.forTable("test-trip-table").withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
.withHBaseIndexConfig(hoodieHBaseIndexConfig)
.build());
}
private HoodieHBaseIndexConfig getConfigWithResourceAllocator(Optional<String> resourceAllocatorClass) {
HoodieHBaseIndexConfig.Builder builder =
new HoodieHBaseIndexConfig.Builder()
.hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
.hbaseIndexGetBatchSize(100);
if (resourceAllocatorClass.isPresent()) {
builder.withQPSResourceAllocatorType(resourceAllocatorClass.get());
}
return builder.build();
}
}

View File

@@ -35,8 +35,10 @@ import com.uber.hoodie.config.HoodieHBaseIndexConfig;
import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.hbase.DefaultHBaseQPSResourceAllocator;
import com.uber.hoodie.index.hbase.HBaseIndex; import com.uber.hoodie.index.hbase.HBaseIndex;
import com.uber.hoodie.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator; import com.uber.hoodie.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator;
import com.uber.hoodie.index.hbase.HBaseIndexQPSResourceAllocator;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.io.File; import java.io.File;
import java.util.Arrays; import java.util.Arrays;
@@ -64,6 +66,8 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runners.MethodSorters; import org.junit.runners.MethodSorters;
import org.mockito.Mockito; import org.mockito.Mockito;
import scala.Tuple2;
/** /**
* Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests, * Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests,
* (see one problem here : https://issues.apache .org/jira/browse/HBASE-15835). Hence, the need to use * (see one problem here : https://issues.apache .org/jira/browse/HBASE-15835). Hence, the need to use
@@ -331,9 +335,23 @@ public class TestHbaseIndex {
getSampleWriteStatus(0, 3), getSampleWriteStatus(0, 3),
getSampleWriteStatus(10, 0)), getSampleWriteStatus(10, 0)),
10); 10);
final int hbasePutAccessParallelism = index.getHBasePutAccessParallelism(writeStatusRDD); final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
Assert.assertEquals(10, writeStatusRDD.getNumPartitions()); Assert.assertEquals(10, writeStatusRDD.getNumPartitions());
Assert.assertEquals(2, hbasePutAccessParallelism); Assert.assertEquals(2, hbasePutAccessParallelism);
Assert.assertEquals(11, hbaseNumPuts);
}
@Test
public void testsHBaseIndexDefaultQPSResourceAllocator() {
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
Assert.assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
DefaultHBaseQPSResourceAllocator.class.getName());
Assert.assertEquals(config.getHbaseIndexQPSFraction(),
hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
} }
private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) { private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {