From 8e72ed69b430b67bc4d3fcaeeee580890c3e305a Mon Sep 17 00:00:00 2001 From: vinoth chandar Date: Fri, 27 Jan 2017 10:51:00 -0800 Subject: [PATCH] Split insert() into insert() & bulkInsert() (#69) - Behaviour change for existing insert() users - Made the current insert() implementation, as something to use for bulkInsert() - Normal inserts now share a lot of code with upsert, which provides benefits like small file handling - Refactored/Cleaned up code in HoodieWriteClient for reuse - Added a unit test, switching few tests to call bulkInsert() and few to call insert() --- .../com/uber/hoodie/HoodieWriteClient.java | 333 ++++++++++-------- ...nction.java => BulkInsertMapFunction.java} | 6 +- .../hoodie/table/HoodieCopyOnWriteTable.java | 11 +- .../com/uber/hoodie/table/HoodieTable.java | 11 + .../com/uber/hoodie/TestHoodieClient.java | 104 +++++- .../common/HoodieTestDataGenerator.java | 3 + 6 files changed, 309 insertions(+), 159 deletions(-) rename hoodie-client/src/main/java/com/uber/hoodie/func/{InsertMapFunction.java => BulkInsertMapFunction.java} (88%) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 286174c2c..622a20ed7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -31,7 +31,7 @@ import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieUpsertException; -import com.uber.hoodie.func.InsertMapFunction; +import com.uber.hoodie.func.BulkInsertMapFunction; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.HoodieCleaner; import com.uber.hoodie.io.HoodieCommitArchiveLog; @@ -72,12 +72,11 @@ import scala.Option; import scala.Tuple2; /** - * Hoodie Write Client helps you build datasets on HDFS [insert()] and then - * perform efficient mutations on a HDFS dataset [upsert()] - * - * Note that, at any given time, there can only be one Spark job performing - * these operatons on a Hoodie dataset. + * Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient + * mutations on a HDFS dataset [upsert()] * + * Note that, at any given time, there can only be one Spark job performing these operatons on a + * Hoodie dataset. */ public class HoodieWriteClient implements Serializable { @@ -142,72 +141,18 @@ public class HoodieWriteClient implements Seriali */ public JavaRDD upsert(JavaRDD> records, final String commitTime) { final HoodieTableMetadata metadata = - new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName()); + new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName()); writeContext = metrics.getCommitCtx(); - final HoodieTable table = - HoodieTable.getHoodieTable(metadata.getTableType(), commitTime, config, metadata); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = - combineOnCondition(config.shouldCombineBeforeUpsert(), records, - config.getUpsertShuffleParallelism()); + combineOnCondition(config.shouldCombineBeforeUpsert(), records, + config.getUpsertShuffleParallelism()); // perform index loop up to get existing location of records JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, metadata); - - // Cache the tagged records, so we don't end up computing both - taggedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); - - - WorkloadProfile profile = null; - if (table.isWorkloadProfileNeeded()) { - profile = new WorkloadProfile(taggedRecords); - logger.info("Workload profile :" + profile); - } - - // obtain the upsert partitioner, and the run the tagger records through that & get a partitioned RDD. - final Partitioner upsertPartitioner = table.getUpsertPartitioner(profile); - JavaRDD> partitionedRecords = taggedRecords.mapToPair( - new PairFunction, Tuple2>, HoodieRecord>() { - @Override - public Tuple2>, HoodieRecord> call( - HoodieRecord record) throws Exception { - return new Tuple2<>(new Tuple2<>(record.getKey(), - Option.apply(record.getCurrentLocation())), record); - } - }).partitionBy(upsertPartitioner).map( - new Function>, HoodieRecord>, HoodieRecord>() { - @Override - public HoodieRecord call( - Tuple2>, HoodieRecord> tuple) - throws Exception { - return tuple._2(); - } - }); - - - // Perform the actual writing. - JavaRDD upsertStatusRDD = partitionedRecords.mapPartitionsWithIndex( - new Function2>, Iterator>>() { - @Override - public Iterator> call(Integer partition, - Iterator> recordItr) throws Exception { - return table.handleUpsertPartition(partition, recordItr, upsertPartitioner); - } - }, true).flatMap(new FlatMapFunction, WriteStatus>() { - @Override - public Iterable call(List writeStatuses) - throws Exception { - return writeStatuses; - } - }); - - // Update the index back. - JavaRDD resultRDD = index.updateLocation(upsertStatusRDD, metadata); - resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel()); - commitOnAutoCommit(commitTime, resultRDD); - return resultRDD; + return upsertRecordsInternal(taggedRecords, commitTime, metadata, true); } catch (Throwable e) { if (e instanceof HoodieUpsertException) { throw (HoodieUpsertException) e; @@ -216,8 +161,38 @@ public class HoodieWriteClient implements Seriali } } + /** + * Inserts the given HoodieRecords, into the table. This API is intended to be used for normal + * writes. + * + * This implementation skips the index check & is able to leverage benefits such as + * small file handling/blocking alignment, as with upsert(), by profiling the workload + * + * @param records HoodieRecords to insert + * @param commitTime Commit Time handle + * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts + */ + public JavaRDD insert(JavaRDD> records, final String commitTime) { + final HoodieTableMetadata metadata = + new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName()); + writeContext = metrics.getCommitCtx(); + try { + // De-dupe/merge if needed + JavaRDD> dedupedRecords = + combineOnCondition(config.shouldCombineBeforeInsert(), records, + config.getInsertShuffleParallelism()); + + return upsertRecordsInternal(dedupedRecords, commitTime, metadata, false); + } catch (Throwable e) { + if (e instanceof HoodieInsertException) { + throw e; + } + throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e); + } + } + private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD) { - if(config.shouldAutoCommit()) { + if (config.shouldAutoCommit()) { logger.info("Auto commit enabled: Committing " + commitTime); boolean commitResult = commit(commitTime, resultRDD); if (!commitResult) { @@ -229,64 +204,146 @@ public class HoodieWriteClient implements Seriali } private JavaRDD> combineOnCondition(boolean condition, - JavaRDD> records, int parallelism) { - if(condition) { + JavaRDD> records, int parallelism) { + if (condition) { return deduplicateRecords(records, parallelism); } return records; } + private JavaRDD> partition(JavaRDD> dedupedRecords, Partitioner partitioner) { + return dedupedRecords.mapToPair( + new PairFunction, Tuple2>, HoodieRecord>() { + @Override + public Tuple2>, HoodieRecord> call( + HoodieRecord record) throws Exception { + return new Tuple2<>(new Tuple2<>(record.getKey(), + Option.apply(record.getCurrentLocation())), record); + } + }).partitionBy(partitioner).map( + new Function>, HoodieRecord>, HoodieRecord>() { + @Override + public HoodieRecord call( + Tuple2>, HoodieRecord> tuple) + throws Exception { + return tuple._2(); + } + }); + } + + private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) { + if (isUpsert) { + return table.getUpsertPartitioner(profile); + } else { + return table.getInsertPartitioner(profile); + } + } + + private JavaRDD updateIndexAndCommitIfNeeded(JavaRDD writeStatusRDD, + HoodieTableMetadata metadata, + String commitTime) { + // Update the index back + JavaRDD statuses = index.updateLocation(writeStatusRDD, metadata); + // Trigger the insert and collect statuses + statuses = statuses.persist(config.getWriteStatusStorageLevel()); + commitOnAutoCommit(commitTime, statuses); + return statuses; + } + + private JavaRDD upsertRecordsInternal(JavaRDD> preppedRecords, + String commitTime, + HoodieTableMetadata metadata, + final boolean isUpsert) { + + final HoodieTable table = + HoodieTable.getHoodieTable(metadata.getTableType(), commitTime, config, metadata); + + // Cache the tagged records, so we don't end up computing both + preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); + + WorkloadProfile profile = null; + if (table.isWorkloadProfileNeeded()) { + profile = new WorkloadProfile(preppedRecords); + logger.info("Workload profile :" + profile); + } + + // partition using the insert partitioner + final Partitioner partitioner = getPartitioner(table, isUpsert, profile); + JavaRDD> partitionedRecords = partition(preppedRecords, partitioner); + JavaRDD writeStatusRDD = partitionedRecords.mapPartitionsWithIndex( + new Function2>, Iterator>>() { + @Override + public Iterator> call(Integer partition, + Iterator> recordItr) throws Exception { + if (isUpsert) { + return table.handleUpsertPartition(partition, recordItr, partitioner); + } else { + return table.handleInsertPartition(partition, recordItr, partitioner); + } + } + }, true).flatMap(new FlatMapFunction, WriteStatus>() { + @Override + public Iterable call(List writeStatuses) + throws Exception { + return writeStatuses; + } + }); + + return updateIndexAndCommitIfNeeded(writeStatusRDD, metadata, commitTime); + } + + /** - * Loads the given HoodieRecords, as inserts into the table. - * (This implementation uses sortBy and attempts to control the numbers of files with less memory) + * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk + * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to + * Hoodie). * - * @param records HoodieRecords to insert + * This implementation uses sortBy (which does range partitioning based on reservoir sampling) & + * attempts to control the numbers of files with less memory compared to the {@link + * HoodieWriteClient#insert(JavaRDD, String)} + * + * @param records HoodieRecords to insert * @param commitTime Commit Time handle * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts - * */ - public JavaRDD insert(JavaRDD> records, final String commitTime) { + public JavaRDD bulkInsert(JavaRDD> records, final String commitTime) { final HoodieTableMetadata metadata = - new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName()); + new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName()); writeContext = metrics.getCommitCtx(); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = - combineOnCondition(config.shouldCombineBeforeInsert(), records, - config.getInsertShuffleParallelism()); + combineOnCondition(config.shouldCombineBeforeInsert(), records, + config.getInsertShuffleParallelism()); // Now, sort the records and line them up nicely for loading. JavaRDD> sortedRecords = - dedupedRecords.sortBy(new Function, String>() { - @Override - public String call(HoodieRecord record) { - // Let's use "partitionPath + key" as the sort key. Spark, will ensure - // the records split evenly across RDD partitions, such that small partitions fit - // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions - return String - .format("%s+%s", record.getPartitionPath(), record.getRecordKey()); - } - }, true, config.getInsertShuffleParallelism()); + dedupedRecords.sortBy(new Function, String>() { + @Override + public String call(HoodieRecord record) { + // Let's use "partitionPath + key" as the sort key. Spark, will ensure + // the records split evenly across RDD partitions, such that small partitions fit + // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions + return String + .format("%s+%s", record.getPartitionPath(), record.getRecordKey()); + } + }, true, config.getInsertShuffleParallelism()); JavaRDD writeStatusRDD = sortedRecords - .mapPartitionsWithIndex(new InsertMapFunction(commitTime, config, metadata), - true).flatMap(new FlatMapFunction, WriteStatus>() { - @Override - public Iterable call(List writeStatuses) - throws Exception { - return writeStatuses; - } - }); - // Update the index back - JavaRDD statuses = index.updateLocation(writeStatusRDD, metadata); - // Trigger the insert and collect statuses - statuses = statuses.persist(config.getWriteStatusStorageLevel()); - commitOnAutoCommit(commitTime, statuses); - return statuses; + .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, metadata), + true).flatMap(new FlatMapFunction, WriteStatus>() { + @Override + public Iterable call(List writeStatuses) + throws Exception { + return writeStatuses; + } + }); + + return updateIndexAndCommitIfNeeded(writeStatusRDD, metadata, commitTime); } catch (Throwable e) { if (e instanceof HoodieInsertException) { throw e; } - throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e); + throw new HoodieInsertException("Failed to bulk insert for commit time " + commitTime, e); } } @@ -296,7 +353,7 @@ public class HoodieWriteClient implements Seriali public boolean commit(String commitTime, JavaRDD writeStatuses) { logger.info("Comitting " + commitTime); Path commitFile = - new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime)); + new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime)); try { if (fs.exists(commitFile)) { @@ -304,13 +361,13 @@ public class HoodieWriteClient implements Seriali } List> stats = - writeStatuses.mapToPair(new PairFunction() { - @Override - public Tuple2 call(WriteStatus writeStatus) - throws Exception { - return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()); - } - }).collect(); + writeStatuses.mapToPair(new PairFunction() { + @Override + public Tuple2 call(WriteStatus writeStatus) + throws Exception { + return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()); + } + }).collect(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); for (Tuple2 stat : stats) { @@ -319,10 +376,10 @@ public class HoodieWriteClient implements Seriali // open a new file and write the commit metadata in Path inflightCommitFile = new Path(config.getBasePath() + "/.hoodie/" + FSUtils - .makeInflightCommitFileName(commitTime)); + .makeInflightCommitFileName(commitTime)); FSDataOutputStream fsout = fs.create(inflightCommitFile, true); fsout.writeBytes(new String(metadata.toJsonString().getBytes(StandardCharsets.UTF_8), - StandardCharsets.UTF_8)); + StandardCharsets.UTF_8)); fsout.close(); boolean success = fs.rename(inflightCommitFile, commitFile); @@ -331,10 +388,10 @@ public class HoodieWriteClient implements Seriali archiveLog.archiveIfRequired(); // Call clean to cleanup if there is anything to cleanup after the commit, clean(); - if(writeContext != null) { + if (writeContext != null) { long durationInMs = metrics.getDurationInMs(writeContext.stop()); metrics.updateCommitMetrics(FORMATTER.parse(commitTime).getTime(), durationInMs, - metadata); + metadata); writeContext = null; } } @@ -342,21 +399,18 @@ public class HoodieWriteClient implements Seriali return success; } catch (IOException e) { throw new HoodieCommitException( - "Failed to commit " + config.getBasePath() + " at time " + commitTime, e); + "Failed to commit " + config.getBasePath() + " at time " + commitTime, e); } catch (ParseException e) { throw new HoodieCommitException( - "Commit time is not of valid format.Failed to commit " + config.getBasePath() - + " at time " + commitTime, e); + "Commit time is not of valid format.Failed to commit " + config.getBasePath() + + " at time " + commitTime, e); } } /** - * Rollback the (inflight/committed) record changes with the given commit time. - * Three steps: - * (0) Obtain the commit or rollback file - * (1) clean indexing data, - * (2) clean new generated parquet files. - * (3) Finally delete .commit or .inflight file, + * Rollback the (inflight/committed) record changes with the given commit time. Three steps: (0) + * Obtain the commit or rollback file (1) clean indexing data, (2) clean new generated parquet + * files. (3) Finally delete .commit or .inflight file, */ public boolean rollback(final String commitTime) throws HoodieRollbackException { @@ -450,7 +504,7 @@ public class HoodieWriteClient implements Seriali /** * Clean up any stale/old files/data lying around (either on file storage or index storage) */ - private void clean() throws HoodieIOException { + private void clean() throws HoodieIOException { try { logger.info("Cleaner started"); final Timer.Context context = metrics.getCleanCtx(); @@ -459,26 +513,26 @@ public class HoodieWriteClient implements Seriali // shuffle to distribute cleaning work across partitions evenly Collections.shuffle(partitionsToClean); logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy()); - if(partitionsToClean.isEmpty()) { + if (partitionsToClean.isEmpty()) { logger.info("Nothing to clean here mom. It is already clean"); return; } int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); int numFilesDeleted = jsc.parallelize(partitionsToClean, cleanerParallelism) - .map(new Function() { - @Override - public Integer call(String partitionPathToClean) throws Exception { - FileSystem fs = FSUtils.getFs(); - HoodieCleaner cleaner = new HoodieCleaner(metadata, config, fs); - return cleaner.clean(partitionPathToClean); - } - }).reduce(new Function2() { - @Override - public Integer call(Integer v1, Integer v2) throws Exception { - return v1 + v2; - } - }); + .map(new Function() { + @Override + public Integer call(String partitionPathToClean) throws Exception { + FileSystem fs = FSUtils.getFs(); + HoodieCleaner cleaner = new HoodieCleaner(metadata, config, fs); + return cleaner.clean(partitionPathToClean); + } + }).reduce(new Function2() { + @Override + public Integer call(Integer v1, Integer v2) throws Exception { + return v1 + v2; + } + }); logger.info("Cleaned " + numFilesDeleted + " files"); // Emit metrics (duration, numFilesDeleted) if needed if (context != null) { @@ -504,18 +558,18 @@ public class HoodieWriteClient implements Seriali logger.info("Generate a new commit time " + commitTime); // Create the in-flight commit file Path inflightCommitFilePath = new Path( - config.getBasePath() + "/.hoodie/" + FSUtils.makeInflightCommitFileName(commitTime)); + config.getBasePath() + "/.hoodie/" + FSUtils.makeInflightCommitFileName(commitTime)); try { if (fs.createNewFile(inflightCommitFilePath)) { logger.info("Create an inflight commit file " + inflightCommitFilePath); return; } throw new HoodieCommitException( - "Failed to create the inflight commit file " + inflightCommitFilePath); + "Failed to create the inflight commit file " + inflightCommitFilePath); } catch (IOException e) { // handled below throw new HoodieCommitException( - "Failed to create the inflight commit file " + inflightCommitFilePath, e); + "Failed to create the inflight commit file " + inflightCommitFilePath, e); } } @@ -552,7 +606,6 @@ public class HoodieWriteClient implements Seriali /** * Cleanup all inflight commits - * @throws IOException */ private void rollbackInflightCommits() { final HoodieTableMetadata metadata = new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/InsertMapFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java similarity index 88% rename from hoodie-client/src/main/java/com/uber/hoodie/func/InsertMapFunction.java rename to hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java index 98703221e..55959270b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/InsertMapFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java @@ -30,15 +30,15 @@ import java.util.List; /** * Map function that handles a sorted stream of HoodieRecords */ -public class InsertMapFunction +public class BulkInsertMapFunction implements Function2>, Iterator>> { private String commitTime; private HoodieWriteConfig config; private HoodieTableMetadata metadata; - public InsertMapFunction(String commitTime, HoodieWriteConfig config, - HoodieTableMetadata metadata) { + public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config, + HoodieTableMetadata metadata) { this.commitTime = commitTime; this.config = config; this.metadata = metadata; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 64880f032..d8edb000b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -25,6 +25,7 @@ import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieTableMetadata; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.LazyInsertIterable; import com.uber.hoodie.io.HoodieUpdateHandle; @@ -376,7 +377,7 @@ public class HoodieCopyOnWriteTable extends Hoodi @Override public Partitioner getInsertPartitioner(WorkloadProfile profile) { - return null; + return getUpsertPartitioner(profile); } @Override @@ -385,7 +386,6 @@ public class HoodieCopyOnWriteTable extends Hoodi } - public Iterator> handleUpdate(String fileLoc, Iterator> recordItr) throws Exception { // these are updates HoodieUpdateHandle upsertHandle = @@ -449,4 +449,11 @@ public class HoodieCopyOnWriteTable extends Hoodi throw new HoodieUpsertException(msg, t); } } + + @Override + public Iterator> handleInsertPartition(Integer partition, + Iterator recordItr, + Partitioner partitioner) { + return handleUpsertPartition(partition, recordItr, partitioner); + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 9bf2f59fe..c79a55144 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -84,6 +84,17 @@ public abstract class HoodieTable implements Seri Iterator> recordIterator, Partitioner partitioner); + /** + * Perform the ultimate IO for a given inserted (RDD) partition + * + * @param partition + * @param recordIterator + * @param partitioner + */ + public abstract Iterator> handleInsertPartition(Integer partition, + Iterator> recordIterator, + Partitioner partitioner); + public static HoodieTable getHoodieTable(HoodieTableType type, String commitTime, diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index 6bcc4c56b..8981309e2 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -137,7 +137,7 @@ public class TestHoodieClient implements Serializable { JavaRDD smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1); // We create three parquet file, each having one record. (two different partitions) - List statuses = writeClient.insert(smallRecordsRDD, newCommitTime).collect(); + List statuses = writeClient.bulkInsert(smallRecordsRDD, newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); @@ -158,7 +158,7 @@ public class TestHoodieClient implements Serializable { List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); - JavaRDD result = client.insert(writeRecords, newCommitTime); + JavaRDD result = client.bulkInsert(writeRecords, newCommitTime); assertFalse("If Autocommit is false, then commit should not be made automatically", HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); @@ -169,7 +169,7 @@ public class TestHoodieClient implements Serializable { newCommitTime = "002"; records = dataGen.generateUpdates(newCommitTime, 100); JavaRDD updateRecords = jsc.parallelize(records, 1); - result = client.upsert(writeRecords, newCommitTime); + result = client.upsert(updateRecords, newCommitTime); assertFalse("If Autocommit is false, then commit should not be made automatically", HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); assertTrue("Commit should succeed", client.commit(newCommitTime, result)); @@ -542,24 +542,27 @@ public class TestHoodieClient implements Serializable { } + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) { + HoodieWriteConfig.Builder builder = getConfigBuilder(); + return builder.withCompactionConfig( + HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15) + .insertSplitSize(insertSplitSize).build()) // tolerate upto 15 records + .withStorageConfig(HoodieStorageConfig.newBuilder() + .limitFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 20) + .build()) + .build(); + } + @Test - public void testSmallInsertHandling() throws Exception { + public void testSmallInsertHandlingForUpserts() throws Exception { - HoodieWriteConfig.Builder builder = getConfigBuilder(); FileSystem fs = FSUtils.getFs(); - - final String TEST_PARTITION_PATH = "2016/09/26"; final int INSERT_SPLIT_LIMIT = 10; - // based on examination of sample file, the schema produces the following per record size - final int SIZE_PER_RECORD = 50 * 1024; // setup the small file handling params - HoodieWriteConfig config = builder.withCompactionConfig( - HoodieCompactionConfig.newBuilder().compactionSmallFileSize(SIZE_PER_RECORD * 15) - .insertSplitSize(INSERT_SPLIT_LIMIT).build()) // tolerate upto 15 records - .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(SIZE_PER_RECORD * 20) - .build()).build(); // hold upto 20 records max + HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH}); HoodieWriteClient client = new HoodieWriteClient(jsc, config); @@ -651,6 +654,79 @@ public class TestHoodieClient implements Serializable { assertEquals("Total inserts in commit3 must add up", keys3.size(), numTotalInsertsInCommit3); } + @Test + public void testSmallInsertHandlingForInserts() throws Exception { + + final String TEST_PARTITION_PATH = "2016/09/26"; + final int INSERT_SPLIT_LIMIT = 10; + // setup the small file handling params + HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max + dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH}); + + HoodieWriteClient client = new HoodieWriteClient(jsc, config); + + // Inserts => will write file1 + String commitTime1 = "001"; + List inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb + Set keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); + JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); + List statuses= client.insert(insertRecordsRDD1, commitTime1).collect(); + + assertNoWriteErrors(statuses); + + assertEquals("Just 1 file needs to be added.", 1, statuses.size()); + String file1 = statuses.get(0).getFileId(); + assertEquals("file should contain 10 records", + ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(), + 10); + + // Second, set of Inserts should just expand file1 + String commitTime2 = "002"; + List inserts2 = dataGen.generateInserts(commitTime2, 4); + Set keys2 = HoodieClientTestUtils.getRecordKeys(inserts2); + JavaRDD insertRecordsRDD2 = jsc.parallelize(inserts2, 1); + statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); + assertNoWriteErrors(statuses); + + assertEquals("Just 1 file needs to be updated.", 1, statuses.size()); + assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId()); + assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit()); + Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1)); + assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14); + + List records = ParquetUtils.readAvroRecords(newFile); + for (GenericRecord record: records) { + String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); + assertTrue("Record expected to be part of commit 1 or commit2", commitTime1.equals(recCommitTime) || commitTime2.equals(recCommitTime)); + assertTrue("key expected to be part of commit 1 or commit2", keys2.contains(recordKey) || keys1.contains(recordKey)); + } + + // Lots of inserts such that file1 is updated and expanded, a new file2 is created. + String commitTime3 = "003"; + List insert3 = dataGen.generateInserts(commitTime3, 20); + JavaRDD insertRecordsRDD3 = jsc.parallelize(insert3, 1); + statuses = client.insert(insertRecordsRDD3, commitTime3).collect(); + assertNoWriteErrors(statuses); + assertEquals("2 files needs to be committed.", 2, statuses.size()); + + + FileSystem fs = FSUtils.getFs(); + HoodieTableMetadata metadata = new HoodieTableMetadata(fs, basePath); + FileStatus[] files = metadata.getLatestVersionInPartition(fs, TEST_PARTITION_PATH, commitTime3); + assertEquals("Total of 2 valid data files", 2, files.length); + + + int totalInserts = 0; + for (FileStatus file: files) { + assertEquals("All files must be at commit 3", commitTime3, FSUtils.getCommitTime(file.getPath().getName())); + records = ParquetUtils.readAvroRecords(file.getPath()); + totalInserts += records.size(); + } + assertEquals("Total number of records must add up", totalInserts, inserts1.size() + inserts2.size() + insert3.size()); + } + + @After public void clean() { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index 00010f95a..ecd87a4ec 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -62,6 +62,9 @@ public class HoodieTestDataGenerator { + "{\"name\": \"end_lon\", \"type\": \"double\"}," + "{\"name\":\"fare\",\"type\": \"double\"}]}"; + // based on examination of sample file, the schema produces the following per record size + public static final int SIZE_PER_RECORD = 50 * 1024; + private List existingKeysList = new ArrayList<>(); private static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));