diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index 0f3a89a43..aab12be2c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -97,7 +97,7 @@ public class HBaseIndex extends HoodieIndex { private Integer multiPutBatchSize; private Integer numRegionServersForTable; private final String tableName; - private HbasePutBatchSizeCalculator putBatchSizeCalculator; + private HBasePutBatchSizeCalculator putBatchSizeCalculator; public HBaseIndex(HoodieWriteConfig config) { super(config); @@ -110,7 +110,7 @@ public class HBaseIndex extends HoodieIndex { this.multiPutBatchSize = config.getHbaseIndexGetBatchSize(); this.qpsFraction = config.getHbaseIndexQPSFraction(); this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer(); - this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator(); + this.putBatchSizeCalculator = new HBasePutBatchSizeCalculator(); this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); } @@ -392,10 +392,10 @@ public class HBaseIndex extends HoodieIndex { return insertOnlyWriteStatusRDD.fold(new Tuple2<>(0L, 0), (w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2)); } - public static class HbasePutBatchSizeCalculator implements Serializable { + public static class HBasePutBatchSizeCalculator implements Serializable { private static final int MILLI_SECONDS_IN_A_SECOND = 1000; - private static final Logger LOG = LogManager.getLogger(HbasePutBatchSizeCalculator.class); + private static final Logger LOG = LogManager.getLogger(HBasePutBatchSizeCalculator.class); /** * Calculate putBatch size so that sum of requests across multiple jobs in a second does not exceed diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java 
b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java index d2ff4a491..6b6f11f79 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java @@ -20,7 +20,6 @@ package org.apache.hudi.index.hbase; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -31,7 +30,6 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieTestDataGenerator; @@ -56,15 +54,14 @@ import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import scala.Tuple2; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atMost; @@ -75,23 +72,20 @@ import static org.mockito.Mockito.when; /** * Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests, - * (see one problem here : https://issues.apache .org/jira/browse/HBASE-15835). 
Hence, the need to use + * (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835). Hence, the need to use * {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully. */ @TestMethodOrder(MethodOrderer.Alphanumeric.class) public class TestHBaseIndex extends HoodieClientTestHarness { + private static final String TABLE_NAME = "test_table"; private static HBaseTestingUtility utility; private static Configuration hbaseConfig; - private static String tableName = "test_table"; - - public TestHBaseIndex() { - } @AfterAll public static void clean() throws Exception { if (utility != null) { - utility.deleteTable(tableName); + utility.deleteTable(TABLE_NAME); utility.shutdownMiniCluster(); } } @@ -105,7 +99,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { utility = new HBaseTestingUtility(hbaseConfig); utility.startMiniCluster(); hbaseConfig = utility.getConnection().getConfiguration(); - utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s")); + utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s")); } @BeforeEach @@ -129,9 +123,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness { @Test public void testSimpleTagLocationAndUpdate() throws Exception { - - String newCommitTime = "001"; - List records = dataGen.generateInserts(newCommitTime, 200); + final String newCommitTime = "001"; + final int numRecords = 10; + List records = dataGen.generateInserts(newCommitTime, numRecords); JavaRDD writeRecords = jsc.parallelize(records, 1); // Load to memory @@ -142,8 +136,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Test tagLocation without any entries in index - JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + JavaRDD records1 
= index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count()); // Insert 200 records writeClient.startCommitWithTime(newCommitTime); @@ -152,26 +146,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed // commit - javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + JavaRDD records2 = index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count()); // Now commit this & update location of records inserted and validate no errors writeClient.commit(newCommitTime, writeStatues); // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); - assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); - assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null + List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count()); + assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); } } @Test public void testTagLocationAndDuplicateUpdate() throws Exception { - String newCommitTime = "001"; - List records = 
dataGen.generateInserts(newCommitTime, 10); + final String newCommitTime = "001"; + final int numRecords = 10; + List records = dataGen.generateInserts(newCommitTime, numRecords); JavaRDD writeRecords = jsc.parallelize(records, 1); // Load to memory @@ -183,7 +178,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); - JavaRDD javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable); + index.tagLocation(writeRecords, jsc, hoodieTable); // Duplicate upsert and ensure correctness is maintained // We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not @@ -199,10 +194,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size()); - assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); - assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null + List taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count()); + assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); } @@ -213,8 +208,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HBaseIndex 
index = new HBaseIndex(config); HoodieWriteClient writeClient = getHoodieWriteClient(config); - String newCommitTime = writeClient.startCommit(); - List records = dataGen.generateInserts(newCommitTime, 200); + final String newCommitTime = writeClient.startCommit(); + final int numRecords = 10; + List records = dataGen.generateInserts(newCommitTime, numRecords); JavaRDD writeRecords = jsc.parallelize(records, 1); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -226,13 +222,13 @@ public class TestHBaseIndex extends HoodieClientTestHarness { writeClient.commit(newCommitTime, writeStatues); HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Now tagLocation for these records, hbaseIndex should tag them - JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200); + List records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count()); // check tagged records are tagged with correct fileIds List fileIds = writeStatues.map(WriteStatus::getFileId).collect(); - assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0); - List taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect(); + assertEquals(0, records2.stream().filter(record -> record.getCurrentLocation().getFileId() == null).count()); + List taggedFileIds = records2.stream().map(record -> record.getCurrentLocation().getFileId()).distinct().collect(Collectors.toList()); // both lists should match assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds)); @@ -242,9 +238,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness { hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Now tagLocation for these records, hbaseIndex should not tag 
them since it was a rolled // back commit - javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0); - assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0); + List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count()); + assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count()); } @Test @@ -255,7 +251,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Mock hbaseConnection and related entities Connection hbaseConnection = mock(Connection.class); HTable table = mock(HTable.class); - when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table); + when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table); when(table.get((List) any())).thenReturn(new Result[0]); // only for test, set the hbaseConnection to mocked object @@ -304,7 +300,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Mock hbaseConnection and related entities Connection hbaseConnection = mock(Connection.class); HTable table = mock(HTable.class); - when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table); + when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table); when(table.get((List) any())).thenReturn(new Result[0]); // only for test, set the hbaseConnection to mocked object @@ -319,42 +315,6 @@ public class TestHBaseIndex extends HoodieClientTestHarness { verify(table, atMost(numberOfDataFileIds)).put((List) any()); } - @Test - public void testPutBatchSizeCalculation() { - HbasePutBatchSizeCalculator batchSizeCalculator = new HbasePutBatchSizeCalculator(); - - // All asserts cases below are derived out of the first - // example below, with change in one parameter at a time. 
- - int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f); - // Expected batchSize is 8 because in that case, total request sent in one second is below - // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000 - // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request - // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected. - assertEquals(8, putBatchSize); - - // Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved - int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f); - assertEquals(4, putBatchSize2); - - // If the parallelism is halved, batchSize has to double - int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f); - assertEquals(16, putBatchSize3); - - // If the parallelism is halved, batchSize has to double. - // This time parallelism is driven by numTasks rather than numExecutors - int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f); - assertEquals(16, putBatchSize4); - - // If sleepTimeMs is halved, batchSize has to halve - int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f); - assertEquals(4, putBatchSize5); - - // If maxQPSPerRegionServer is doubled, batchSize also doubles - int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f); - assertEquals(16, putBatchSize6); - } - @Test public void testsHBasePutAccessParallelism() { HoodieWriteConfig config = getConfig(); @@ -383,21 +343,11 @@ public class TestHBaseIndex extends HoodieClientTestHarness { assertEquals(0, hbaseNumPuts); } - @Test - public void testsHBaseIndexDefaultQPSResourceAllocator() { - HoodieWriteConfig config = getConfig(); - HBaseIndex index = new HBaseIndex(config); - HBaseIndexQPSResourceAllocator 
hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config); - assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(), - DefaultHBaseQPSResourceAllocator.class.getName()); - assertEquals(config.getHbaseIndexQPSFraction(), - hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f); - } - @Test public void testSmallBatchSize() throws Exception { - String newCommitTime = "001"; - List records = dataGen.generateInserts(newCommitTime, 200); + final String newCommitTime = "001"; + final int numRecords = 10; + List records = dataGen.generateInserts(newCommitTime, numRecords); JavaRDD writeRecords = jsc.parallelize(records, 1); // Load to memory @@ -408,9 +358,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Test tagLocation without any entries in index - JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); - + JavaRDD records1 = index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count()); // Insert 200 records writeClient.startCommitWithTime(newCommitTime); JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); @@ -418,26 +367,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed // commit - javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + JavaRDD records2 = index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count()); // Now commit this & update location of records inserted and validate no errors writeClient.commit(newCommitTime, 
writeStatues); // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); - assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); - assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null + List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count()); + assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); } } @Test public void testDelete() throws Exception { - String newCommitTime = "001"; - List records = dataGen.generateInserts(newCommitTime, 10); + final String newCommitTime = "001"; + final int numRecords = 10; + List records = dataGen.generateInserts(newCommitTime, numRecords); JavaRDD writeRecords = jsc.parallelize(records, 1); // Load to memory @@ -448,8 +398,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness { HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); // Test tagLocation without any entries in index - JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + JavaRDD records1 = index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count()); // Insert records writeClient.startCommitWithTime(newCommitTime); @@ -460,18 
+410,17 @@ public class TestHBaseIndex extends HoodieClientTestHarness { // Now tagLocation for these records, hbaseIndex should tag them correctly metaClient = HoodieTableMetaClient.reload(metaClient); hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); - assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); - assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null + List records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count()); + assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); // Delete all records. 
This has to be done directly as deleting index entries // is not implemented via HoodieWriteClient - Option recordMetadata = Option.empty(); JavaRDD deleteWriteStatues = writeStatues.map(w -> { WriteStatus newWriteStatus = new WriteStatus(true, 1.0); - w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), recordMetadata)); + w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), Option.empty())); assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords()); newWriteStatus.setStat(new HoodieWriteStat()); return newWriteStatus; @@ -481,26 +430,14 @@ public class TestHBaseIndex extends HoodieClientTestHarness { assertNoWriteErrors(deleteStatus.collect()); // Ensure no records can be tagged - javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); - assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); - assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); - assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() != null + List records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect(); + assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count()); + assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); } } - @Test - public void testFeatureSupport() { - HoodieWriteConfig config = getConfig(); - HBaseIndex index = new HBaseIndex(config); - - assertTrue(index.canIndexLogFiles()); - assertThrows(UnsupportedOperationException.class, () -> { - HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); - index.fetchRecordLocation(jsc.parallelize(new ArrayList(), 1), jsc, hoodieTable); - }, 
"HbaseIndex supports fetchRecordLocation"); - } - private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) { final WriteStatus writeStatus = new WriteStatus(false, 0.1); HoodieWriteStat hoodieWriteStat = new HoodieWriteStat(); @@ -530,7 +467,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness { .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort"))) .hbaseIndexPutBatchSizeAutoCompute(true) .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", "")) - .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName) + .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME) .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build()) .build()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java new file mode 100644 index 000000000..c1bf1571a --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndexUsage.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.index.hbase; + +import org.apache.hudi.config.HoodieWriteConfig; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +@ExtendWith(MockitoExtension.class) +public class TestHBaseIndexUsage { + + @Test + public void testFeatureSupport() { + HoodieWriteConfig config = mock(HoodieWriteConfig.class); + HBaseIndex index = new HBaseIndex(config); + + assertTrue(index.canIndexLogFiles()); + assertThrows(UnsupportedOperationException.class, () -> { + index.fetchRecordLocation(null, null, null); + }, "HBaseIndex should not support fetchRecordLocation"); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java new file mode 100644 index 000000000..3109942ba --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBasePutBatchSizeCalculator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.index.hbase; + +import org.apache.hudi.index.hbase.HBaseIndex.HBasePutBatchSizeCalculator; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHBasePutBatchSizeCalculator { + + @Test + public void testPutBatchSizeCalculation() { + HBasePutBatchSizeCalculator batchSizeCalculator = new HBasePutBatchSizeCalculator(); + + // All assert cases below are derived from the first + // example below, with a change in one parameter at a time. + + int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f); + // Expected batchSize is 8 because in that case, total requests sent in one second is below + // 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000 + // We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 requests + // 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected. + assertEquals(8, putBatchSize); + + // The number of Region Servers is halved, total requests sent in a second are also halved, so batchSize is also halved + int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f); + assertEquals(4, putBatchSize2); + + // If the parallelism is halved, batchSize has to double + int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f); + assertEquals(16, putBatchSize3); + + // If the parallelism is halved, batchSize has to double. 
+ // This time parallelism is driven by numTasks rather than numExecutors + int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f); + assertEquals(16, putBatchSize4); + + // If qpsFraction is halved, batchSize has to halve + int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f); + assertEquals(4, putBatchSize5); + + // If maxQPSPerRegionServer is doubled, batchSize also doubles + int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f); + assertEquals(16, putBatchSize6); + } + +}