[HUDI-472] Introduce configurations and new modes of sorting for bulk_insert (#1149)
* [HUDI-472] Introduce the configuration and new modes of record sorting for bulk_insert(#1149). Three sorting modes are implemented: global sort ("global_sort"), local sort inside each RDD partition ("partition_sort") and no sort ("none")
This commit is contained in:
@@ -42,7 +42,7 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.keygen.KeyGenerator;
|
||||
import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
|
||||
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
|
||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||
|
||||
import org.apache.avro.LogicalTypes;
|
||||
import org.apache.avro.Schema;
|
||||
@@ -200,13 +200,13 @@ public class DataSourceUtils {
|
||||
* if the class name of UserDefinedBulkInsertPartitioner is configured through the HoodieWriteConfig.
|
||||
* @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass()
|
||||
*/
|
||||
private static Option<UserDefinedBulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
|
||||
private static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
|
||||
throws HoodieException {
|
||||
String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
|
||||
try {
|
||||
return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
|
||||
? Option.empty() :
|
||||
Option.of((UserDefinedBulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
|
||||
Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
|
||||
} catch (Throwable e) {
|
||||
throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e);
|
||||
}
|
||||
@@ -258,7 +258,7 @@ public class DataSourceUtils {
|
||||
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
|
||||
String instantTime, String operation) throws HoodieException {
|
||||
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
|
||||
Option<UserDefinedBulkInsertPartitioner> userDefinedBulkInsertPartitioner =
|
||||
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner =
|
||||
createUserDefinedBulkInsertPartitioner(client.getConfig());
|
||||
return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
|
||||
} else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
|
||||
|
||||
@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
|
||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
|
||||
@@ -60,12 +60,17 @@ public class DataSourceTestUtils {
|
||||
}
|
||||
|
||||
public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
|
||||
implements UserDefinedBulkInsertPartitioner<T> {
|
||||
implements BulkInsertPartitioner<T> {
|
||||
|
||||
@Override
|
||||
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
|
||||
return records;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean arePartitionRecordsSorted() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user