diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 3c8b36cb6..0d4b415af 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -466,10 +466,12 @@ public class HoodieWriteClient extends AbstractHo private JavaRDD updateIndexAndCommitIfNeeded(JavaRDD writeStatusRDD, HoodieTable table, String commitTime) { + // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future + // RDD actions that are performed after updating the index. + writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel()); // Update the index back JavaRDD statuses = index.updateLocation(writeStatusRDD, jsc, table); // Trigger the insert and collect statuses - statuses = statuses.persist(config.getWriteStatusStorageLevel()); commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType()); return statuses; } @@ -974,6 +976,9 @@ public class HoodieWriteClient extends AbstractHo public void close() { // Stop timeline-server if running super.close(); + // Calling this here releases any resources used by your index, so make sure to finish any related operations + // before this point + this.index.close(); } /** diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index 073ed5175..9cbc734b6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -57,6 +57,15 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket"; public static final String DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET = "10000000"; + 
// ***** HBase Index Configs ***** + public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum"; + public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport"; + public static final String HBASE_ZK_ZNODEPARENT = "hoodie.index.hbase.zknode.path"; + public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table"; + public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size"; + public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size"; + public static final String DEFAULT_HBASE_BATCH_SIZE = "100"; + public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage" + ".level"; @@ -109,6 +118,26 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder hbaseZkQuorum(String zkString) { + props.setProperty(HBASE_ZKQUORUM_PROP, zkString); + return this; + } + + public Builder hbaseZkPort(int port) { + props.setProperty(HBASE_ZKPORT_PROP, String.valueOf(port)); + return this; + } + + public Builder hbaseZkZnodeParent(String zkZnodeParent) { + props.setProperty(HBASE_ZK_ZNODEPARENT, zkZnodeParent); + return this; + } + + public Builder hbaseTableName(String tableName) { + props.setProperty(HBASE_TABLENAME_PROP, tableName); + return this; + } + public Builder bloomIndexParallelism(int parallelism) { props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism)); return this; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index e78c35863..63ba10ef8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -306,6 +306,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return 
Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKPORT_PROP)); } + public String getHBaseZkZnodeParent() { + return props.getProperty(HoodieIndexConfig.HBASE_ZK_ZNODEPARENT); + } + public String getHbaseTableName() { return props.getProperty(HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java index 7f4eb6365..e15d07bdb 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java @@ -115,6 +115,10 @@ public abstract class HoodieIndex implements Seri */ public abstract boolean isImplicitWithStorage(); + /** + * Each index type should implement its own logic to release any resources acquired during the process. + */ + public void close() {} public enum IndexType { HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java index 4eb944386..276472142 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java @@ -83,6 +83,7 @@ public class HBaseIndex extends HoodieIndex { private static Logger logger = LogManager.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; + private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null; private float qpsFraction; private int maxQpsPerRegionServer; /** @@ -106,6 +107,7 @@ public class HBaseIndex extends HoodieIndex { this.qpsFraction = config.getHbaseIndexQPSFraction(); this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer(); this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator(); + this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); } 
@VisibleForTesting @@ -132,6 +134,10 @@ public class HBaseIndex extends HoodieIndex { Configuration hbaseConfig = HBaseConfiguration.create(); String quorum = config.getHbaseZkQuorum(); hbaseConfig.set("hbase.zookeeper.quorum", quorum); + String zkZnodeParent = config.getHBaseZkZnodeParent(); + if (zkZnodeParent != null) { + hbaseConfig.set("zookeeper.znode.parent", zkZnodeParent); + } String port = String.valueOf(config.getHbaseZkPort()); hbaseConfig.set("hbase.zookeeper.property.clientPort", port); try { @@ -158,6 +164,13 @@ public class HBaseIndex extends HoodieIndex { }); } + /** + * Ensure that any resources used for indexing are released here. + */ + public void close() { + this.hBaseIndexQPSResourceAllocator.releaseQPSResources(); + } + private Get generateStatement(String key) throws IOException { return new Get(Bytes.toBytes(key)).setMaxVersions(1) .addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN) @@ -368,21 +381,13 @@ public class HBaseIndex extends HoodieIndex { public JavaRDD updateLocation(JavaRDD writeStatusRDD, JavaSparkContext jsc, HoodieTable hoodieTable) { final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); - JavaRDD writeStatusResultRDD; setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); - logger.info("multiPutBatchSize: before puts" + multiPutBatchSize); + logger.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex( updateLocationFunction(), true); - // Forcing a spark action so HBase puts are triggered before releasing resources - if (this.config.getHBaseIndexShouldComputeQPSDynamically()) { - logger.info("writestatus count: " + writeStatusJavaRDD.count()); - writeStatusResultRDD = writeStatusRDD; - } else { - writeStatusResultRDD = writeStatusJavaRDD; - } - // Release QPS resources as HBAse puts are done at this point - hBaseIndexQPSResourceAllocator.releaseQPSResources(); - return 
writeStatusResultRDD; + // caching the index updated status RDD + writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel()); + return writeStatusJavaRDD; } private void setPutBatchSize(JavaRDD writeStatusRDD, @@ -430,7 +435,7 @@ public class HBaseIndex extends HoodieIndex { final JavaPairRDD insertOnlyWriteStatusRDD = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0) .mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1)); - return insertOnlyWriteStatusRDD.reduce((w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2)); + return insertOnlyWriteStatusRDD.fold(new Tuple2<>(0L, 0), (w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2)); } public static class HbasePutBatchSizeCalculator implements Serializable { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index bd6650e31..8ebf31ef2 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -149,6 +149,7 @@ public class HoodieMergeHandle extends HoodieWrit * Extract old file path, initialize StorageWriter and WriteStatus */ private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) { + logger.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId); this.writtenRecordKeys = new HashSet<>(); writeStatus.setStat(new HoodieWriteStat()); try { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java index f2c5d5c65..5ff72208c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java @@ -185,6 +185,40 @@ public class TestHbaseIndex { } + @Test + public void testTagLocationAndDuplicateUpdate() throws Exception { + String newCommitTime = 
"001"; + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 10); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + // Load to memory + HoodieWriteConfig config = getConfig(); + HBaseIndex index = new HBaseIndex(config); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + writeClient.startCommit(); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + + JavaRDD writeStatues = writeClient.upsert(writeRecords, newCommitTime); + JavaRDD javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable); + // Duplicate upsert and ensure correctness is maintained + writeClient.upsert(writeRecords, newCommitTime); + assertNoWriteErrors(writeStatues.collect()); + + // Now commit this & update location of records inserted and validate no errors + writeClient.commit(newCommitTime, writeStatues); + // Now tagLocation for these records, hbaseIndex should tag them correctly + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); + JavaRDD javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable); + assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 10); + assertTrue(javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count() == 10); + assertTrue(javaRDD.filter( + record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime() + .equals(newCommitTime))).distinct().count() == 10); + } + @Test public void testSimpleTagLocationAndUpdateWithRollback() throws Exception { @@ -359,6 +393,23 @@ public class TestHbaseIndex { Assert.assertEquals(11, hbaseNumPuts); } + @Test + public void testsHBasePutAccessParallelismWithNoInserts() { + HoodieWriteConfig config = getConfig(); + HBaseIndex index = 
new HBaseIndex(config); + final JavaRDD writeStatusRDD = jsc.parallelize( + Arrays.asList( + getSampleWriteStatus(0, 2), + getSampleWriteStatus(0, 1)), + 10); + final Tuple2 tuple = index.getHBasePutAccessParallelism(writeStatusRDD); + final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString()); + final int hbaseNumPuts = Integer.parseInt(tuple._1.toString()); + Assert.assertEquals(10, writeStatusRDD.getNumPartitions()); + Assert.assertEquals(0, hbasePutAccessParallelism); + Assert.assertEquals(0, hbaseNumPuts); + } + @Test public void testsHBaseIndexDefaultQPSResourceAllocator() { HoodieWriteConfig config = getConfig();