1
0

Adding global indexing to HbaseIndex implementation

- Adding tests for HbaseIndex
	- Enabling global index functionality
This commit is contained in:
Nishith Agarwal
2017-12-15 21:29:02 -08:00
committed by vinoth chandar
parent 15e669c60c
commit be0b1f3e57
5 changed files with 508 additions and 56 deletions

View File

@@ -191,6 +191,34 @@
<scope>test</scope>
</dependency>
</dependencies>
<!-- Hbase dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<version>1.2.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@@ -49,6 +49,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
public final static String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
public final static String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
public final static String DEFAULT_HBASE_BATCH_SIZE = "100";
// ***** Bucketed Index Configs *****
public final static String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets";
@@ -130,6 +133,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Sets the number of record keys fetched per HBase multi-get while tagging locations.
 *
 * @param getBatchSize batch size for HBase Get requests
 * @return this builder for chaining
 */
public Builder hbaseIndexGetBatchSize(int getBatchSize) {
props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
return this;
}
/**
 * Sets the number of mutations buffered before a batched HBase put/delete is issued.
 *
 * @param putBatchSize batch size for HBase Put/Delete requests
 * @return this builder for chaining
 */
public Builder hbaseIndexPutBatchSize(int putBatchSize) {
props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
return this;
}
public HoodieIndexConfig build() {
HoodieIndexConfig config = new HoodieIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
@@ -144,9 +157,13 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP),
BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING);
setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP),
HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;
}
}
}
}

View File

@@ -237,6 +237,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
}
/**
 * Number of record keys fetched per HBase multi-get when tagging locations.
 *
 * @return the configured HBase index get batch size
 */
public int getHbaseIndexGetBatchSize() {
  // parseInt avoids the Integer boxing + auto-unboxing done by Integer.valueOf,
  // and matches the style of the other numeric getters (e.g. getBloomIndexParallelism).
  return Integer.parseInt(props.getProperty(HoodieIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
}
/**
 * Number of mutations buffered before a batched HBase put/delete is issued.
 *
 * @return the configured HBase index put batch size
 */
public int getHbaseIndexPutBatchSize() {
  // parseInt avoids the Integer boxing + auto-unboxing done by Integer.valueOf,
  // and matches the style of the other numeric getters (e.g. getBloomIndexParallelism).
  return Integer.parseInt(props.getProperty(HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
}
public int getBloomIndexParallelism() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
}

View File

@@ -18,6 +18,7 @@
package com.uber.hoodie.index.hbase;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieKey;
@@ -32,10 +33,6 @@ import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -54,11 +51,16 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* Hoodie Index implementation backed by HBase
*/
public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
private final static byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s");
private final static byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
private final static byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
@@ -70,22 +72,24 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
public HBaseIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
this.tableName = config.getProps().getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
this.tableName = config.getHbaseTableName();
addShutDownHook();
}
@Override
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> table) {
throw new UnsupportedOperationException("HBase index does not implement check exist yet");
//TODO : Change/Remove filterExists in HoodieReadClient() and revisit
throw new UnsupportedOperationException("HBase index does not implement check exist");
}
private static Connection hbaseConnection = null;
private Connection getHBaseConnection() {
Configuration hbaseConfig = HBaseConfiguration.create();
String quorum = config.getProps().getProperty(HoodieIndexConfig.HBASE_ZKQUORUM_PROP);
String quorum = config.getHbaseZkQuorum();
hbaseConfig.set("hbase.zookeeper.quorum", quorum);
String port = config.getProps().getProperty(HoodieIndexConfig.HBASE_ZKPORT_PROP);
String port = String.valueOf(config.getHbaseZkPort());
hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
try {
return ConnectionFactory.createConnection(hbaseConfig);
@@ -95,24 +99,53 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
}
}
/**
 * Since the HbaseConnection is shared across all tasks in a JVM, register a JVM
 * shutdown hook that closes it on exit.
 *
 * NOTE(review): if no task ever triggered connection creation, {@code hbaseConnection}
 * is still null here and close() throws an NPE — it is swallowed by the catch below,
 * but an explicit null check would be cleaner. TODO confirm and tighten.
 */
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
hbaseConnection.close();
} catch(Exception e) {
// fail silently for any sort of exception — best-effort cleanup at JVM exit
}
}
});
}
/**
 * Builds the HBase Get for one record key, restricted to the single latest
 * version of the commit-ts, file-name and partition-path system columns.
 *
 * @param key the Hoodie record key used as the HBase row key
 * @return a fully configured Get statement
 * @throws IOException propagated from the HBase client API
 */
private Get generateStatement(String key) throws IOException {
  Get rowLookup = new Get(Bytes.toBytes(key));
  rowLookup.setMaxVersions(1);
  rowLookup.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN);
  rowLookup.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN);
  rowLookup.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
  return rowLookup;
}
/**
 * Decides whether the commit timestamp stored for a row in HBase still refers to a
 * live commit, i.e. whether the index entry should be trusted for tagging.
 *
 * A commit ts is considered valid when the completed-commit timeline is non-empty and
 * either contains that exact instant, or the ts predates the first instant on the
 * timeline (it was archived away, so it must have completed).
 *
 * @param hoodieTable table whose completed commit timeline is consulted
 * @param commitTs    commit timestamp read from the HBase index row
 * @return true if the commit is present in (or older than) the completed timeline
 */
private boolean checkIfValidCommit(HoodieTable<T> hoodieTable, String commitTs) {
HoodieTimeline commitTimeline = hoodieTable.getCompletedCommitTimeline();
// Check if the last commit ts for this row is 1) present in the timeline or
// 2) is less than the first commit ts in the timeline
return !commitTimeline.empty() && (commitTimeline.containsInstant(
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs)) ||
HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(),
commitTs, HoodieTimeline.GREATER));
}
/**
* Function that tags each HoodieRecord with an existing location, if known.
*/
class LocationTagFunction
implements Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> {
private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>
locationTagFunction(HoodieTable<T> hoodieTable) {
private final HoodieTable<T> hoodieTable;
return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>)
(partitionNum, hoodieRecordIterator) -> {
LocationTagFunction(HoodieTable<T> hoodieTable) {
this.hoodieTable = hoodieTable;
}
Integer multiGetBatchSize = config.getHbaseIndexGetBatchSize();
@Override
public Iterator<HoodieRecord<T>> call(Integer partitionNum,
Iterator<HoodieRecord<T>> hoodieRecordIterator) {
// Grab the global HBase connection
synchronized (HBaseIndex.class) {
if (hbaseConnection == null) {
if (hbaseConnection == null || hbaseConnection.isClosed()) {
hbaseConnection = getHBaseConnection();
}
}
@@ -120,31 +153,46 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
HTable hTable = null;
try {
hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
List<Get> statements = new ArrayList<>();
List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
// Do the tagging.
while (hoodieRecordIterator.hasNext()) {
HoodieRecord rec = hoodieRecordIterator.next();
// TODO(vc): This may need to be a multi get.
Result result = hTable.get(
new Get(Bytes.toBytes(rec.getRecordKey())).setMaxVersions(1)
.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN)
.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
statements.add(generateStatement(rec.getRecordKey()));
currentBatchOfRecords.add(rec);
// iterator till we reach batch size
if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
// get results for batch from Hbase
Result[] results = hTable.get(statements);
// clear statements to be GC'd
statements.clear();
for (Result result : results) {
// first, attempt to grab location from HBase
HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
if (result.getRow() != null) {
String keyFromResult = Bytes.toString(result.getRow());
String commitTs =
Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
String fileId =
Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
String partitionPath =
Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
// first, attempt to grab location from HBase
if (result.getRow() != null) {
String commitTs =
Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
String fileId =
Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
HoodieTimeline commitTimeline = hoodieTable.getCompletedCommitTimeline();
// if the last commit ts for this row is less than the system commit ts
if (!commitTimeline.empty() && commitTimeline.containsInstant(
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs))) {
rec.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
if (checkIfValidCommit(hoodieTable, commitTs)) {
currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(),
partitionPath), currentRecord.getData());
currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
taggedRecords.add(currentRecord);
// the key from Result and the key being processed should be same
assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
} else { //if commit is invalid, treat this as a new taggedRecord
taggedRecords.add(currentRecord);
}
} else {
taggedRecords.add(currentRecord);
}
}
}
taggedRecords.add(rec);
}
} catch (IOException e) {
throw new HoodieIndexException(
@@ -160,25 +208,25 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
}
return taggedRecords.iterator();
}
};
}
@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
HoodieTable<T> hoodieTable) {
return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(hoodieTable), true);
HoodieTable<T> hoodieTable) {
return recordRDD.mapPartitionsWithIndex(locationTagFunction(hoodieTable), true);
}
class UpdateLocationTask implements
Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> {
private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {
@Override
public Iterator<WriteStatus> call(Integer partition, Iterator<WriteStatus> statusIterator) {
return (Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>>) (partition, statusIterator) -> {
Integer multiPutBatchSize = config.getHbaseIndexPutBatchSize();
List<WriteStatus> writeStatusList = new ArrayList<>();
// Grab the global HBase connection
synchronized (HBaseIndex.class) {
if (hbaseConnection == null) {
if (hbaseConnection == null || hbaseConnection.isClosed()) {
hbaseConnection = getHBaseConnection();
}
}
@@ -194,6 +242,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
if (!writeStatus.isErrored(rec.getKey())) {
java.util.Optional<HoodieRecordLocation> loc = rec.getNewLocation();
if (loc.isPresent()) {
if (rec.getCurrentLocation() != null) {
// This is an update, no need to update index
continue;
}
Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN,
Bytes.toBytes(loc.get().getCommitTime()));
@@ -208,10 +260,13 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
deletes.add(delete);
}
}
if (puts.size() + deletes.size() < multiPutBatchSize) {
continue;
}
doPutsAndDeletes(hTable, puts, deletes);
}
hTable.put(puts);
hTable.delete(deletes);
hTable.flushCommits();
//process remaining puts and deletes, if any
doPutsAndDeletes(hTable, puts, deletes);
} catch (Exception e) {
Exception we = new Exception("Error updating index for " + writeStatus, e);
logger.error(we);
@@ -232,24 +287,43 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
}
}
return writeStatusList.iterator();
};
}
/**
 * Flushes the buffered mutations to HBase and clears both buffers so they can be
 * reused for the next batch.
 *
 * @param hTable  the HBase table to write to
 * @param puts    buffered Put mutations; cleared after the flush
 * @param deletes buffered Delete mutations; cleared after the flush
 * @throws IOException if the HBase client fails to apply the mutations
 */
private void doPutsAndDeletes(HTable hTable, List<Put> puts, List<Delete> deletes) throws IOException {
  // isEmpty() is the idiomatic emptiness check (vs. size() > 0)
  if (!puts.isEmpty()) {
    hTable.put(puts);
  }
  if (!deletes.isEmpty()) {
    hTable.delete(deletes);
  }
  // flush unconditionally, matching the original contract for any previously buffered writes
  hTable.flushCommits();
  puts.clear();
  deletes.clear();
}
@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
HoodieTable<T> hoodieTable) {
return writeStatusRDD.mapPartitionsWithIndex(new UpdateLocationTask(), true);
HoodieTable<T> hoodieTable) {
return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
}
@Override
public boolean rollbackCommit(String commitTime) {
// Can't really rollback here. HBase only can let you go from recordKey to fileID,
// not the other way around
// Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
return true;
}
/**
* Only looks up by recordKey
* @return
*/
@Override
public boolean isGlobal() {
@@ -258,6 +332,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
/**
* Mapping is available in HBase already.
* @return
*/
@Override
public boolean canIndexLogFiles() {
@@ -266,9 +341,15 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
/**
* Index needs to be explicitly updated after storage write.
* @return
*/
@Override
public boolean isImplicitWithStorage() {
return false;
}
}
/**
 * Test-only hook that replaces the JVM-wide shared HBase connection (a static field)
 * with a supplied (typically mocked) connection.
 *
 * NOTE(review): this sets static state through an instance method, so it affects
 * every HBaseIndex in the JVM — acceptable for tests, not for production use.
 */
@VisibleForTesting
public void setHbaseConnection(Connection hbaseConnection) {
HBaseIndex.hbaseConnection = hbaseConnection;
}
}

View File

@@ -0,0 +1,318 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.index;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableConfig;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.hbase.HBaseIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runners.MethodSorters;
import org.mockito.Mockito;
import scala.Tuple2;
import java.io.File;
import java.io.IOException;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.times;
/**
* Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown
* across tests, (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835).
* Hence, the need to use MethodSorters.NAME_ASCENDING to make sure the tests run in order. Please alter
* the order of tests running carefully.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestHbaseIndex {
private static JavaSparkContext jsc = null;
private String basePath = null;
private transient FileSystem fs;
private static HBaseTestingUtility utility;
private static Configuration hbaseConfig;
private static String tableName = "test_table";
private HoodieTableMetaClient metaClient;
/**
 * Tears down the class-level fixtures: stops the shared Spark context first, then
 * shuts down the HBase mini cluster. Both are guarded against never having started.
 */
@AfterClass
public static void clean() throws Exception {
if (jsc != null) {
jsc.stop();
}
if (utility != null) {
utility.shutdownMiniCluster();
}
}
/**
 * Removes the per-test base path after each test.
 */
@After
public void clear() throws Exception {
  if (basePath != null) {
    // File#delete() silently fails on non-empty directories, and the base path
    // contains the table files written by the test — delete recursively instead.
    deleteRecursively(new File(basePath));
  }
}

/** Depth-first delete of a file or directory tree (no-op for missing paths). */
private static void deleteRecursively(File file) {
  File[] children = file.listFiles();
  if (children != null) {
    for (File child : children) {
      deleteRecursively(child);
    }
  }
  file.delete();
}
/**
 * Per-test setup: creates a fresh temp directory as the base path and initializes a
 * COPY_ON_WRITE Hoodie table there on the mini cluster's file system.
 */
@Before
public void before() throws Exception {
// Create a temp folder as the base path
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
// Initialize table
metaClient = HoodieTableMetaClient
.initTableType(utility.getTestFileSystem(), basePath, HoodieTableType.COPY_ON_WRITE,
tableName, HoodieTableConfig.DEFAULT_PAYLOAD_CLASS);
}
// Public no-arg constructor required by JUnit. The `throws Exception` clause appears
// unnecessary (empty body) — presumably a leftover; TODO confirm and drop.
public TestHbaseIndex() throws Exception {
}
/**
 * Class-level setup: starts the HBase mini cluster, creates the index table with the
 * "_s" system column family, and spins up a single-core local Spark context wired to
 * the mini cluster's Hadoop configuration.
 */
@BeforeClass
public static void init() throws Exception {
// Initialize HbaseMiniCluster
utility = new HBaseTestingUtility();
utility.startMiniCluster();
hbaseConfig = utility.getConnection().getConfiguration();
utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
// Initialize a local spark env
SparkConf sparkConf = new SparkConf().setAppName("TestHbaseIndex").setMaster("local[1]");
jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().addResource(utility.getConfiguration());
}
/**
 * End-to-end tagLocation flow: records are untagged before any write, stay untagged
 * after an uncommitted upsert, and are fully tagged once the commit completes.
 */
@Test
public void testSimpleTagLocationAndUpdate() throws Exception {
  String newCommitTime = "001";
  HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
  List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
  JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
  // Load to memory
  HoodieWriteConfig config = getConfig();
  HBaseIndex index = new HBaseIndex(config, jsc);
  HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
  writeClient.startCommit();
  HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
  // Test tagLocation without any entries in index.
  // JUnit assertions are used instead of the `assert` keyword: bare asserts are
  // silently disabled unless the JVM runs with -ea, so they never actually checked.
  JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, hoodieTable);
  assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
  // Insert 200 records
  JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
  assertNoWriteErrors(writeStatues.collect());
  // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed commit
  javaRDD = index.tagLocation(writeRecords, hoodieTable);
  assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
  // Now commit this & update location of records inserted and validate no errors
  writeClient.commit(newCommitTime, writeStatues);
  // Now tagLocation for these records, hbaseIndex should tag them correctly
  javaRDD = index.tagLocation(writeRecords, hoodieTable);
  assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
  assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
  assertEquals(200, javaRDD.filter(record -> (record.getCurrentLocation() != null
      && record.getCurrentLocation().getCommitTime().equals(newCommitTime))).distinct().count());
}
/**
 * Verifies that the index honors rollbacks: after a committed upsert the records are
 * tagged with the correct fileIds, and after rolling that commit back tagLocation no
 * longer tags them (rollback is handled via checkIfValidCommit, not rollbackCommit).
 */
@Test
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
  HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
  // Load to memory
  HoodieWriteConfig config = getConfig();
  HBaseIndex index = new HBaseIndex(config, jsc);
  HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
  String newCommitTime = writeClient.startCommit();
  List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
  JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
  HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
  // Insert 200 records
  JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
  assertNoWriteErrors(writeStatues.collect());
  // commit this upsert
  writeClient.commit(newCommitTime, writeStatues);
  // Now tagLocation for these records, hbaseIndex should tag them.
  // JUnit assertions are used instead of the `assert` keyword, which is a silent
  // no-op unless the JVM runs with -ea.
  JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, hoodieTable);
  assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
  // check tagged records are tagged with correct fileIds
  List<String> fileIds = writeStatues.map(status -> status.getFileId()).collect();
  assertEquals(0, javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size());
  List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
  // both lists should match
  assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
  // Rollback the last commit
  writeClient.rollback(newCommitTime);
  // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled back commit
  javaRDD = index.tagLocation(writeRecords, hoodieTable);
  assertEquals(0, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
  assertEquals(0, javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size());
}
/**
 * Verifies the multi-get batching in tagLocation: with getBatchSize=100, one
 * partition, and 250 records, HTable.get(List) must be invoked exactly 3 times
 * (100 + 100 + 50). Uses a mocked HBase connection injected via setHbaseConnection.
 */
@Test
public void testTotalGetsBatching() throws Exception {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config, jsc);
// Mock hbaseConnection and related entities
Connection hbaseConnection = Mockito.mock(Connection.class);
HTable table = Mockito.mock(HTable.class);
Mockito.when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
Mockito.when(table.get((List<Get>) anyObject())).thenReturn(new Result[0]);
// only for test, set the hbaseConnection to mocked object
index.setHbaseConnection(hbaseConnection);
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
// start a commit and generate test data
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
// Insert 250 records
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
assertNoWriteErrors(writeStatues.collect());
// Now tagLocation for these records, hbaseIndex should tag them
index.tagLocation(writeRecords, hoodieTable);
// 3 batches should be executed given batchSize = 100 and parallelism = 1
Mockito.verify(table, times(3)).get((List<Get>) anyObject());
}
/**
 * Verifies the put batching in updateLocation: with putBatchSize=100 and records
 * spread over numberOfDataFileIds files, HTable.put(List) is invoked at most
 * numberOfDataFileIds times. Uses a mocked HBase connection for verification.
 */
@Test
public void testTotalPutsBatching() throws Exception {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config, jsc);
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
// start a commit and generate test data
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
// Insert 250 records (upsert against the real table before mocking the connection)
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
// commit this upsert
writeClient.commit(newCommitTime, writeStatues);
// Mock hbaseConnection and related entities
Connection hbaseConnection = Mockito.mock(Connection.class);
HTable table = Mockito.mock(HTable.class);
Mockito.when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
Mockito.when(table.get((List<Get>) anyObject())).thenReturn(new Result[0]);
// only for test, set the hbaseConnection to mocked object
index.setHbaseConnection(hbaseConnection);
// Get all the files generated
int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count();
index.updateLocation(writeStatues, hoodieTable);
// with batchSize = 100 and <= numberOfDataFileIds partitions of WriteStatus getting
// updated, put(List) should be invoked at most once per data file id
Mockito.verify(table, atMost(numberOfDataFileIds)).put((List<Put>) anyObject());
}
/** Fails the test if any of the collected write statuses reported an error. */
private void assertNoWriteErrors(List<WriteStatus> statuses) {
  statuses.forEach(status ->
      assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()));
}
/** Convenience wrapper: builds the default test write config. */
private HoodieWriteConfig getConfig() {
return getConfigBuilder().build();
}
/**
 * Builds the shared write-config for these tests: HBASE index wired to the mini
 * cluster's zookeeper, parallelism 1, auto-commit off, and get/put batch sizes of
 * 100 — the batching tests above depend on these batch-size values.
 */
private HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(1, 1)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build())
.withAutoCommit(false)
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
.forTable("test-trip-table").withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
.hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
.hbaseIndexGetBatchSize(100).hbaseIndexPutBatchSize(100).build());
}
}