From be0b1f3e571a803418fa4f182788403e41aed306 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Fri, 15 Dec 2017 21:29:02 -0800 Subject: [PATCH] Adding global indexing to HbaseIndex implementation - Adding tests or HbaseIndex - Enabling global index functionality --- hoodie-client/pom.xml | 30 +- .../uber/hoodie/config/HoodieIndexConfig.java | 19 +- .../uber/hoodie/config/HoodieWriteConfig.java | 8 + .../uber/hoodie/index/hbase/HBaseIndex.java | 189 ++++++++--- .../com/uber/hoodie/index/TestHbaseIndex.java | 318 ++++++++++++++++++ 5 files changed, 508 insertions(+), 56 deletions(-) create mode 100644 hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java diff --git a/hoodie-client/pom.xml b/hoodie-client/pom.xml index a3b1ec640..d082f2f60 100644 --- a/hoodie-client/pom.xml +++ b/hoodie-client/pom.xml @@ -191,6 +191,34 @@ test - + + + org.apache.hbase + hbase-client + 1.2.3 + + + org.htrace + htrace-core + 3.0.4 + + + + org.apache.hbase + hbase-testing-util + 1.2.3 + test + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + + diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index a7a722de1..a84c57b11 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -49,6 +49,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum"; public final static String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport"; public final static String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table"; + public final static String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size"; + public final static String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size"; + public final static String DEFAULT_HBASE_BATCH_SIZE = "100"; // ***** Bucketed Index Configs ***** public final static String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets"; @@ -130,6 +133,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder hbaseIndexGetBatchSize(int getBatchSize) { + props.setProperty(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize)); + return this; + } + + public Builder hbaseIndexPutBatchSize(int putBatchSize) { + props.setProperty(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize)); + return this; + } + public HoodieIndexConfig build() { HoodieIndexConfig config = new HoodieIndexConfig(props); setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), @@ -144,9 +157,13 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP), BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING); + setDefaultOnCondition(props, !props.containsKey(HBASE_GET_BATCH_SIZE_PROP), + HBASE_GET_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); + setDefaultOnCondition(props, !props.containsKey(HBASE_PUT_BATCH_SIZE_PROP), + HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(DEFAULT_HBASE_BATCH_SIZE)); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP)); return config; } } -} +} \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index b6d5f766b..9bbc8fa70 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -237,6 +237,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return props.getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP); } + public int getHbaseIndexGetBatchSize() { + return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_GET_BATCH_SIZE_PROP)); + } + + public int getHbaseIndexPutBatchSize() { + return Integer.valueOf(props.getProperty(HoodieIndexConfig.HBASE_PUT_BATCH_SIZE_PROP)); + } + public int getBloomIndexParallelism() { return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP)); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java index 5d50ff646..e2542925c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java @@ -18,6 +18,7 @@ package com.uber.hoodie.index.hbase; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieKey; @@ -32,10 +33,6 @@ import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException; import com.uber.hoodie.exception.HoodieIndexException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; @@ -54,11 +51,16 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + /** * Hoodie Index implementation backed by HBase */ public class HBaseIndex extends HoodieIndex { - private final static byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s"); private final static byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts"); private final static byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name"); @@ -70,22 +72,24 @@ public class HBaseIndex extends HoodieIndex { public HBaseIndex(HoodieWriteConfig config, JavaSparkContext jsc) { super(config, jsc); - this.tableName = config.getProps().getProperty(HoodieIndexConfig.HBASE_TABLENAME_PROP); + this.tableName = config.getHbaseTableName(); + addShutDownHook(); } @Override public JavaPairRDD> fetchRecordLocation( JavaRDD hoodieKeys, HoodieTable table) { - throw new UnsupportedOperationException("HBase index does not implement check exist yet"); + //TODO : Change/Remove filterExists in HoodieReadClient() and revisit + throw new UnsupportedOperationException("HBase index does not implement check exist"); } private static Connection hbaseConnection = null; private Connection getHBaseConnection() { Configuration hbaseConfig = HBaseConfiguration.create(); - String quorum = config.getProps().getProperty(HoodieIndexConfig.HBASE_ZKQUORUM_PROP); + String quorum = config.getHbaseZkQuorum(); hbaseConfig.set("hbase.zookeeper.quorum", quorum); - String port = config.getProps().getProperty(HoodieIndexConfig.HBASE_ZKPORT_PROP); + String port = String.valueOf(config.getHbaseZkPort()); hbaseConfig.set("hbase.zookeeper.property.clientPort", port); try { return ConnectionFactory.createConnection(hbaseConfig); @@ -95,24 +99,53 @@ public class HBaseIndex extends HoodieIndex { } } + /** + * Since we are sharing the HbaseConnection across tasks in a JVM, make sure the HbaseConnectio is closed when + * JVM exits + */ + private void addShutDownHook() { + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + try { + hbaseConnection.close(); + } catch(Exception e) { + // fail silently for any sort of exception + } + } + }); + } + + private Get generateStatement(String key) throws IOException { + return new Get(Bytes.toBytes(key)).setMaxVersions(1) + .addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN) + .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN) + .addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN); + } + + private boolean checkIfValidCommit(HoodieTable hoodieTable, String commitTs) { + HoodieTimeline commitTimeline = hoodieTable.getCompletedCommitTimeline(); + // Check if the last commit ts for this row is 1) present in the timeline or + // 2) is less than the first commit ts in the timeline + return !commitTimeline.empty() && (commitTimeline.containsInstant( + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs)) || + HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), + commitTs, HoodieTimeline.GREATER)); + } + /** * Function that tags each HoodieRecord with an existing location, if known. */ - class LocationTagFunction - implements Function2>, Iterator>> { + private Function2>, Iterator>> + locationTagFunction(HoodieTable hoodieTable) { - private final HoodieTable hoodieTable; + return (Function2>, Iterator>>) + (partitionNum, hoodieRecordIterator) -> { - LocationTagFunction(HoodieTable hoodieTable) { - this.hoodieTable = hoodieTable; - } + Integer multiGetBatchSize = config.getHbaseIndexGetBatchSize(); - @Override - public Iterator> call(Integer partitionNum, - Iterator> hoodieRecordIterator) { // Grab the global HBase connection synchronized (HBaseIndex.class) { - if (hbaseConnection == null) { + if (hbaseConnection == null || hbaseConnection.isClosed()) { hbaseConnection = getHBaseConnection(); } } @@ -120,31 +153,46 @@ public class HBaseIndex extends HoodieIndex { HTable hTable = null; try { hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName)); + List statements = new ArrayList<>(); + List currentBatchOfRecords = new LinkedList<>(); // Do the tagging. while (hoodieRecordIterator.hasNext()) { HoodieRecord rec = hoodieRecordIterator.next(); - // TODO(vc): This may need to be a multi get. - Result result = hTable.get( - new Get(Bytes.toBytes(rec.getRecordKey())).setMaxVersions(1) - .addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN) - .addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN) - .addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN)); + statements.add(generateStatement(rec.getRecordKey())); + currentBatchOfRecords.add(rec); + // iterator till we reach batch size + if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) { + // get results for batch from Hbase + Result[] results = hTable.get(statements); + // clear statements to be GC'd + statements.clear(); + for (Result result : results) { + // first, attempt to grab location from HBase + HoodieRecord currentRecord = currentBatchOfRecords.remove(0); + if (result.getRow() != null) { + String keyFromResult = Bytes.toString(result.getRow()); + String commitTs = + Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)); + String fileId = + Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN)); + String partitionPath = + Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN)); - // first, attempt to grab location from HBase - if (result.getRow() != null) { - String commitTs = - Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)); - String fileId = - Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN)); - - HoodieTimeline commitTimeline = hoodieTable.getCompletedCommitTimeline(); - // if the last commit ts for this row is less than the system commit ts - if (!commitTimeline.empty() && commitTimeline.containsInstant( - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs))) { - rec.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId)); + if (checkIfValidCommit(hoodieTable, commitTs)) { + currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), + partitionPath), currentRecord.getData()); + currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId)); + taggedRecords.add(currentRecord); + // the key from Result and the key being processed should be same + assert (currentRecord.getRecordKey().contentEquals(keyFromResult)); + } else { //if commit is invalid, treat this as a new taggedRecord + taggedRecords.add(currentRecord); + } + } else { + taggedRecords.add(currentRecord); + } } } - taggedRecords.add(rec); } } catch (IOException e) { throw new HoodieIndexException( @@ -160,25 +208,25 @@ public class HBaseIndex extends HoodieIndex { } return taggedRecords.iterator(); - } + }; } @Override public JavaRDD> tagLocation(JavaRDD> recordRDD, - HoodieTable hoodieTable) { - return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(hoodieTable), true); + HoodieTable hoodieTable) { + return recordRDD.mapPartitionsWithIndex(locationTagFunction(hoodieTable), true); } - class UpdateLocationTask implements - Function2, Iterator> { + private Function2, Iterator> updateLocationFunction() { - @Override - public Iterator call(Integer partition, Iterator statusIterator) { + return (Function2, Iterator>) (partition, statusIterator) -> { + + Integer multiPutBatchSize = config.getHbaseIndexPutBatchSize(); List writeStatusList = new ArrayList<>(); // Grab the global HBase connection synchronized (HBaseIndex.class) { - if (hbaseConnection == null) { + if (hbaseConnection == null || hbaseConnection.isClosed()) { hbaseConnection = getHBaseConnection(); } } @@ -194,6 +242,10 @@ public class HBaseIndex extends HoodieIndex { if (!writeStatus.isErrored(rec.getKey())) { java.util.Optional loc = rec.getNewLocation(); if (loc.isPresent()) { + if (rec.getCurrentLocation() != null) { + // This is an update, no need to update index + continue; + } Put put = new Put(Bytes.toBytes(rec.getRecordKey())); put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getCommitTime())); @@ -208,10 +260,13 @@ public class HBaseIndex extends HoodieIndex { deletes.add(delete); } } + if (puts.size() + deletes.size() < multiPutBatchSize) { + continue; + } + doPutsAndDeletes(hTable, puts, deletes); } - hTable.put(puts); - hTable.delete(deletes); - hTable.flushCommits(); + //process remaining puts and deletes, if any + doPutsAndDeletes(hTable, puts, deletes); } catch (Exception e) { Exception we = new Exception("Error updating index for " + writeStatus, e); logger.error(we); @@ -232,24 +287,43 @@ public class HBaseIndex extends HoodieIndex { } } return writeStatusList.iterator(); + }; + } + + /** + * Helper method to facilitate performing puts and deletes in Hbase + * @param hTable + * @param puts + * @param deletes + * @throws IOException + */ + private void doPutsAndDeletes(HTable hTable, List puts, List deletes) throws IOException { + if(puts.size() > 0) { + hTable.put(puts); } + if(deletes.size() > 0) { + hTable.delete(deletes); + } + hTable.flushCommits(); + puts.clear(); + deletes.clear(); } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, - HoodieTable hoodieTable) { - return writeStatusRDD.mapPartitionsWithIndex(new UpdateLocationTask(), true); + HoodieTable hoodieTable) { + return writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); } @Override public boolean rollbackCommit(String commitTime) { - // Can't really rollback here. HBase only can let you go from recordKey to fileID, - // not the other way around + // Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()} return true; } /** * Only looks up by recordKey + * @return */ @Override public boolean isGlobal() { @@ -258,6 +332,7 @@ public class HBaseIndex extends HoodieIndex { /** * Mapping is available in HBase already. + * @return */ @Override public boolean canIndexLogFiles() { @@ -266,9 +341,15 @@ public class HBaseIndex extends HoodieIndex { /** * Index needs to be explicitly updated after storage write. + * @return */ @Override public boolean isImplicitWithStorage() { return false; } -} + + @VisibleForTesting + public void setHbaseConnection(Connection hbaseConnection) { + HBaseIndex.hbaseConnection = hbaseConnection; + } +} \ No newline at end of file diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java new file mode 100644 index 000000000..990e62d61 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java @@ -0,0 +1,318 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.index; + +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.table.HoodieTableConfig; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.TableFileSystemView; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieStorageConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.hbase.HBaseIndex; +import com.uber.hoodie.table.HoodieTable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runners.MethodSorters; +import org.mockito.Mockito; +import scala.Tuple2; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.times; + +/** + * Note :: HBaseTestingUtility is really flaky with issues where the HbaseMiniCluster fails to shutdown + * across tests, (see one problem here : https://issues.apache.org/jira/browse/HBASE-15835). + * Hence, the need to use MethodSorters.NAME_ASCENDING to make sure the tests run in order. Please alter + * the order of tests running carefully. + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class TestHbaseIndex { + + private static JavaSparkContext jsc = null; + private String basePath = null; + private transient FileSystem fs; + private static HBaseTestingUtility utility; + private static Configuration hbaseConfig; + private static String tableName = "test_table"; + private HoodieTableMetaClient metaClient; + + @AfterClass + public static void clean() throws Exception { + if (jsc != null) { + jsc.stop(); + } + if (utility != null) { + utility.shutdownMiniCluster(); + } + } + + @After + public void clear() throws Exception { + if (basePath != null) { + new File(basePath).delete(); + } + } + + @Before + public void before() throws Exception { + // Create a temp folder as the base path + TemporaryFolder folder = new TemporaryFolder(); + folder.create(); + basePath = folder.getRoot().getAbsolutePath(); + // Initialize table + metaClient = HoodieTableMetaClient + .initTableType(utility.getTestFileSystem(), basePath, HoodieTableType.COPY_ON_WRITE, + tableName, HoodieTableConfig.DEFAULT_PAYLOAD_CLASS); + } + + public TestHbaseIndex() throws Exception { + } + + @BeforeClass + public static void init() throws Exception { + + // Initialize HbaseMiniCluster + utility = new HBaseTestingUtility(); + utility.startMiniCluster(); + hbaseConfig = utility.getConnection().getConfiguration(); + utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s")); + // Initialize a local spark env + SparkConf sparkConf = new SparkConf().setAppName("TestHbaseIndex").setMaster("local[1]"); + jsc = new JavaSparkContext(sparkConf); + jsc.hadoopConfiguration().addResource(utility.getConfiguration()); + } + + @Test + public void testSimpleTagLocationAndUpdate() throws Exception { + + String newCommitTime = "001"; + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + // Load to memory + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config, jsc); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + writeClient.startCommit(); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config); + + // Test tagLocation without any entries in index + JavaRDD javaRDD = index.tagLocation(writeRecords, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + + // Insert 200 records + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed commit + javaRDD = index.tagLocation(writeRecords, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + + // Now commit this & update location of records inserted and validate no errors + writeClient.commit(newCommitTime, writeStatues); + + // Now tagLocation for these records, hbaseIndex should tag them correctly + javaRDD = index.tagLocation(writeRecords, hoodieTable); + assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200); + assertTrue(javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count() == 200); + assertTrue(javaRDD.filter(record -> (record.getCurrentLocation() != null + && record.getCurrentLocation().getCommitTime().equals(newCommitTime))).distinct().count() == 200); + + } + + @Test + public void testSimpleTagLocationAndUpdateWithRollback() throws Exception { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + // Load to memory + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config, jsc); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + + String newCommitTime = writeClient.startCommit(); + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config); + + // Insert 200 records + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // commit this upsert + writeClient.commit(newCommitTime, writeStatues); + + // Now tagLocation for these records, hbaseIndex should tag them + JavaRDD javaRDD = index.tagLocation(writeRecords, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200); + + // check tagged records are tagged with correct fileIds + List fileIds = writeStatues.map(status -> status.getFileId()).collect(); + assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0); + List taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect(); + + // both lists should match + assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds)); + // Rollback the last commit + writeClient.rollback(newCommitTime); + + // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled back commit + javaRDD = index.tagLocation(writeRecords, hoodieTable); + assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0); + assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0); + } + + @Test + public void testTotalGetsBatching() throws Exception { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config, jsc); + + // Mock hbaseConnection and related entities + Connection hbaseConnection = Mockito.mock(Connection.class); + HTable table = Mockito.mock(HTable.class); + Mockito.when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table); + Mockito.when(table.get((List) anyObject())).thenReturn(new Result[0]); + + // only for test, set the hbaseConnection to mocked object + index.setHbaseConnection(hbaseConnection); + + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + + // start a commit and generate test data + String newCommitTime = writeClient.startCommit(); + List records = dataGen.generateInserts(newCommitTime, 250); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config); + + // Insert 250 records + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // Now tagLocation for these records, hbaseIndex should tag them + index.tagLocation(writeRecords, hoodieTable); + + // 3 batches should be executed given batchSize = 100 and parallelism = 1 + Mockito.verify(table, times(3)).get((List) anyObject()); + + } + + @Test + public void testTotalPutsBatching() throws Exception { + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config, jsc); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + + // start a commit and generate test data + String newCommitTime = writeClient.startCommit(); + List records = dataGen.generateInserts(newCommitTime, 250); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config); + + // Insert 200 records + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + + // commit this upsert + writeClient.commit(newCommitTime, writeStatues); + + // Mock hbaseConnection and related entities + Connection hbaseConnection = Mockito.mock(Connection.class); + HTable table = Mockito.mock(HTable.class); + Mockito.when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table); + Mockito.when(table.get((List) anyObject())).thenReturn(new Result[0]); + + // only for test, set the hbaseConnection to mocked object + index.setHbaseConnection(hbaseConnection); + + // Get all the files generated + int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count(); + + index.updateLocation(writeStatues, hoodieTable); + // 3 batches should be executed given batchSize = 100 and <=numberOfDataFileIds getting updated, + // so each fileId ideally gets updates + Mockito.verify(table, atMost(numberOfDataFileIds)).put((List) anyObject()); + } + + private void assertNoWriteErrors(List statuses) { + // Verify there are no errors + for (WriteStatus status : statuses) { + assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); + } + } + + private HoodieWriteConfig getConfig() { + return getConfigBuilder().build(); + } + + private HoodieWriteConfig.Builder getConfigBuilder() { + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(1, 1) + .withCompactionConfig( + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) + .withInlineCompaction(false).build()) + .withAutoCommit(false) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .forTable("test-trip-table").withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE) + .hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort"))) + .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName) + .hbaseIndexGetBatchSize(100).hbaseIndexPutBatchSize(100).build()); + } +} \ No newline at end of file