[HUDI-671] Added unit-test for HBaseIndex (#1381)
This commit is contained in:
@@ -22,9 +22,11 @@ import org.apache.hudi.common.HoodieClientTestHarness;
|
||||
import org.apache.hudi.client.HoodieWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieHBaseIndexConfig;
|
||||
import org.apache.hudi.config.HoodieIndexConfig;
|
||||
@@ -38,6 +40,7 @@ import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
@@ -57,6 +60,7 @@ import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
@@ -65,6 +69,7 @@ import scala.Tuple2;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
import static org.mockito.Mockito.times;
|
||||
@@ -93,7 +98,10 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
// Initialize HbaseMiniCluster
|
||||
utility = new HBaseTestingUtility();
|
||||
hbaseConfig = HBaseConfiguration.create();
|
||||
hbaseConfig.set("zookeeper.znode.parent", "/hudi-hbase-test");
|
||||
|
||||
utility = new HBaseTestingUtility(hbaseConfig);
|
||||
utility.startMiniCluster();
|
||||
hbaseConfig = utility.getConnection().getConfiguration();
|
||||
utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
|
||||
@@ -389,6 +397,117 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
|
||||
hBaseIndexQPSResourceAllocator.acquireQPSResources(config.getHbaseIndexQPSFraction(), 100), 0.0f);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallBatchSize() throws Exception {
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig(2);
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||
|
||||
// Test tagLocation without any entries in index
|
||||
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
|
||||
// Insert 200 records
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||
assertNoWriteErrors(writeStatues.collect());
|
||||
|
||||
// Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
|
||||
// commit
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
|
||||
// Now commit this & update location of records inserted and validate no errors
|
||||
writeClient.commit(newCommitTime, writeStatues);
|
||||
// Now tagLocation for these records, hbaseIndex should tag them correctly
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||
assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() throws Exception {
|
||||
String newCommitTime = "001";
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
// Load to memory
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
try (HoodieWriteClient writeClient = getWriteClient(config);) {
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||
|
||||
// Test tagLocation without any entries in index
|
||||
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
|
||||
|
||||
// Insert records
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
|
||||
assertNoWriteErrors(writeStatues.collect());
|
||||
writeClient.commit(newCommitTime, writeStatues);
|
||||
|
||||
// Now tagLocation for these records, hbaseIndex should tag them correctly
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||
assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(10, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
|
||||
// Delete all records. This has to be done directly as deleting index entries
|
||||
// is not implemented via HoodieWriteClient
|
||||
Option recordMetadata = Option.empty();
|
||||
JavaRDD<WriteStatus> deleteWriteStatues = writeStatues.map(w -> {
|
||||
WriteStatus newWriteStatus = new WriteStatus(true, 1.0);
|
||||
w.getWrittenRecords().forEach(r -> newWriteStatus.markSuccess(new HoodieRecord(r.getKey(), null), recordMetadata));
|
||||
assertEquals(w.getTotalRecords(), newWriteStatus.getTotalRecords());
|
||||
newWriteStatus.setStat(new HoodieWriteStat());
|
||||
return newWriteStatus;
|
||||
});
|
||||
JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable);
|
||||
assertEquals(deleteStatus.count(), deleteWriteStatues.count());
|
||||
assertNoWriteErrors(deleteStatus.collect());
|
||||
|
||||
// Ensure no records can be tagged
|
||||
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
|
||||
assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
|
||||
assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
|
||||
assertEquals(0, javaRDD.filter(record -> (record.getCurrentLocation() != null
|
||||
&& record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFeatureSupport() throws Exception {
|
||||
HoodieWriteConfig config = getConfig();
|
||||
HBaseIndex index = new HBaseIndex(config);
|
||||
|
||||
assertTrue(index.canIndexLogFiles());
|
||||
try {
|
||||
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
|
||||
index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(), 1), jsc, hoodieTable);
|
||||
fail("HbaseIndex supports fetchRecordLocation");
|
||||
} catch (UnsupportedOperationException ex) {
|
||||
// Expected so ignore
|
||||
ex.getStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private WriteStatus getSampleWriteStatus(final int numInserts, final int numUpdateWrites) {
|
||||
final WriteStatus writeStatus = new WriteStatus(false, 0.1);
|
||||
HoodieWriteStat hoodieWriteStat = new HoodieWriteStat();
|
||||
@@ -406,10 +525,14 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
|
||||
}
|
||||
|
||||
private HoodieWriteConfig getConfig() {
|
||||
return getConfigBuilder().build();
|
||||
return getConfigBuilder(100).build();
|
||||
}
|
||||
|
||||
private HoodieWriteConfig.Builder getConfigBuilder() {
|
||||
private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
|
||||
return getConfigBuilder(hbaseIndexBatchSize).build();
|
||||
}
|
||||
|
||||
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
|
||||
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(1, 1)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
|
||||
@@ -419,8 +542,10 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
|
||||
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
|
||||
.hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
|
||||
.hbaseIndexPutBatchSizeAutoCompute(true)
|
||||
.hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
|
||||
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
|
||||
.hbaseIndexGetBatchSize(100).build())
|
||||
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user