diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index 29a13509a..dffd926ae 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -34,7 +34,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
   public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
                                                     HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                                     BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
-                                                    Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner);
+                                                    Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
 
   /**
    * Only write input records. Does not change timeline/index. Return information about new files created.
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
   public abstract O bulkInsert(I inputRecords, String instantTime,
                                HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                boolean performDedupe,
-                               Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
+                               Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner,
                                boolean addMetadataFields,
                                int parallelism,
                                WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 7e67b087f..8cbe0d7fa 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -121,7 +121,7 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
    * @param schema Schema of the data including metadata fields.
    * @return empty for now.
    */
-  protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       return Option.of(new JavaCustomColumnsSortPartitioner(
           strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 87b7fe74e..de7afdf00 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -65,7 +65,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
                                         final HoodieWriteConfig config,
                                         final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor,
                                         final boolean performDedupe,
-                                        final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                        final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
     // It's possible the transition to inflight could have already happened.
@@ -89,7 +89,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                HoodieWriteConfig config,
                                boolean performDedupe,
-                               Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
+                               Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
                                boolean useWriterSchema,
                                int parallelism,
                                WriteHandleFactory writeHandleFactory) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
index 37b56b632..ed72fbe78 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
@@ -26,9 +26,8 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import java.util.List;
@@ -37,12 +36,12 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPay
     extends BaseJavaCommitActionExecutor<T> {
 
   private final List<HoodieRecord<T>> preppedInputRecord;
-  private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
 
   public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
                                                    HoodieWriteConfig config, HoodieTable table,
                                                    String instantTime, List<HoodieRecord<T>> preppedInputRecord,
-                                                   Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                                   Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecord = preppedInputRecord;
     this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
@@ -60,4 +59,4 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPay
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
-  protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
         .map(listStr -> listStr.split(","));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
index 2b00d47b0..f4f1d3ad0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
@@ -29,6 +29,7 @@
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.Map;
@@ -36,17 +37,17 @@
 public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
-  private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
 
   public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context,
                                              HoodieWriteConfig config, HoodieTable table,
                                              String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                             Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
+                                             Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
     this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
   }
 
   public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context,
                                              HoodieWriteConfig config, HoodieTable table,
                                              String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                             Option<BulkInsertPartitioner<T>> bulkInsertPartitioner,
+                                             Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
                                              Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
     this.inputRecordsRDD = inputRecordsRDD;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index d17b9b45b..d0c5ddef5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -67,7 +67,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
                                          final HoodieWriteConfig config,
                                          final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor,
                                          final boolean performDedupe,
-                                         final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                         final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
     //transition bulk_insert state to inflight
@@ -88,7 +88,7 @@
                                    HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                    HoodieWriteConfig config,
                                    boolean performDedupe,
-                                   Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
+                                   Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
                                    boolean useWriterSchema,
                                    int parallelism,
                                    WriteHandleFactory writeHandleFactory) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
index e6b680949..28d8cb0b2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
@@ -26,22 +26,22 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.spark.api.java.JavaRDD;
 
 public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
-  private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
 
   public SparkBulkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
                                                     HoodieWriteConfig config, HoodieTable table,
                                                     String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-                                                    Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                                    Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecordRdd = preppedInputRecordRdd;
     this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
@@ -60,4 +60,4 @@ public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPa
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
 public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkDeltaCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
-  private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
 
   public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
                                                   String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                                  Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
+                                                  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
     this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
   }
 
   public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
                                                   String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                                  Option<BulkInsertPartitioner<T>> bulkInsertPartitioner,
+                                                  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
                                                   Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
     this.inputRecordsRDD = inputRecordsRDD;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
index a4d7493e8..be5b903c7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
@@ -37,12 +37,12 @@ public class SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRec
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
-  private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
 
   public SparkBulkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                          HoodieWriteConfig config, HoodieTable table,
                                                          String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-                                                         Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
+                                                         Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecordRdd = preppedInputRecordRdd;
     this.bulkInsertPartitioner = bulkInsertPartitioner;
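
Note (added for context, not part of the patch): the change above re-types the user-supplied bulk-insert partitioner from the record payload type T to the engine's input collection type, i.e. List<HoodieRecord<T>> for the Java client and JavaRDD<HoodieRecord<T>> for the Spark client, matching the I type parameter of BaseBulkInsertHelper. As a rough sketch of what a custom partitioner looks like against the re-typed API, assuming BulkInsertPartitioner<I> exposes repartitionRecords(I records, int outputSparkPartitions) and arePartitionRecordsSorted() as in Hudi's public interface, a Spark-side implementation might be (class name and sort key below are illustrative only):

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.spark.api.java.JavaRDD;

// Hypothetical custom partitioner (not part of this patch): sorts records by
// (partitionPath, recordKey) so records arrive grouped by partition path before
// the write. With this change it implements the input-typed
// BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> instead of BulkInsertPartitioner<T>.
public class SortByKeyRddPartitioner<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                     int outputSparkPartitions) {
    // Global sort by partition path + record key, spread over the requested parallelism.
    return records.sortBy(
        record -> record.getPartitionPath() + "+" + record.getRecordKey(),
        true, outputSparkPartitions);
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    // Downstream write handles may assume records arrive sorted within each partition.
    return true;
  }
}

Such a partitioner would then be passed as Option.of(new SortByKeyRddPartitioner<>()) wherever the executors above now accept Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>.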