Use RateLimiter instead of sleep. Repartition WriteStatus to optimize Hbase index writes (#1484)

Venkatesh Rudraraju
2020-11-02 08:33:27 -08:00
committed by GitHub
parent a205dd10fa
commit 59f995a3f5
5 changed files with 398 additions and 84 deletions
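The two diff excerpts kept below cover only the test changes. The headline change from the title, replacing fixed sleeps with a rate limiter when writing index entries to HBase, lives in the other changed files. A minimal sketch of that pattern, assuming Guava's RateLimiter (the commit may ship its own limiter, and the Table/batch plumbing here is a hypothetical stand-in):

import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

class ThrottledHBaseWriter {
  // Caps the aggregate put rate at putsPerSecond across all batches.
  static void writeThrottled(Table table, List<List<Put>> batches, double putsPerSecond)
      throws IOException {
    RateLimiter limiter = RateLimiter.create(putsPerSecond);
    for (List<Put> batch : batches) {
      limiter.acquire(batch.size()); // one permit per put; blocks only as long as needed
      table.put(batch);              // Table.put(List<Put>) sends the whole batch
    }
  }
}

Unlike a fixed sleep after every batch, acquire(n) blocks only long enough to honor the configured rate, so a slow batch is not penalized twice and a fast one cannot burst past the cap.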


@@ -62,6 +62,8 @@ import org.junit.jupiter.api.TestMethodOrder;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import scala.Tuple2;
@@ -382,13 +384,98 @@ public class TestHBaseIndex extends FunctionalTestHarness {
     HoodieWriteConfig config = getConfig();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     final JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(
-        Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
+        Arrays.asList(
+            getSampleWriteStatus(0, 2),
+            getSampleWriteStatus(2, 3),
+            getSampleWriteStatus(4, 3),
+            getSampleWriteStatus(6, 3),
+            getSampleWriteStatus(8, 0)),
+        10);
     final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
     final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
     assertEquals(10, writeStatusRDD.getNumPartitions());
-    assertEquals(2, hbasePutAccessParallelism);
-    assertEquals(11, hbaseNumPuts);
+    assertEquals(4, hbasePutAccessParallelism);
+    assertEquals(20, hbaseNumPuts);
   }
+
+  @Test
+  public void testsWriteStatusPartitioner() {
+    HoodieWriteConfig config = getConfig();
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    int parallelism = 4;
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(
+        Arrays.asList(
+            getSampleWriteStatusWithFileId(0, 2),
+            getSampleWriteStatusWithFileId(2, 3),
+            getSampleWriteStatusWithFileId(4, 3),
+            getSampleWriteStatusWithFileId(0, 3),
+            getSampleWriteStatusWithFileId(11, 0)), parallelism);
+    final Map<String, Integer> fileIdPartitionMap = index.mapFileWithInsertsToUniquePartition(writeStatusRDD);
+    int numWriteStatusWithInserts = (int) index.getHBasePutAccessParallelism(writeStatusRDD)._2;
+    JavaRDD<WriteStatus> partitionedRDD = writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+        .partitionBy(new SparkHoodieHBaseIndex.WriteStatusPartitioner(fileIdPartitionMap,
+            numWriteStatusWithInserts)).map(w -> w._2());
+    assertEquals(numWriteStatusWithInserts, partitionedRDD.getNumPartitions());
+    int[] partitionIndexesBeforeRepartition = writeStatusRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
+    assertEquals(parallelism, partitionIndexesBeforeRepartition.length);
+    int[] partitionIndexesAfterRepartition = partitionedRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
+    // there should be 3 partitions after repartition, because only 3 WriteStatuses
+    // have inserts (numWriteStatusWithInserts)
+    assertEquals(numWriteStatusWithInserts, partitionIndexesAfterRepartition.length);
+    List<WriteStatus>[] writeStatuses = partitionedRDD.collectPartitions(partitionIndexesAfterRepartition);
+    for (List<WriteStatus> list : writeStatuses) {
+      int count = 0;
+      for (WriteStatus w : list) {
+        if (w.getStat().getNumInserts() > 0) {
+          count++;
+        }
+      }
+      assertEquals(1, count);
+    }
+  }
+
+  @Test
+  public void testsWriteStatusPartitionerWithNoInserts() {
+    HoodieWriteConfig config = getConfig();
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
+    int parallelism = 3;
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(
+        Arrays.asList(
+            getSampleWriteStatusWithFileId(0, 2),
+            getSampleWriteStatusWithFileId(0, 3),
+            getSampleWriteStatusWithFileId(0, 0)), parallelism);
+    final Map<String, Integer> fileIdPartitionMap = index.mapFileWithInsertsToUniquePartition(writeStatusRDD);
+    int numWriteStatusWithInserts = (int) index.getHBasePutAccessParallelism(writeStatusRDD)._2;
+    JavaRDD<WriteStatus> partitionedRDD = writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
+        .partitionBy(new SparkHoodieHBaseIndex.WriteStatusPartitioner(fileIdPartitionMap,
+            numWriteStatusWithInserts)).map(w -> w._2());
+    assertEquals(numWriteStatusWithInserts, partitionedRDD.getNumPartitions());
+    int[] partitionIndexesBeforeRepartition = writeStatusRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
+    assertEquals(parallelism, partitionIndexesBeforeRepartition.length);
+    int[] partitionIndexesAfterRepartition = partitionedRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
+    // no WriteStatus has inserts here, so the repartitioned RDD ends up with
+    // numWriteStatusWithInserts (zero) partitions
+    assertEquals(numWriteStatusWithInserts, partitionIndexesAfterRepartition.length);
+    assertEquals(partitionIndexesBeforeRepartition.length, parallelism);
}
private WriteStatus getSampleWriteStatusWithFileId(final int numInserts, final int numUpdateWrites) {
final WriteStatus writeStatus = new WriteStatus(false, 0.0);
HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
hoodieWriteStat.setNumInserts(numInserts);
hoodieWriteStat.setNumUpdateWrites(numUpdateWrites);
writeStatus.setStat(hoodieWriteStat);
writeStatus.setFileId(UUID.randomUUID().toString());
return writeStatus;
}
@Test

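Reading the asserts above: getHBasePutAccessParallelism returns the total number of inserts across all WriteStatuses in ._1 (0 + 2 + 4 + 6 + 8 = 20 HBase puts) and the count of WriteStatuses containing at least one insert in ._2 (4 of the 5). That second value drives the repartitioning, so every file with inserts lands in its own partition and issues its HBase puts independently. A minimal sketch of the partitioning idea, not Hudi's actual WriteStatusPartitioner, assuming a precomputed fileId-to-partition map:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.Partitioner;

// Sketch: give every fileId that has inserts its own partition; fileIds
// without inserts produce no HBase puts, so they can all share partition 0.
class FileIdPartitionerSketch extends Partitioner {
  private final Map<String, Integer> fileIdToPartition;
  private final int numPartitions;

  FileIdPartitionerSketch(Map<String, Integer> fileIdToPartition, int numPartitions) {
    this.fileIdToPartition = new HashMap<>(fileIdToPartition);
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public int getPartition(Object key) {
    return fileIdToPartition.getOrDefault((String) key, 0);
  }
}

This one-file-per-partition layout is what testsWriteStatusPartitioner checks when it asserts exactly one insert-bearing WriteStatus in each resulting partition.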

@@ -27,38 +27,35 @@ public class TestHBasePutBatchSizeCalculator {
   @Test
   public void testPutBatchSizeCalculation() {
-    SparkHoodieHBaseIndex.HBasePutBatchSizeCalculator batchSizeCalculator = new SparkHoodieHBaseIndex.HBasePutBatchSizeCalculator();
+    SparkHoodieHBaseIndex.HBasePutBatchSizeCalculator batchSizeCalculator = new SparkHoodieHBaseIndex
+        .HBasePutBatchSizeCalculator();
     // All asserts cases below are derived out of the first
     // example below, with change in one parameter at a time.
-    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
-    // Expected batchSize is 8 because in that case, total request sent in one second is below
-    // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000
-    // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request
-    // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
-    assertEquals(8, putBatchSize);
+    int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.1f);
+    // Total puts that can be sent in 1 second = (10 * 16667 * 0.1) = 16,667
+    // Total puts per batch will be (16,667 / parallelism) = 83.335, where 200 is the maxExecutors
+    assertEquals(putBatchSize, 83);
     // Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved
-    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
-    assertEquals(4, putBatchSize2);
+    int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 0.1f);
+    assertEquals(putBatchSize2, 41);
     // If the parallelism is halved, batchSize has to double
-    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
-    assertEquals(16, putBatchSize3);
+    int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 0.1f);
+    assertEquals(putBatchSize3, 166);
     // If the parallelism is halved, batchSize has to double.
     // This time parallelism is driven by numTasks rather than numExecutors
-    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
-    assertEquals(16, putBatchSize4);
+    int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 0.1f);
+    assertEquals(putBatchSize4, 166);
     // If qpsFraction is halved, batchSize has to halve
-    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
-    assertEquals(4, putBatchSize5);
+    int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 0.05f);
+    assertEquals(putBatchSize5, 41);
     // If maxQPSPerRegionServer is doubled, batchSize also doubles
-    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
-    assertEquals(16, putBatchSize6);
+    int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 0.1f);
+    assertEquals(putBatchSize6, 166);
   }
 }
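Working backwards from the asserts, the new five-argument getBatchSize appears to spread a cluster-wide put budget over the effective write parallelism. A hedged reconstruction (the rounding details and exact arithmetic are an assumption, not Hudi's verbatim implementation):

final class HBasePutBatchSizeSketch {
  // batchSize ~= (numRegionServers * maxQpsPerRegionServer * qpsFraction)
  //              / min(numTasks, maxExecutors)
  static int getBatchSize(int numRegionServers, int maxQpsPerRegionServer,
                          int numTasks, int maxExecutors, float qpsFraction) {
    long maxPutsPerSecond = (long) (qpsFraction * numRegionServers * maxQpsPerRegionServer);
    int parallelism = Math.min(numTasks, maxExecutors); // concurrent writers
    return (int) (maxPutsPerSecond / parallelism);
  }

  public static void main(String[] args) {
    System.out.println(getBatchSize(10, 16667, 1200, 200, 0.1f)); // 83, matching the first assert
    System.out.println(getBatchSize(10, 16667, 100, 200, 0.1f));  // 166: halved parallelism doubles the batch
  }
}

Per the comments in the test, the intent is that each of the parallelism writers rate-limits itself to batchSize puts per second, so the aggregate load stays at roughly qpsFraction of the region servers' total QPS capacity.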