1
0

[HUDI-3458] Fix BulkInsertPartitioner generic type (#4854)

This commit is contained in:
Raymond Xu
2022-02-20 10:51:58 -08:00
committed by GitHub
parent 66ac1446dd
commit 0938f55a2b
10 changed files with 31 additions and 30 deletions

View File

@@ -34,7 +34,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime, public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe, BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner); Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
/** /**
* Only write input records. Does not change timeline/index. Return information about new files created. * Only write input records. Does not change timeline/index. Return information about new files created.
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract O bulkInsert(I inputRecords, String instantTime, public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
boolean performDedupe, boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner, Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner,
boolean addMetadataFields, boolean addMetadataFields,
int parallelism, int parallelism,
WriteHandleFactory writeHandleFactory); WriteHandleFactory writeHandleFactory);

View File

@@ -121,7 +121,7 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
* @param schema Schema of the data including metadata fields. * @param schema Schema of the data including metadata fields.
* @return empty for now. * @return empty for now.
*/ */
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) { protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new JavaCustomColumnsSortPartitioner( return Option.of(new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),

View File

@@ -65,7 +65,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
final HoodieWriteConfig config, final HoodieWriteConfig config,
final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor, final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor,
final boolean performDedupe, final boolean performDedupe,
final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata(); HoodieWriteMetadata result = new HoodieWriteMetadata();
// It's possible the transition to inflight could have already happened. // It's possible the transition to inflight could have already happened.
@@ -89,7 +89,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieWriteConfig config, HoodieWriteConfig config,
boolean performDedupe, boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema, boolean useWriterSchema,
int parallelism, int parallelism,
WriteHandleFactory writeHandleFactory) { WriteHandleFactory writeHandleFactory) {

View File

@@ -26,9 +26,8 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List; import java.util.List;
@@ -37,12 +36,12 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPay
extends BaseJavaCommitActionExecutor<T> { extends BaseJavaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> preppedInputRecord; private final List<HoodieRecord<T>> preppedInputRecord;
private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner; private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context, public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
HoodieWriteConfig config, HoodieTable table, HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> preppedInputRecord, String instantTime, List<HoodieRecord<T>> preppedInputRecord,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT); super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecord = preppedInputRecord; this.preppedInputRecord = preppedInputRecord;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner; this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;

View File

@@ -18,10 +18,6 @@
package org.apache.hudi.client.clustering.run.strategy; package org.apache.hudi.client.clustering.run.strategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -60,6 +56,11 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@@ -134,7 +135,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
* @param schema Schema of the data including metadata fields. * @param schema Schema of the data including metadata fields.
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty. * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
*/ */
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) { protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
Option<String[]> orderByColumnsOpt = Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key())) Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(",")); .map(listStr -> listStr.split(","));

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import java.util.Map; import java.util.Map;
@@ -36,17 +37,17 @@ import java.util.Map;
public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> { public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD; private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner; private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) { Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty()); this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
} }
public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) { Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata); super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecordsRDD = inputRecordsRDD; this.inputRecordsRDD = inputRecordsRDD;

View File

@@ -67,7 +67,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
final HoodieWriteConfig config, final HoodieWriteConfig config,
final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor, final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor,
final boolean performDedupe, final boolean performDedupe,
final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata(); HoodieWriteMetadata result = new HoodieWriteMetadata();
//transition bulk_insert state to inflight //transition bulk_insert state to inflight
@@ -88,7 +88,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
HoodieWriteConfig config, HoodieWriteConfig config,
boolean performDedupe, boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema, boolean useWriterSchema,
int parallelism, int parallelism,
WriteHandleFactory writeHandleFactory) { WriteHandleFactory writeHandleFactory) {

View File

@@ -26,22 +26,22 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>> public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> { extends BaseSparkCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd; private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner; private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
public SparkBulkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context, public SparkBulkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd, String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) { Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT); super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd; this.preppedInputRecordRdd = preppedInputRecordRdd;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner; this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;

View File

@@ -39,17 +39,17 @@ public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayl
extends BaseSparkDeltaCommitActionExecutor<T> { extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD; private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner; private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) { Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty()); this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
} }
public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table, public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) { Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata); super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecordsRDD = inputRecordsRDD; this.inputRecordsRDD = inputRecordsRDD;

View File

@@ -37,12 +37,12 @@ public class SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRec
extends BaseSparkDeltaCommitActionExecutor<T> { extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd; private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner; private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
public SparkBulkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context, public SparkBulkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table, HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd, String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) { Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT); super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd; this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner; this.bulkInsertPartitioner = bulkInsertPartitioner;