diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index c24227d39..ae2cf88fe 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -51,6 +51,7 @@ import com.uber.hoodie.func.BulkInsertMapFunction;
 import com.uber.hoodie.index.HoodieIndex;
 import com.uber.hoodie.io.HoodieCommitArchiveLog;
 import com.uber.hoodie.metrics.HoodieMetrics;
+import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner;
 import com.uber.hoodie.table.HoodieTable;
 import com.uber.hoodie.table.WorkloadProfile;
 import org.apache.hadoop.fs.FileStatus;
@@ -215,6 +216,27 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
    */
   public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
+    return bulkInsert(records, commitTime, Option.empty());
+  }
+
+  /**
+   * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
+   * loads into a Hoodie table for the very first time (e.g.: converting an existing dataset to
+   * Hoodie).
+   *
+   * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
+   * attempts to control the numbers of files with less memory compared to the {@link
+   * HoodieWriteClient#insert(JavaRDD, String)}. Optionally it allows users to specify their own partitioner. If
+   * specified then it will be used for repartitioning records. See {@link UserDefinedBulkInsertPartitioner}.
+   *
+   * @param records HoodieRecords to insert
+   * @param commitTime Commit Time handle
+   * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are
+   *                              inserted into hoodie.
+   * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
+   */
+  public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
+      Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     writeContext = metrics.getCommitCtx();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = HoodieTable
@@ -225,8 +247,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     JavaRDD<HoodieRecord<T>> dedupedRecords =
         combineOnCondition(config.shouldCombineBeforeInsert(), records,
             config.getInsertShuffleParallelism());
-    // Now, sort the records and line them up nicely for loading.
-    JavaRDD<HoodieRecord<T>> sortedRecords = dedupedRecords
+    final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+    if (bulkInsertPartitioner.isDefined()) {
+      repartitionedRecords =
+          bulkInsertPartitioner.get().repartitionRecords(dedupedRecords,
+              config.getBulkInsertShuffleParallelism());
+    } else {
+      // Now, sort the records and line them up nicely for loading.
+      repartitionedRecords = dedupedRecords
         .sortBy(record -> {
           // Let's use "partitionPath + key" as the sort key. Spark, will ensure
           // the records split evenly across RDD partitions, such that small partitions fit
@@ -234,7 +262,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
           return String
               .format("%s+%s", record.getPartitionPath(), record.getRecordKey());
         }, true, config.getBulkInsertShuffleParallelism());
-    JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
+    }
+    JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
         .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table), true)
         .flatMap(writeStatuses -> writeStatuses.iterator());
 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/UserDefinedBulkInsertPartitioner.java b/hoodie-client/src/main/java/com/uber/hoodie/table/UserDefinedBulkInsertPartitioner.java
new file mode 100644
index 000000000..9a676f4e7
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/UserDefinedBulkInsertPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.uber.hoodie.table;
+
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import org.apache.spark.api.java.JavaRDD;
+
+/**
+ * Repartition input records into at least the expected number of output spark partitions. It should give
+ * the following guarantees:
+ * - Output spark partition will have records from only one hoodie partition.
+ * - Average records per output spark partitions should be almost equal to (#inputRecords / #outputSparkPartitions)
+ *   to avoid possible skews.
+ */
+public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> {
+
+  JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions);
+}