diff --git a/hoodie-client/pom.xml b/hoodie-client/pom.xml
index a3b1ec640..d082f2f60 100644
--- a/hoodie-client/pom.xml
+++ b/hoodie-client/pom.xml
@@ -191,6 +191,34 @@
test
-
+
+
+ org.apache.hbase
+ hbase-client
+ 1.2.3
+
+
+ org.htrace
+ htrace-core
+ 3.0.4
+
+
+
+ org.apache.hbase
+ hbase-testing-util
+ 1.2.3
+ test
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+
+
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
index a7a722de1..a84c57b11 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java
@@ -49,6 +49,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
+ public final static String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
+ public final static String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
+ public final static String DEFAULT_HBASE_BATCH_SIZE = "100";
// ***** Bucketed Index Configs *****
public final static String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets";
@@ -130,6 +133,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder hbaseIndexGetBatchSize(int getBatchSize) {
+ props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
+ return this;
+ }
+
+ public Builder hbaseIndexPutBatchSize(int putBatchSize) {
+ props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
+ return this;
+ }
+
public HoodieIndexConfig build() {
HoodieIndexConfig config = new HoodieIndexConfig(props);
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP),
@@ -144,9 +157,13 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP),
BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING);
+ setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP),
+ HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
+ setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP),
+ HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE));
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;
}
}
-}
+}
\ No newline at end of file
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index b6d5f766b..9bbc8fa70 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -237,6 +237,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
}
+ public int getHbaseIndexGetBatchSize() {
+ return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_GET_BATCH_SIZE_PROP));
+ }
+
+ public int getHbaseIndexPutBatchSize() {
+ return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_PROP));
+ }
+
public int getBloomIndexParallelism() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
index 5d50ff646..e2542925c 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
@@ -18,6 +18,7 @@
package com.uber.hoodie.index.hbase;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieKey;
@@ -32,10 +33,6 @@ import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -54,11 +51,16 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
/**
* Hoodie Index implementation backed by HBase
*/
public class HBaseIndex extends HoodieIndex {
-
private final static byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s");
private final static byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
private final static byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
@@ -70,22 +72,24 @@ public class HBaseIndex extends HoodieIndex {
public HBaseIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
- this.tableName = config.getProps().getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP);
+ this.tableName = config.getHbaseTableName();
+ addShutDownHook();
}
@Override
public JavaPairRDD> fetchRecordLocation(
JavaRDD hoodieKeys, HoodieTable table) {
- throw new UnsupportedOperationException("HBase index does not implement check exist yet");
+ //TODO : Change/Remove filterExists in HoodieReadClient() and revisit
+ throw new UnsupportedOperationException("HBase index does not implement check exist");
}
private static Connection hbaseConnection = null;
private Connection getHBaseConnection() {
Configuration hbaseConfig = HBaseConfiguration.create();
- String quorum = config.getProps().getProperty(HoodieIndexConfig.HBASE_ZKQUORUM_PROP);
+ String quorum = config.getHbaseZkQuorum();
hbaseConfig.set("hbase.zookeeper.quorum", quorum);
- String port = config.getProps().getProperty(HoodieIndexConfig.HBASE_ZKPORT_PROP);
+ String port = String.valueOf(config.getHbaseZkPort());
hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
try {
return ConnectionFactory.createConnection(hbaseConfig);
@@ -95,24 +99,53 @@ public class HBaseIndex extends HoodieIndex {
}
}
+ /**
+ * Since we are sharing the HBase connection across tasks in a JVM, make sure the HBase connection is
+ * closed when the JVM exits
+ */
+ private void addShutDownHook() {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ hbaseConnection.close();
+ } catch(Exception e) {
+ // fail silently for any sort of exception
+ }
+ }
+ });
+ }
+
+ private Get generateStatement(String key) throws IOException {
+ return new Get(Bytes.toBytes(key)).setMaxVersions(1)
+ .addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
+ .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN)
+ .addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
+ }
+
+ private boolean checkIfValidCommit(HoodieTable hoodieTable, String commitTs) {
+ HoodieTimeline commitTimeline = hoodieTable.getCompletedCommitTimeline();
+ // Check if the last commit ts for this row is 1) present in the timeline or
+ // 2) is less than the first commit ts in the timeline
+ return !commitTimeline.empty() && (commitTimeline.containsInstant(
+ new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs)) ||
+ HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(),
+ commitTs, HoodieTimeline.GREATER));
+ }
+
/**
* Function that tags each HoodieRecord with an existing location, if known.
*/
- class LocationTagFunction
- implements Function2>, Iterator>> {
+ private Function2>, Iterator>>
+ locationTagFunction(HoodieTable hoodieTable) {
- private final HoodieTable hoodieTable;
+ return (Function2>, Iterator>>)
+ (partitionNum, hoodieRecordIterator) -> {
- LocationTagFunction(HoodieTable hoodieTable) {
- this.hoodieTable = hoodieTable;
- }
+ Integer multiGetBatchSize = config.getHbaseIndexGetBatchSize();
- @Override
- public Iterator> call(Integer partitionNum,
- Iterator> hoodieRecordIterator) {
// Grab the global HBase connection
synchronized (HBaseIndex.class) {
- if (hbaseConnection == null) {
+ if (hbaseConnection == null || hbaseConnection.isClosed()) {
hbaseConnection = getHBaseConnection();
}
}
@@ -120,31 +153,46 @@ public class HBaseIndex extends HoodieIndex {
HTable hTable = null;
try {
hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
+ List statements = new ArrayList<>();
+ List currentBatchOfRecords = new LinkedList<>();
// Do the tagging.
while (hoodieRecordIterator.hasNext()) {
HoodieRecord rec = hoodieRecordIterator.next();
- // TODO(vc): This may need to be a multi get.
- Result result = hTable.get(
- new Get(Bytes.toBytes(rec.getRecordKey())).setMaxVersions(1)
- .addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
- .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN)
- .addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+ statements.add(generateStatement(rec.getRecordKey()));
+ currentBatchOfRecords.add(rec);
+ // iterate until we reach the batch size
+ if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
+ // get results for batch from Hbase
+ Result[] results = hTable.get(statements);
+ // clear statements to be GC'd
+ statements.clear();
+ for (Result result : results) {
+ // first, attempt to grab location from HBase
+ HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
+ if (result.getRow() != null) {
+ String keyFromResult = Bytes.toString(result.getRow());
+ String commitTs =
+ Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
+ String fileId =
+ Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
+ String partitionPath =
+ Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
- // first, attempt to grab location from HBase
- if (result.getRow() != null) {
- String commitTs =
- Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
- String fileId =
- Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
-
- HoodieTimeline commitTimeline = hoodieTable.getCompletedCommitTimeline();
- // if the last commit ts for this row is less than the system commit ts
- if (!commitTimeline.empty() && commitTimeline.containsInstant(
- new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs))) {
- rec.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+ if (checkIfValidCommit(hoodieTable, commitTs)) {
+ currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(),
+ partitionPath), currentRecord.getData());
+ currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+ taggedRecords.add(currentRecord);
+ // the key from Result and the key being processed should be same
+ assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
+ } else { //if commit is invalid, treat this as a new taggedRecord
+ taggedRecords.add(currentRecord);
+ }
+ } else {
+ taggedRecords.add(currentRecord);
+ }
}
}
- taggedRecords.add(rec);
}
} catch (IOException e) {
throw new HoodieIndexException(
@@ -160,25 +208,25 @@ public class HBaseIndex extends HoodieIndex {
}
return taggedRecords.iterator();
- }
+ };
}
@Override
public JavaRDD> tagLocation(JavaRDD> recordRDD,
- HoodieTable hoodieTable) {
- return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(hoodieTable), true);
+ HoodieTable hoodieTable) {
+ return recordRDD.mapPartitionsWithIndex(locationTagFunction(hoodieTable), true);
}
- class UpdateLocationTask implements
- Function2, Iterator> {
+ private Function2, Iterator> updateLocationFunction() {
- @Override
- public Iterator call(Integer partition, Iterator statusIterator) {
+ return (Function2, Iterator>) (partition, statusIterator) -> {
+
+ Integer multiPutBatchSize = config.getHbaseIndexPutBatchSize();
List writeStatusList = new ArrayList<>();
// Grab the global HBase connection
synchronized (HBaseIndex.class) {
- if (hbaseConnection == null) {
+ if (hbaseConnection == null || hbaseConnection.isClosed()) {
hbaseConnection = getHBaseConnection();
}
}
@@ -194,6 +242,10 @@ public class HBaseIndex extends HoodieIndex {
if (!writeStatus.isErrored(rec.getKey())) {
java.util.Optional loc = rec.getNewLocation();
if (loc.isPresent()) {
+ if (rec.getCurrentLocation() != null) {
+ // This is an update, no need to update index
+ continue;
+ }
Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN,
Bytes.toBytes(loc.get().getCommitTime()));
@@ -208,10 +260,13 @@ public class HBaseIndex extends HoodieIndex {
deletes.add(delete);
}
}
+ if (puts.size() + deletes.size() < multiPutBatchSize) {
+ continue;
+ }
+ doPutsAndDeletes(hTable, puts, deletes);
}
- hTable.put(puts);
- hTable.delete(deletes);
- hTable.flushCommits();
+ //process remaining puts and deletes, if any
+ doPutsAndDeletes(hTable, puts, deletes);
} catch (Exception e) {
Exception we = new Exception("Error updating index for " + writeStatus, e);
logger.error(we);
@@ -232,24 +287,43 @@ public class HBaseIndex extends HoodieIndex {
}
}
return writeStatusList.iterator();
+ };
+ }
+
+ /**
+ * Helper method to facilitate performing puts and deletes in HBase
+ * @param hTable
+ * @param puts
+ * @param deletes
+ * @throws IOException
+ */
+ private void doPutsAndDeletes(HTable hTable, List puts, List deletes) throws IOException {
+ if(puts.size() > 0) {
+ hTable.put(puts);
}
+ if(deletes.size() > 0) {
+ hTable.delete(deletes);
+ }
+ hTable.flushCommits();
+ puts.clear();
+ deletes.clear();
}
@Override
public JavaRDD updateLocation(JavaRDD writeStatusRDD,
- HoodieTable hoodieTable) {
- return writeStatusRDD.mapPartitionsWithIndex(new UpdateLocationTask(), true);
+ HoodieTable hoodieTable) {
+ return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
}
@Override
public boolean rollbackCommit(String commitTime) {
- // Can't really rollback here. HBase only can let you go from recordKey to fileID,
- // not the other way around
+ // Rollback in HBaseIndex is managed via the commit-validity check in {@link #checkIfValidCommit}
return true;
}
/**
* Only looks up by recordKey
+ * @return
*/
@Override
public boolean isGlobal() {
@@ -258,6 +332,7 @@ public class HBaseIndex extends HoodieIndex {
/**
* Mapping is available in HBase already.
+ * @return
*/
@Override
public boolean canIndexLogFiles() {
@@ -266,9 +341,15 @@ public class HBaseIndex extends HoodieIndex {
/**
* Index needs to be explicitly updated after storage write.
+ * @return
*/
@Override
public boolean isImplicitWithStorage() {
return false;
}
-}
+
+ @VisibleForTesting
+ public void setHbaseConnection(Connection hbaseConnection) {
+ HBaseIndex.hbaseConnection = hbaseConnection;
+ }
+}
\ No newline at end of file
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
new file mode 100644
index 000000000..990e62d61
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -0,0 +1,318 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.index;
+
+import com.uber.hoodie.HoodieWriteClient;
+import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieTableType;
+import com.uber.hoodie.common.table.HoodieTableConfig;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.TableFileSystemView;
+import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.config.HoodieCompactionConfig;
+import com.uber.hoodie.config.HoodieIndexConfig;
+import com.uber.hoodie.config.HoodieStorageConfig;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.index.hbase.HBaseIndex;
+import com.uber.hoodie.table.HoodieTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.MethodSorters;
+import org.mockito.Mockito;
+import scala.Tuple2;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.times;
+
+/**
+ * Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shut down
+ * across tests (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835).
+ * Hence, the need to use MethodSorters.NAME_ASCENDING to make sure the tests run in order. Please be
+ * careful when altering the order in which the tests run.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestHbaseIndex {
+
+ private static JavaSparkContext jsc = null;
+ private String basePath = null;
+ private transient FileSystem fs;
+ private static HBaseTestingUtility utility;
+ private static Configuration hbaseConfig;
+ private static String tableName = "test_table";
+ private HoodieTableMetaClient metaClient;
+
+ @AfterClass
+ public static void clean() throws Exception {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ if (utility != null) {
+ utility.shutdownMiniCluster();
+ }
+ }
+
+ @After
+ public void clear() throws Exception {
+ if (basePath != null) {
+ new File(basePath).delete();
+ }
+ }
+
+ @Before
+ public void before() throws Exception {
+ // Create a temp folder as the base path
+ TemporaryFolder folder = new TemporaryFolder();
+ folder.create();
+ basePath = folder.getRoot().getAbsolutePath();
+ // Initialize table
+ metaClient = HoodieTableMetaClient
+ .initTableType(utility.getTestFileSystem(), basePath, HoodieTableType.COPY_ON_WRITE,
+ tableName, HoodieTableConfig.DEFAULT_PAYLOAD_CLASS);
+ }
+
+ public TestHbaseIndex() throws Exception {
+ }
+
+ @BeforeClass
+ public static void init() throws Exception {
+
+ // Initialize HbaseMiniCluster
+ utility = new HBaseTestingUtility();
+ utility.startMiniCluster();
+ hbaseConfig = utility.getConnection().getConfiguration();
+ utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
+ // Initialize a local spark env
+ SparkConf sparkConf = new SparkConf().setAppName("TestHbaseIndex").setMaster("local[1]");
+ jsc = new JavaSparkContext(sparkConf);
+ jsc.hadoopConfiguration().addResource(utility.getConfiguration());
+ }
+
+ @Test
+ public void testSimpleTagLocationAndUpdate() throws Exception {
+
+ String newCommitTime = "001";
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ List records = dataGen.generateInserts(newCommitTime, 200);
+ JavaRDD writeRecords = jsc.parallelize(records, 1);
+
+ // Load to memory
+ HoodieWriteConfig config = getConfig();
+ HBaseIndex index = new HBaseIndex(config, jsc);
+ HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+ writeClient.startCommit();
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
+
+ // Test tagLocation without any entries in index
+ JavaRDD javaRDD = index.tagLocation(writeRecords, hoodieTable);
+ assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+
+ // Insert 200 records
+ JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+ assertNoWriteErrors(writeStatues.collect());
+
+ // Now tagLocation for these records, hbaseIndex should not tag them since the commit is not yet complete
+ javaRDD = index.tagLocation(writeRecords, hoodieTable);
+ assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+
+ // Now commit this & update location of records inserted and validate no errors
+ writeClient.commit(newCommitTime, writeStatues);
+
+ // Now tagLocation for these records, hbaseIndex should tag them correctly
+ javaRDD = index.tagLocation(writeRecords, hoodieTable);
+ assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200);
+ assertTrue(javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count() == 200);
+ assertTrue(javaRDD.filter(record -> (record.getCurrentLocation() != null
+ && record.getCurrentLocation().getCommitTime().equals(newCommitTime))).distinct().count() == 200);
+
+ }
+
+ @Test
+ public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ // Load to memory
+ HoodieWriteConfig config = getConfig();
+ HBaseIndex index = new HBaseIndex(config, jsc);
+ HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+
+ String newCommitTime = writeClient.startCommit();
+ List records = dataGen.generateInserts(newCommitTime, 200);
+ JavaRDD writeRecords = jsc.parallelize(records, 1);
+
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
+
+ // Insert 200 records
+ JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+ assertNoWriteErrors(writeStatues.collect());
+
+ // commit this upsert
+ writeClient.commit(newCommitTime, writeStatues);
+
+ // Now tagLocation for these records, hbaseIndex should tag them
+ JavaRDD javaRDD = index.tagLocation(writeRecords, hoodieTable);
+ assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200);
+
+ // check tagged records are tagged with correct fileIds
+ List fileIds = writeStatues.map(status -> status.getFileId()).collect();
+ assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0);
+ List taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
+
+ // both lists should match
+ assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
+ // Rollback the last commit
+ writeClient.rollback(newCommitTime);
+
+ // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled back commit
+ javaRDD = index.tagLocation(writeRecords, hoodieTable);
+ assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+ assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
+ }
+
+ @Test
+ public void testTotalGetsBatching() throws Exception {
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ HoodieWriteConfig config = getConfig();
+ HBaseIndex index = new HBaseIndex(config, jsc);
+
+ // Mock hbaseConnection and related entities
+ Connection hbaseConnection = Mockito.mock(Connection.class);
+ HTable table = Mockito.mock(HTable.class);
+ Mockito.when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+ Mockito.when(table.get((List) anyObject())).thenReturn(new Result[0]);
+
+ // only for test, set the hbaseConnection to mocked object
+ index.setHbaseConnection(hbaseConnection);
+
+ HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+
+ // start a commit and generate test data
+ String newCommitTime = writeClient.startCommit();
+ List records = dataGen.generateInserts(newCommitTime, 250);
+ JavaRDD writeRecords = jsc.parallelize(records, 1);
+
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
+
+ // Insert 250 records
+ JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+ assertNoWriteErrors(writeStatues.collect());
+
+ // Now tagLocation for these records, hbaseIndex should tag them
+ index.tagLocation(writeRecords, hoodieTable);
+
+ // 3 batches should be executed given batchSize = 100 and parallelism = 1
+ Mockito.verify(table, times(3)).get((List) anyObject());
+
+ }
+
+ @Test
+ public void testTotalPutsBatching() throws Exception {
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ HoodieWriteConfig config = getConfig();
+ HBaseIndex index = new HBaseIndex(config, jsc);
+ HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+
+ // start a commit and generate test data
+ String newCommitTime = writeClient.startCommit();
+ List records = dataGen.generateInserts(newCommitTime, 250);
+ JavaRDD writeRecords = jsc.parallelize(records, 1);
+
+ HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config);
+
+ // Insert 250 records
+ JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+
+ // commit this upsert
+ writeClient.commit(newCommitTime, writeStatues);
+
+ // Mock hbaseConnection and related entities
+ Connection hbaseConnection = Mockito.mock(Connection.class);
+ HTable table = Mockito.mock(HTable.class);
+ Mockito.when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
+ Mockito.when(table.get((List) anyObject())).thenReturn(new Result[0]);
+
+ // only for test, set the hbaseConnection to mocked object
+ index.setHbaseConnection(hbaseConnection);
+
+ // Get all the files generated
+ int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count();
+
+ index.updateLocation(writeStatues, hoodieTable);
+ // at most numberOfDataFileIds put batches should be executed, given batchSize = 100 and
+ // one flush per fileId's WriteStatus
+ Mockito.verify(table, atMost(numberOfDataFileIds)).put((List) anyObject());
+ }
+
+ private void assertNoWriteErrors(List statuses) {
+ // Verify there are no errors
+ for (WriteStatus status : statuses) {
+ assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
+ }
+ }
+
+ private HoodieWriteConfig getConfig() {
+ return getConfigBuilder().build();
+ }
+
+ private HoodieWriteConfig.Builder getConfigBuilder() {
+ return HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(1, 1)
+ .withCompactionConfig(
+ HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
+ .withInlineCompaction(false).build())
+ .withAutoCommit(false)
+ .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+ .forTable("test-trip-table").withIndexConfig(
+ HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
+ .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
+ .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
+ .hbaseIndexGetBatchSize(100).hbaseIndexPutBatchSize(100).build());
+ }
+}
\ No newline at end of file