Adding support for UserDefinedBulkInsertPartitioner
This commit is contained in:
committed by
vinoth chandar
parent
ec40d04d51
commit
5c639c0b05
@@ -51,6 +51,7 @@ import com.uber.hoodie.func.BulkInsertMapFunction;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
import com.uber.hoodie.io.HoodieCommitArchiveLog;
|
||||
import com.uber.hoodie.metrics.HoodieMetrics;
|
||||
import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import com.uber.hoodie.table.WorkloadProfile;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@@ -215,6 +216,27 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
|
||||
*/
|
||||
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
|
||||
return bulkInsert(records, commitTime, Option.empty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
|
||||
* loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
|
||||
* Hoodie).
|
||||
*
|
||||
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
|
||||
* attempts to control the numbers of files with less memory compared to the {@link
|
||||
* HoodieWriteClient#insert(JavaRDD, String)}. Optionally it allows users to specify their own partitioner. If
|
||||
* specified then it will be used for repartitioning records. See {@link UserDefinedBulkInsertPartitioner}.
|
||||
*
|
||||
* @param records HoodieRecords to insert
|
||||
* @param commitTime Commit Time handle
|
||||
* @param bulkInsertPartitioner If specified then it will be used to partition input records before they are
|
||||
* inserted into hoodie.
|
||||
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
|
||||
*/
|
||||
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
|
||||
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
|
||||
writeContext = metrics.getCommitCtx();
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieTable<T> table = HoodieTable
|
||||
@@ -225,8 +247,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
JavaRDD<HoodieRecord<T>> dedupedRecords =
|
||||
combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());
|
||||
|
||||
// Now, sort the records and line them up nicely for loading.
|
||||
JavaRDD<HoodieRecord<T>> sortedRecords = dedupedRecords
|
||||
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
|
||||
if (bulkInsertPartitioner.isDefined()) {
|
||||
repartitionedRecords =
|
||||
bulkInsertPartitioner.get().repartitionRecords(dedupedRecords,
|
||||
config.getBulkInsertShuffleParallelism());
|
||||
} else {
|
||||
// Now, sort the records and line them up nicely for loading.
|
||||
repartitionedRecords = dedupedRecords
|
||||
.sortBy(record -> {
|
||||
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
|
||||
// the records split evenly across RDD partitions, such that small partitions fit
|
||||
@@ -234,7 +262,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
return String
|
||||
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
|
||||
}, true, config.getBulkInsertShuffleParallelism());
|
||||
JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
|
||||
}
|
||||
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
|
||||
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
|
||||
.flatMap(writeStatuses -> writeStatuses.iterator());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user