diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index b8d5948c1..3a3e04a6b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -233,6 +233,9 @@ public abstract class BaseCommitActionExecutor> writeMetadata = ( (ClusteringExecutionStrategy>, HoodieData, HoodieData>) diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java index 7751833fc..d34673c2d 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java @@ -37,7 +37,6 @@ import org.apache.log4j.Logger; import java.util.List; import java.util.Map; -import java.util.Properties; /** * Clustering Strategy based on following. @@ -60,13 +59,12 @@ public class JavaSortAndSizeExecutionStrategy> final String instantTime, final Map strategyParams, final Schema schema, final List fileGroupIdList, final boolean preserveHoodieMetadata) { LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); - Properties props = getWriteConfig().getProps(); - props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups)); - // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files. - props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString()); - props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder() - .withEngineType(EngineType.JAVA).withProps(props).build(); + .withBulkInsertParallelism(numOutputGroups) + .withEngineType(EngineType.JAVA) + .withProps(getWriteConfig().getProps()).build(); + newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); return (List) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 30f1d931a..39b291673 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -111,7 +112,7 @@ public class JavaBulkInsertHelper extends Base FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass( config.getFileIdPrefixProviderClassName(), - config.getProps()); + new TypedProperties(config.getProps())); List writeStatuses = new ArrayList<>(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java index 4dedabaec..4a7ee7bce 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java @@ -38,7 +38,6 @@ import org.apache.log4j.Logger; import java.util.List; import java.util.Map; -import java.util.Properties; /** * This strategy is similar to {@link SparkSortAndSizeExecutionStrategy} with the difference being that @@ -67,13 +66,12 @@ public class SparkSingleFileSortExecutionStrategy) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata)); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index d664c835e..7db63d416 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -36,7 +36,6 @@ import org.apache.log4j.Logger; import java.util.List; import java.util.Map; -import java.util.Properties; /** * Clustering Strategy based on following. @@ -58,12 +57,11 @@ public class SparkSortAndSizeExecutionStrategy> final String instantTime, final Map strategyParams, final Schema schema, final List fileGroupIdList, final boolean preserveHoodieMetadata) { LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); - Properties props = getWriteConfig().getProps(); - props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups)); - // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files. - props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString()); - props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); - HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); + + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder() + .withBulkInsertParallelism(numOutputGroups) + .withProps(getWriteConfig().getProps()).build(); + newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); return (HoodieData) SparkBulkInsertHelper.newInstance() .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); }