From 5f8bf970058d5915af1910dbf917a33356d06af2 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Sat, 7 Mar 2020 16:48:43 -0800 Subject: [PATCH] [HUDI-671] Added unit-test for HBaseIndex (#1381) --- .../org/apache/hudi/index/TestHbaseIndex.java | 133 +++++++++++++++++- 1 file changed, 129 insertions(+), 4 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java index 289394741..53adf6c20 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java @@ -22,9 +22,11 @@ import org.apache.hudi.common.HoodieClientTestHarness; import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieHBaseIndexConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -38,6 +40,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; @@ -57,6 +60,7 @@ import org.junit.Test; import org.junit.runners.MethodSorters; import org.mockito.Mockito; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -65,6 +69,7 @@ import scala.Tuple2; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static 
org.junit.Assert.fail; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.times; @@ -93,7 +98,10 @@ public class TestHbaseIndex extends HoodieClientTestHarness { @BeforeClass public static void init() throws Exception { // Initialize HbaseMiniCluster - utility = new HBaseTestingUtility(); + hbaseConfig = HBaseConfiguration.create(); + hbaseConfig.set("zookeeper.znode.parent", "/hudi-hbase-test"); + + utility = new HBaseTestingUtility(hbaseConfig); utility.startMiniCluster(); hbaseConfig = utility.getConnection().getConfiguration(); utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s")); @@ -389,6 +397,117 @@ public class TestHbaseIndex extends HoodieClientTestHarness { hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f); } + @Test + public void testSmallBatchSize() throws Exception { + String newCommitTime = "001"; + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + // Load to memory + HoodieWriteConfig config = getConfig(2); + HBaseIndex index = new HBaseIndex(config); + try (HoodieWriteClient writeClient = getWriteClient(config);) { + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + + // Test tagLocation without any entries in index + JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + + // Insert 200 records + writeClient.startCommitWithTime(newCommitTime); + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed + // commit + javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assert 
(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + + // Now commit this & update location of records inserted and validate no errors + writeClient.commit(newCommitTime, writeStatues); + // Now tagLocation for these records, hbaseIndex should tag them correctly + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); + assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null + && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); + } + } + + @Test + public void testDelete() throws Exception { + String newCommitTime = "001"; + List records = dataGen.generateInserts(newCommitTime, 10); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + // Load to memory + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config); + try (HoodieWriteClient writeClient = getWriteClient(config);) { + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + + // Test tagLocation without any entries in index + JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + + // Insert records + writeClient.startCommitWithTime(newCommitTime); + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + writeClient.commit(newCommitTime, writeStatues); + + // Now tagLocation for these records, hbaseIndex should tag them correctly + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = 
HoodieTable.getHoodieTable(metaClient, config, jsc); + javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); + assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null + && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); + + // Delete all records. This has to be done directly as deleting index entries + // is not implemented via HoodieWriteClient + Option recordMetadata = Option.empty(); + JavaRDD deleteWriteStatues = writeStatues.map(w -> { + WriteStatus newWriteStatus = new WriteStatus(true, 1.0); + w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), recordMetadata)); + assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords()); + newWriteStatus.setStat(new HoodieWriteStat()); + return newWriteStatus; + }); + JavaRDD deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable); + assertEquals(deleteStatus.count(), deleteWriteStatues.count()); + assertNoWriteErrors(deleteStatus.collect()); + + // Ensure no records can be tagged + javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size()); + assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count()); + assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() != null + && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count()); + } + } + + @Test + public void testFeatureSupport() throws Exception { + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config); + + assertTrue(index.canIndexLogFiles()); + try { + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + 
index.fetchRecordLocation(jsc.parallelize(new ArrayList(), 1), jsc, hoodieTable); + fail("HbaseIndex supports fetchRecordLocation"); + } catch (UnsupportedOperationException ex) { + // Expected: HBaseIndex does not implement fetchRecordLocation, so ignore + ex.getStackTrace(); + } + } + private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) { final WriteStatus writeStatus = new WriteStatus(false, 0.1); HoodieWriteStat hoodieWriteStat = new HoodieWriteStat(); @@ -406,10 +525,14 @@ public class TestHbaseIndex extends HoodieClientTestHarness { } private HoodieWriteConfig getConfig() { - return getConfigBuilder().build(); + return getConfigBuilder(100).build(); } - private HoodieWriteConfig.Builder getConfigBuilder() { + private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) { + return getConfigBuilder(hbaseIndexBatchSize).build(); + } + + private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(1, 1) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) @@ -419,8 +542,10 @@ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE) .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder() .hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort"))) + .hbaseIndexPutBatchSizeAutoCompute(true) + .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", "")) .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName) - .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build()) + .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build()) .build()); } }