diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index eb9ad62e4..82bb9218f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -19,11 +19,14 @@ package com.uber.hoodie; import com.google.common.base.Optional; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.index.bloom.HoodieBloomIndex; +import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import java.io.Serializable; import java.util.HashSet; @@ -45,7 +48,7 @@ import scala.Tuple2; /** * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys. */ -public class HoodieReadClient implements Serializable { +public class HoodieReadClient implements Serializable { private static Logger logger = LogManager.getLogger(HoodieReadClient.class); @@ -57,7 +60,7 @@ public class HoodieReadClient implements Serializable { * just with a simple basepath pointing to the dataset. Until, then just always assume a * BloomIndex */ - private transient final HoodieBloomIndex index; + private transient final HoodieIndex index; private final HoodieTimeline commitTimeline; private HoodieTable hoodieTable; private transient Optional sqlContextOpt; @@ -66,15 +69,12 @@ public class HoodieReadClient implements Serializable { * @param basePath path to Hoodie dataset */ public HoodieReadClient(JavaSparkContext jsc, String basePath) { - this.jsc = jsc; - this.fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); - // Create a Hoodie table which encapsulated the commits and files visible - this.hoodieTable = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), null); - this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants(); - this.index = - new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc); - this.sqlContextOpt = Optional.absent(); + this(jsc, HoodieWriteConfig.newBuilder() + .withPath(basePath) + // by default we use HoodieBloomIndex + .withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .build()); } /** @@ -88,6 +88,22 @@ public class HoodieReadClient implements Serializable { this.sqlContextOpt = Optional.of(sqlContext); } + /** + * @param clientConfig instance of HoodieWriteConfig + */ + public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) { + final String basePath = clientConfig.getBasePath(); + this.jsc = jsc; + this.fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); + // Create a Hoodie table which encapsulated the commits and files visible + this.hoodieTable = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), + clientConfig); + this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants(); + this.index = HoodieIndex.createIndex(clientConfig, jsc); + this.sqlContextOpt = Optional.absent(); + } + /** * Adds support for accessing Hoodie built tables from SparkSQL, as you normally would. * @@ -158,8 +174,20 @@ public class HoodieReadClient implements Serializable { * @param hoodieRecords Input RDD of Hoodie records. * @return A subset of hoodieRecords RDD, with existing records filtered out. */ - public JavaRDD filterExists(JavaRDD hoodieRecords) { - JavaRDD recordsWithLocation = index.tagLocation(hoodieRecords, hoodieTable); + public JavaRDD> filterExists(JavaRDD> hoodieRecords) { + JavaRDD> recordsWithLocation = tagLocation(hoodieRecords); return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } + + /** + * Looks up the index and tags each incoming record with a location of a file that contains the + * row (if it is actually present). Input RDD should contain no duplicates if needed. + * + * @param hoodieRecords Input RDD of Hoodie records + * @return Tagged RDD of Hoodie records + */ + public JavaRDD> tagLocation(JavaRDD> hoodieRecords) + throws HoodieIndexException { + return index.tagLocation(hoodieRecords, hoodieTable); + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 100dcfaa0..81224fab7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -142,12 +142,7 @@ public class HoodieWriteClient implements Seriali * Upserts a bunch of new records into the Hoodie table, at the supplied commitTime */ public JavaRDD upsert(JavaRDD> records, final String commitTime) { - writeContext = metrics.getCommitCtx(); - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), - config); - + HoodieTable table = getTableAndInitCtx(); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -165,6 +160,30 @@ public class HoodieWriteClient implements Seriali } } + /** + * Upserts the given prepared records into the Hoodie table, at the supplied commitTime. + * + * This implementation requires that the input records are already tagged, and de-duped if + * needed. + * + * @param preppedRecords Prepared HoodieRecords to upsert + * @param commitTime Commit Time handle + * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts + */ + public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, + final String commitTime) { + HoodieTable table = getTableAndInitCtx(); + try { + return upsertRecordsInternal(preppedRecords, commitTime, table, true); + } catch (Throwable e) { + if (e instanceof HoodieUpsertException) { + throw (HoodieUpsertException) e; + } + throw new HoodieUpsertException("Failed to upsert prepared records for commit time " + + commitTime, e); + } + } + /** * Inserts the given HoodieRecords, into the table. This API is intended to be used for normal * writes. @@ -177,11 +196,7 @@ public class HoodieWriteClient implements Seriali * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insert(JavaRDD> records, final String commitTime) { - writeContext = metrics.getCommitCtx(); - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), - config); + HoodieTable table = getTableAndInitCtx(); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -197,6 +212,31 @@ public class HoodieWriteClient implements Seriali } } + /** + * Inserts the given prepared records into the Hoodie table, at the supplied commitTime. + * + * This implementation skips the index check, skips de-duping and is able to leverage benefits + * such as small file handling/blocking alignment, as with insert(), by profiling the workload. + * The prepared HoodieRecords should be de-duped if needed. + * + * @param preppedRecords HoodieRecords to insert + * @param commitTime Commit Time handle + * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts + */ + public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, + final String commitTime) { + HoodieTable table = getTableAndInitCtx(); + try { + return upsertRecordsInternal(preppedRecords, commitTime, table, false); + } catch (Throwable e) { + if (e instanceof HoodieInsertException) { + throw e; + } + throw new HoodieInsertException("Failed to insert prepared records for commit time " + + commitTime, e); + } + } + /** * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to @@ -235,40 +275,14 @@ public class HoodieWriteClient implements Seriali public JavaRDD bulkInsert(JavaRDD> records, final String commitTime, Option bulkInsertPartitioner) { - writeContext = metrics.getCommitCtx(); - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), - config); - + HoodieTable table = getTableAndInitCtx(); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism()); - final JavaRDD> repartitionedRecords; - if (bulkInsertPartitioner.isDefined()) { - repartitionedRecords = - bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, - config.getBulkInsertShuffleParallelism()); - } else { - // Now, sort the records and line them up nicely for loading. - repartitionedRecords = dedupedRecords - .sortBy(record -> { - // Let's use "partitionPath + key" as the sort key. Spark, will ensure - // the records split evenly across RDD partitions, such that small partitions fit - // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions - return String - .format("%s+%s", record.getPartitionPath(), record.getRecordKey()); - }, true, config.getBulkInsertShuffleParallelism()); - } - JavaRDD writeStatusRDD = repartitionedRecords - .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table), - true) - .flatMap(writeStatuses -> writeStatuses.iterator()); - - return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime); + return bulkInsertInternal(dedupedRecords, commitTime, table, bulkInsertPartitioner); } catch (Throwable e) { if (e instanceof HoodieInsertException) { throw e; @@ -278,6 +292,67 @@ public class HoodieWriteClient implements Seriali } } + /** + * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk + * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to + * Hoodie). The input records should contain no duplicates if needed. + * + * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and + * attempts to control the numbers of files with less memory compared to the {@link + * HoodieWriteClient#insert(JavaRDD, String)}. Optionally it allows users to specify their own + * partitioner. If specified then it will be used for repartitioning records. See {@link + * UserDefinedBulkInsertPartitioner}. + * + * @param preppedRecords HoodieRecords to insert + * @param commitTime Commit Time handle + * @param bulkInsertPartitioner If specified then it will be used to partition input records + * before they are inserted into hoodie. + * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts + */ + public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, + final String commitTime, + Option bulkInsertPartitioner) { + HoodieTable table = getTableAndInitCtx(); + try { + return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner); + } catch (Throwable e) { + if (e instanceof HoodieInsertException) { + throw e; + } + throw new HoodieInsertException("Failed to bulk insert prepared records for commit time " + + commitTime, e); + } + } + + private JavaRDD bulkInsertInternal( + JavaRDD> dedupedRecords, + String commitTime, + HoodieTable table, + Option bulkInsertPartitioner) { + final JavaRDD> repartitionedRecords; + if (bulkInsertPartitioner.isDefined()) { + repartitionedRecords = + bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, + config.getBulkInsertShuffleParallelism()); + } else { + // Now, sort the records and line them up nicely for loading. + repartitionedRecords = dedupedRecords + .sortBy(record -> { + // Let's use "partitionPath + key" as the sort key. Spark, will ensure + // the records split evenly across RDD partitions, such that small partitions fit + // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions + return String + .format("%s+%s", record.getPartitionPath(), record.getRecordKey()); + }, true, config.getBulkInsertShuffleParallelism()); + } + JavaRDD writeStatusRDD = repartitionedRecords + .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table), + true) + .flatMap(writeStatuses -> writeStatuses.iterator()); + + return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime); + } + private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD) { if (config.shouldAutoCommit()) { logger.info("Auto commit enabled: Committing " + commitTime); @@ -907,4 +982,12 @@ public class HoodieWriteClient implements Seriali rollback(commit); } } + + private HoodieTable getTableAndInitCtx() { + writeContext = metrics.getCommitCtx(); + // Create a Hoodie table which encapsulated the commits and files visible + return HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), + config); + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 5caeec23b..1bf9ca506 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -80,6 +80,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import scala.Option; import scala.collection.Iterator; public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { @@ -190,7 +191,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); - JavaRDD result = client.bulkInsert(writeRecords, newCommitTime); + JavaRDD result = client.bulkInsertPreppedRecords(writeRecords, newCommitTime, + Option.empty()); assertFalse("If Autocommit is false, then commit should not be made automatically", HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); @@ -218,7 +220,6 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig) throws Exception { HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig); - HoodieIndex index = HoodieIndex.createIndex(hoodieWriteConfig, jsc); /** * Write 1 (only inserts) @@ -229,7 +230,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); - List statuses = client.upsert(writeRecords, newCommitTime).collect(); + List statuses = client.upsertPreppedRecords(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); // check the partition metadata is written out @@ -249,10 +250,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { records.size(), HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); // Should have 100 records in table (check using Index), all in locations marked at commit - HoodieTable table = HoodieTable - .getHoodieTable(metaClient, getConfig()); - - List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table) + HoodieReadClient readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath()); + List taggedRecords = readClient.tagLocation(jsc.parallelize(records, 1)) .collect(); checkTaggedRecords(taggedRecords, "001"); @@ -282,11 +281,9 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), newCommitTime); - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - table = HoodieTable.getHoodieTable(metaClient, getConfig()); - // Index should be able to locate all updates in correct locations. - taggedRecords = index.tagLocation(jsc.parallelize(dedupedRecords, 1), table).collect(); + readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath()); + taggedRecords = readClient.tagLocation(jsc.parallelize(dedupedRecords, 1)).collect(); checkTaggedRecords(taggedRecords, "004"); // Check the entire dataset has 100 records still @@ -732,7 +729,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { List records = dataGen.generateInserts(newCommitTime, 500); JavaRDD writeRecords = jsc.parallelize(records, 5); - List statuses = client.insert(writeRecords, newCommitTime).collect(); + List statuses = client.insertPreppedRecords(writeRecords, newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses);