[HUDI-994] Split TestHBaseIndex to unit tests (#1818)
- Refactor and improve TestHBaseIndex for performance - Move HBaseIndex unit tests to different test classes
This commit is contained in:
@@ -20,7 +20,6 @@ package org.apache.hudi.index.hbase;
|
||||
|
||||
import org.apache.hudi.client.HoodieWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
@@ -31,7 +30,6 @@ import org.apache.hudi.config.HoodieIndexConfig;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||
import org.apache.hudi.testutils.HoodieTestDataGenerator;
|
||||
@@ -56,15 +54,14 @@ import org.junit.jupiter.api.MethodOrderer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestMethodOrder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
@@ -75,23 +72,20 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown across tests,
|
||||
* (see one problem here : https://issues.apache .org/jira/browse/HBASE-15835). Hence, the need to use
|
||||
* (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835). Hence, the need to use
|
||||
* {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully.
|
||||
*/
|
||||
@TestMethodOrder(MethodOrderer.Alphanumeric.class)
|
||||
public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
|
||||
private static final String TABLE_NAME = "test_table";
|
||||
private static HBaseTestingUtility utility;
|
||||
private static Configuration hbaseConfig;
|
||||
private static String tableName = "test_table";
|
||||
|
||||
public TestHBaseIndex() {
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void clean() throws Exception {
|
||||
if (utility != null) {
|
||||
utility.deleteTable(tableName);
|
||||
utility.deleteTable(TABLE_NAME);
|
||||
utility.shutdownMiniCluster();
|
||||
}
|
||||
}
|
||||
@@ -105,7 +99,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
utility = new HBaseTestingUtility(hbaseConfig);
|
||||
utility.startMiniCluster();
|
||||
hbaseConfig = utility.getConnection().getConfiguration();
|
||||
utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
|
||||
utility.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes("_s"));
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
@@ -129,9 +123,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testSimpleTagLocationAndUpdate() throws Exception {
|
||||
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
final String newCommitTime = "001";
|
||||
final int numRecords = 10;
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
// Load to memory
|
||||
@@ -142,8 +136,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
// Test tagLocation without any entries in index
|
||||
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
|
||||
|
||||
// Insert 200 records
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
@@ -152,26 +146,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
|
||||
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
|
||||
// commit
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
|
||||
|
||||
// Now commit this & update location of records inserted and validate no errors
|
||||
writeClient.commit(newCommitTime, writeStatues);
|
||||
// Now tagLocation for these records, hbaseIndex should tag them correctly
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||
assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
|
||||
assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
|
||||
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTagLocationAndDuplicateUpdate() throws Exception {
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
|
||||
final String newCommitTime = "001";
|
||||
final int numRecords = 10;
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
// Load to memory
|
||||
@@ -183,7 +178,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||
JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
|
||||
// Duplicate upsert and ensure correctness is maintained
|
||||
// We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
|
||||
@@ -199,10 +194,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Now tagLocation for these records, hbaseIndex should tag them correctly
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
|
||||
assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
|
||||
assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
|
||||
assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
}
|
||||
|
||||
@@ -213,8 +208,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
HoodieWriteClient writeClient = getHoodieWriteClient(config);
|
||||
|
||||
String newCommitTime = writeClient.startCommit();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
final String newCommitTime = writeClient.startCommit();
|
||||
final int numRecords = 10;
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
@@ -226,13 +222,13 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
writeClient.commit(newCommitTime, writeStatues);
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
// Now tagLocation for these records, hbaseIndex should tag them
|
||||
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200);
|
||||
List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
|
||||
assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
|
||||
|
||||
// check tagged records are tagged with correct fileIds
|
||||
List<String> fileIds = writeStatues.map(WriteStatus::getFileId).collect();
|
||||
assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0);
|
||||
List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
|
||||
assertEquals(0, records2.stream().filter(record -> record.getCurrentLocation().getFileId() == null).count());
|
||||
List<String> taggedFileIds = records2.stream().map(record -> record.getCurrentLocation().getFileId()).distinct().collect(Collectors.toList());
|
||||
|
||||
// both lists should match
|
||||
assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
|
||||
@@ -242,9 +238,9 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
// Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
|
||||
// back commit
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0);
|
||||
assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
|
||||
List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
|
||||
assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
|
||||
assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -255,7 +251,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Mock hbaseConnection and related entities
|
||||
Connection hbaseConnection = mock(Connection.class);
|
||||
HTable table = mock(HTable.class);
|
||||
when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
|
||||
when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table);
|
||||
when(table.get((List<Get>) any())).thenReturn(new Result[0]);
|
||||
|
||||
// only for test, set the hbaseConnection to mocked object
|
||||
@@ -304,7 +300,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Mock hbaseConnection and related entities
|
||||
Connection hbaseConnection = mock(Connection.class);
|
||||
HTable table = mock(HTable.class);
|
||||
when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
|
||||
when(hbaseConnection.getTable(TableName.valueOf(TABLE_NAME))).thenReturn(table);
|
||||
when(table.get((List<Get>) any())).thenReturn(new Result[0]);
|
||||
|
||||
// only for test, set the hbaseConnection to mocked object
|
||||
@@ -319,42 +315,6 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
verify(table, atMost(numberOfDataFileIds)).put((List<Put>) any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutBatchSizeCalculation() {
|
||||
HbasePutBatchSizeCalculator batchSizeCalculator = new HbasePutBatchSizeCalculator();
|
||||
|
||||
// All asserts cases below are derived out of the first
|
||||
// example below, with change in one parameter at a time.
|
||||
|
||||
int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
|
||||
// Expected batchSize is 8 because in that case, total request sent in one second is below
|
||||
// 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000
|
||||
// We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request
|
||||
// 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
|
||||
assertEquals(8, putBatchSize);
|
||||
|
||||
// Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved
|
||||
int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
|
||||
assertEquals(4, putBatchSize2);
|
||||
|
||||
// If the parallelism is halved, batchSize has to double
|
||||
int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
|
||||
assertEquals(16, putBatchSize3);
|
||||
|
||||
// If the parallelism is halved, batchSize has to double.
|
||||
// This time parallelism is driven by numTasks rather than numExecutors
|
||||
int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
|
||||
assertEquals(16, putBatchSize4);
|
||||
|
||||
// If sleepTimeMs is halved, batchSize has to halve
|
||||
int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
|
||||
assertEquals(4, putBatchSize5);
|
||||
|
||||
// If maxQPSPerRegionServer is doubled, batchSize also doubles
|
||||
int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
|
||||
assertEquals(16, putBatchSize6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testsHBasePutAccessParallelism() {
|
||||
HoodieWriteConfig config = getConfig();
|
||||
@@ -383,21 +343,11 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
assertEquals(0, hbaseNumPuts);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testsHBaseIndexDefaultQPSResourceAllocator() {
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = index.createQPSResourceAllocator(config);
|
||||
assertEquals(hBaseIndexQPSResourceAllocator.getClass().getName(),
|
||||
DefaultHBaseQPSResourceAllocator.class.getName());
|
||||
assertEquals(config.getHbaseIndexQPSFraction(),
|
||||
hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallBatchSize() throws Exception {
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
final String newCommitTime = "001";
|
||||
final int numRecords = 10;
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
// Load to memory
|
||||
@@ -408,9 +358,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
// Test tagLocation without any entries in index
|
||||
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
|
||||
JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
|
||||
// Insert 200 records
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||
@@ -418,26 +367,27 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
|
||||
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
|
||||
// commit
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
|
||||
|
||||
// Now commit this & update location of records inserted and validate no errors
|
||||
writeClient.commit(newCommitTime, writeStatues);
|
||||
// Now tagLocation for these records, hbaseIndex should tag them correctly
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||
assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
|
||||
assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
|
||||
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() throws Exception {
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
|
||||
final String newCommitTime = "001";
|
||||
final int numRecords = 10;
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
// Load to memory
|
||||
@@ -448,8 +398,8 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
// Test tagLocation without any entries in index
|
||||
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
|
||||
|
||||
// Insert records
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
@@ -460,18 +410,17 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
// Now tagLocation for these records, hbaseIndex should tag them correctly
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||
assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
|
||||
assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
|
||||
assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
|
||||
// Delete all records. This has to be done directly as deleting index entries
|
||||
// is not implemented via HoodieWriteClient
|
||||
Option recordMetadata = Option.empty();
|
||||
JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
|
||||
WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
|
||||
w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), recordMetadata));
|
||||
w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), Option.empty()));
|
||||
assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
|
||||
newWriteStatus.setStat(new HoodieWriteStat());
|
||||
return newWriteStatus;
|
||||
@@ -481,26 +430,14 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
assertNoWriteErrors(deleteStatus.collect());
|
||||
|
||||
// Ensure no records can be tagged
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||
assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
|
||||
assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
|
||||
assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFeatureSupport() {
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
|
||||
assertTrue(index.canIndexLogFiles());
|
||||
assertThrows(UnsupportedOperationException.class, () -> {
|
||||
HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(), 1), jsc, hoodieTable);
|
||||
}, "HbaseIndex supports fetchRecordLocation");
|
||||
}
|
||||
|
||||
private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
|
||||
final WriteStatus writeStatus = new WriteStatus(false, 0.1);
|
||||
HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
|
||||
@@ -530,7 +467,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
|
||||
.hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
|
||||
.hbaseIndexPutBatchSizeAutoCompute(true)
|
||||
.hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
|
||||
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
|
||||
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
|
||||
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
|
||||
.build());
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.index.hbase;
|
||||
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestHBaseIndexUsage {
|
||||
|
||||
@Test
|
||||
public void testFeatureSupport() {
|
||||
HoodieWriteConfig config = mock(HoodieWriteConfig.class);
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
|
||||
assertTrue(index.canIndexLogFiles());
|
||||
assertThrows(UnsupportedOperationException.class, () -> {
|
||||
index.fetchRecordLocation(null, null, null);
|
||||
}, "HBaseIndex should not support fetchRecordLocation");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.index.hbase;
|
||||
|
||||
import org.apache.hudi.index.hbase.HBaseIndex.HBasePutBatchSizeCalculator;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class TestHBasePutBatchSizeCalculator {
|
||||
|
||||
@Test
|
||||
public void testPutBatchSizeCalculation() {
|
||||
HBasePutBatchSizeCalculator batchSizeCalculator = new HBasePutBatchSizeCalculator();
|
||||
|
||||
// All asserts cases below are derived out of the first
|
||||
// example below, with change in one parameter at a time.
|
||||
|
||||
int putBatchSize = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.1f);
|
||||
// Expected batchSize is 8 because in that case, total request sent in one second is below
|
||||
// 8 (batchSize) * 200 (parallelism) * 10 (maxReqsInOneSecond) * 10 (numRegionServers) * 0.1 (qpsFraction)) => 16000
|
||||
// We assume requests get distributed to Region Servers uniformly, so each RS gets 1600 request
|
||||
// 1600 happens to be 10% of 16667 (maxQPSPerRegionServer) as expected.
|
||||
assertEquals(8, putBatchSize);
|
||||
|
||||
// Number of Region Servers are halved, total requests sent in a second are also halved, so batchSize is also halved
|
||||
int putBatchSize2 = batchSizeCalculator.getBatchSize(5, 16667, 1200, 200, 100, 0.1f);
|
||||
assertEquals(4, putBatchSize2);
|
||||
|
||||
// If the parallelism is halved, batchSize has to double
|
||||
int putBatchSize3 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 100, 100, 0.1f);
|
||||
assertEquals(16, putBatchSize3);
|
||||
|
||||
// If the parallelism is halved, batchSize has to double.
|
||||
// This time parallelism is driven by numTasks rather than numExecutors
|
||||
int putBatchSize4 = batchSizeCalculator.getBatchSize(10, 16667, 100, 200, 100, 0.1f);
|
||||
assertEquals(16, putBatchSize4);
|
||||
|
||||
// If sleepTimeMs is halved, batchSize has to halve
|
||||
int putBatchSize5 = batchSizeCalculator.getBatchSize(10, 16667, 1200, 200, 100, 0.05f);
|
||||
assertEquals(4, putBatchSize5);
|
||||
|
||||
// If maxQPSPerRegionServer is doubled, batchSize also doubles
|
||||
int putBatchSize6 = batchSizeCalculator.getBatchSize(10, 33334, 1200, 200, 100, 0.1f);
|
||||
assertEquals(16, putBatchSize6);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user