From bb6a19e7d71a91f23e17bf2fe5dcb8ecf4affde2 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 10 Nov 2021 09:47:50 +0530 Subject: [PATCH] [HUDI-1877] Support records staying in same fileId after clustering (#3833) * [HUDI-1877] Support records staying in same fileId after clustering Add plan strategy * Ensure same filegroup id and refactor based on comments --- .../apache/hudi/io/CreateHandleFactory.java | 4 +- .../io/SingleFileHandleCreateFactory.java | 9 ++- .../apache/hudi/io/WriteHandleFactory.java | 4 +- .../commit/AbstractBulkInsertHelper.java | 3 +- .../action/commit/JavaBulkInsertHelper.java | 5 +- .../SparkSingleFileSortPlanStrategy.java | 60 ++++++++++++++ .../SparkSizeBasedClusteringPlanStrategy.java | 18 +++-- .../MultipleSparkJobExecutionStrategy.java | 48 +++++++++-- .../SparkSingleFileSortExecutionStrategy.java | 80 +++++++++++++++++++ .../SparkSortAndSizeExecutionStrategy.java | 31 +------ .../bulkinsert/BulkInsertMapFunction.java | 10 +-- ...ExecuteClusteringCommitActionExecutor.java | 12 ++- .../action/commit/SparkBulkInsertHelper.java | 12 ++- .../TestHoodieClientOnCopyOnWriteStorage.java | 65 ++++++++++----- 14 files changed, 280 insertions(+), 81 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java index 9e23b6f2c..09131b421 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java @@ -23,7 +23,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -public class CreateHandleFactory extends WriteHandleFactory { +import java.io.Serializable; + +public class CreateHandleFactory extends WriteHandleFactory implements Serializable { private boolean preserveMetadata = false; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java index dd917cb3c..a3f7c04ef 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java @@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; +import java.io.Serializable; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -31,11 +32,11 @@ import java.util.concurrent.atomic.AtomicBoolean; *
 * <p>
* Please use this with caution. This can end up creating very large files if not used correctly. */ -public class SingleFileHandleCreateFactory extends WriteHandleFactory { +public class SingleFileHandleCreateFactory extends CreateHandleFactory implements Serializable { - private AtomicBoolean isHandleCreated = new AtomicBoolean(false); - private String fileId; - private boolean preserveHoodieMetadata; + private final AtomicBoolean isHandleCreated = new AtomicBoolean(false); + private final String fileId; + private final boolean preserveHoodieMetadata; public SingleFileHandleCreateFactory(String fileId, boolean preserveHoodieMetadata) { super(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java index d2923d447..36fae304d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java @@ -23,7 +23,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -public abstract class WriteHandleFactory { +import java.io.Serializable; + +public abstract class WriteHandleFactory implements Serializable { private int numFilesWritten = 0; public abstract HoodieWriteHandle create(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java index 850f3e076..6e1ddeb72 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -44,5 +45,5 @@ public abstract class AbstractBulkInsertHelper> userDefinedBulkInsertPartitioner, boolean addMetadataFields, int parallelism, - boolean preserveMetadata); + WriteHandleFactory writeHandleFactory); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index b7ea916aa..cdfa303cd 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -29,6 +29,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.JavaLazyInsertIterable; import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory; import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.FileIdPrefixProvider; import 
org.apache.hudi.table.HoodieTable;
@@ -76,7 +77,7 @@ public class JavaBulkInsertHelper extends Abst
     }
 
     // write new files
-    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
+    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
@@ -91,7 +92,7 @@ public class JavaBulkInsertHelper extends Abst
                                        Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
                                        boolean useWriterSchema,
                                        int parallelism,
-                                       boolean preserveHoodieMetadata) {
+                                       WriteHandleFactory writeHandleFactory) {
     // De-dupe/merge if needed
     List<HoodieRecord<T>> dedupedRecords = inputRecords;
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
new file mode 100644
index 000000000..b98dbac39
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * In this strategy, the clustering group for each partition is built in the same way as {@link SparkSizeBasedClusteringPlanStrategy}.
+ * The difference is that the number of output file groups is always 1 and the file group id remains the same.
+ */
+public class SparkSingleFileSortPlanStrategy<T extends HoodieRecordPayload<T>>
+    extends SparkSizeBasedClusteringPlanStrategy<T> {
+
+  public SparkSingleFileSortPlanStrategy(HoodieSparkCopyOnWriteTable<T> table, HoodieSparkEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public SparkSingleFileSortPlanStrategy(HoodieSparkMergeOnReadTable<T> table, HoodieSparkEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
+    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = fileSlices.stream()
+        .map(fileSlice -> Pair.of(Collections.singletonList(fileSlice), 1)).collect(Collectors.toList());
+    return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
+        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+        .setNumOutputFileGroups(fileSliceGroup.getRight())
+        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+        .build());
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index d58619b2b..c89ff2f2b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -71,24 +71,30 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
     long totalSizeSoFar = 0;
+    HoodieWriteConfig writeConfig = getWriteConfig();
     for (FileSlice currentSlice : fileSlices) {
       // assume each filegroup size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize();
+      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
-        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
+      if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
         LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
-            + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
         fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
         currentGroup = new ArrayList<>();
         totalSizeSoFar = 0;
       }
       currentGroup.add(currentSlice);
+      // totalSizeSoFar could be 0 if a new group was created in the previous conditional block.
+      // Reset it to the size of the current slice; otherwise the number of output file groups would be 0 even though the current slice is present.
+ if (totalSizeSoFar == 0) { + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(); + } } if (!currentGroup.isEmpty()) { - int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()); + int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes()); LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: " - + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); + + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 4bf81dde8..044b77362 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -34,19 +34,21 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.table.log.HoodieFileSliceReader; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; +import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner; import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; @@ -69,6 +71,9 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader; +import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; + /** * Clustering strategy to submit multiple spark jobs and union the results. */ @@ -102,21 +107,50 @@ public abstract class MultipleSparkJobExecutionStrategy performClusteringWithRecordsRDD(final JavaRDD> inputRecords, final int numOutputGroups, final String instantTime, final Map strategyParams, final Schema schema, final List fileGroupIdList, final boolean preserveHoodieMetadata); + /** + * Create {@link BulkInsertPartitioner} based on strategy params. 
+ * + * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. + * @param schema Schema of the data including metadata fields. + * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty. + */ + protected Option> getPartitioner(Map strategyParams, Schema schema) { + if (getWriteConfig().isLayoutOptimizationEnabled()) { + // sort input records by z-order/hilbert + return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(), + getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema))); + } else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { + return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), + HoodieAvroUtils.addMetadataFields(schema))); + } else { + return Option.empty(); + } + } + /** * Submit job to execute clustering for the group. */ private CompletableFuture> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map strategyParams, boolean preserveHoodieMetadata, String instantTime) { - CompletableFuture> writeStatusesFuture = CompletableFuture.supplyAsync(() -> { + return CompletableFuture.supplyAsync(() -> { JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext()); JavaRDD> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); @@ -125,8 +159,6 @@ public abstract class MultipleSparkJobExecutionStrategy> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) { List clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList()); - boolean hasLogFiles = clusteringOps.stream().filter(op -> op.getDeltaFilePaths().size() > 0).findAny().isPresent(); + boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); if (hasLogFiles) { // if there are log files, we read all records into memory for a file group and apply updates. return readRecordsForGroupWithLogs(jsc, clusteringOps, instantTime); @@ -174,7 +206,7 @@ public abstract class MultipleSparkJobExecutionStrategy> + extends MultipleSparkJobExecutionStrategy { + + private static final Logger LOG = LogManager.getLogger(SparkSingleFileSortExecutionStrategy.class); + + public SparkSingleFileSortExecutionStrategy(HoodieTable table, + HoodieEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + public JavaRDD performClusteringWithRecordsRDD(JavaRDD> inputRecords, + int numOutputGroups, + String instantTime, + Map strategyParams, + Schema schema, + List fileGroupIdList, + boolean preserveHoodieMetadata) { + if (numOutputGroups != 1 || fileGroupIdList.size() != 1) { + throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName()); + } + LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); + Properties props = getWriteConfig().getProps(); + props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups)); + // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files. 
+ props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString()); + // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value. + props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(Long.MAX_VALUE)); + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); + return (JavaRDD) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, + false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata)); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index f2cecc9b4..3f89745ab 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -18,19 +18,14 @@ package org.apache.hudi.client.clustering.run.strategy; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; -import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner; -import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; @@ -43,8 +38,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; - /** * Clustering Strategy based on following. * 1) Spark execution engine. 
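Either pair of strategies is wired in purely through write configs. A minimal sketch of a writer config that opts into the new single-file pair, assuming the HoodieClusteringConfig builder methods used by the tests below plus withInlineClustering, with basePath and the example class name as placeholders:

import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class SingleFileSortClusteringConfigExample {

  // Inline clustering that sorts each file slice by the given columns and writes the
  // result back under the same file group id, instead of allocating new file groups.
  public static HoodieWriteConfig build(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
            .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
            .withClusteringSortColumns("begin_lat,begin_lon")
            .withInlineClustering(true)
            .withInlineClusteringNumCommits(1)
            .build())
        .build();
  }
}

The pairing matters: the plan strategy emits exactly one output group per file slice, and SparkSingleFileSortExecutionStrategy throws HoodieClusteringException if a plan asks it for more than one.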
@@ -62,8 +55,8 @@ public class SparkSortAndSizeExecutionStrategy> @Override public JavaRDD performClusteringWithRecordsRDD(final JavaRDD> inputRecords, final int numOutputGroups, - final String instantTime, final Map strategyParams, final Schema schema, - final List fileGroupIdList, final boolean preserveHoodieMetadata) { + final String instantTime, final Map strategyParams, final Schema schema, + final List fileGroupIdList, final boolean preserveHoodieMetadata) { LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); Properties props = getWriteConfig().getProps(); props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups)); @@ -72,22 +65,6 @@ public class SparkSortAndSizeExecutionStrategy> props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); return (JavaRDD) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, getPartitioner(strategyParams, schema), true, numOutputGroups, preserveHoodieMetadata); - } - - /** - * Create BulkInsertPartitioner based on strategy params. - */ - protected Option> getPartitioner(Map strategyParams, Schema schema) { - if (getWriteConfig().isLayoutOptimizationEnabled()) { - // sort input records by z-order/hilbert - return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(), - getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema))); - } else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { - return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), - HoodieAvroUtils.addMetadataFields(schema))); - } else { - return Option.empty(); - } + false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java index f7d04df13..24cdd7060 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.SparkLazyInsertIterable; -import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.function.Function2; @@ -43,25 +43,25 @@ public class BulkInsertMapFunction private HoodieTable hoodieTable; private List fileIDPrefixes; private boolean useWriterSchema; - private boolean preserveMetadata; + private WriteHandleFactory writeHandleFactory; public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted, HoodieWriteConfig config, HoodieTable hoodieTable, List fileIDPrefixes, boolean useWriterSchema, - boolean preserveMetadata) { + WriteHandleFactory writeHandleFactory) { this.instantTime = instantTime; this.areRecordsSorted = areRecordsSorted; this.config = config; 
this.hoodieTable = hoodieTable; this.fileIDPrefixes = fileIDPrefixes; this.useWriterSchema = useWriterSchema; - this.preserveMetadata = preserveMetadata; + this.writeHandleFactory = writeHandleFactory; } @Override public Iterator> call(Integer partition, Iterator> recordItr) { return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable, fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema, - new CreateHandleFactory(preserveMetadata)); + writeHandleFactory); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index e734b4a40..1e8b7bead 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroupId; @@ -48,7 +49,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -115,9 +115,13 @@ public class SparkExecuteClusteringCommitActionExecutor> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeMetadata) { - Set newFilesWritten = new HashSet(writeMetadata.getWriteStats().get().stream() - .map(s -> new HoodieFileGroupId(s.getPartitionPath(),s.getFileId())) - .collect(Collectors.toList())); + Set newFilesWritten = writeMetadata.getWriteStats().get().stream() + .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet()); + // for the below execution strategy, new filegroup id would be same as old filegroup id + if (SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass())) { + return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan) + .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList()))); + } return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan) .filter(fg -> !newFilesWritten.contains(fg)) .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList()))); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 9013901c9..4644d29e0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -28,9 +28,12 @@ import org.apache.hudi.common.util.Option; import 
org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction; +import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; + import org.apache.spark.api.java.JavaRDD; import java.util.List; @@ -72,7 +75,8 @@ public class SparkBulkInsertHelper extends Abs executor.getCommitActionType(), instantTime), Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); // write new files - JavaRDD writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false); + JavaRDD writeStatuses = + bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); //update index ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -87,14 +91,14 @@ public class SparkBulkInsertHelper extends Abs Option> userDefinedBulkInsertPartitioner, boolean useWriterSchema, int parallelism, - boolean preserveMetadata) { + WriteHandleFactory writeHandleFactory) { // De-dupe/merge if needed JavaRDD> dedupedRecords = inputRecords; if (performDedupe) { dedupedRecords = (JavaRDD>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - parallelism, table); + parallelism, table); } final JavaRDD> repartitionedRecords; @@ -109,7 +113,7 @@ public class SparkBulkInsertHelper extends Abs JavaRDD writeStatusRDD = repartitionedRecords .mapPartitionsWithIndex(new BulkInsertMapFunction(instantTime, - partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, preserveMetadata), true) + partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true) .flatMap(List::iterator); return writeStatusRDD; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index bff9724b5..66fb1cd04 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -26,6 +26,8 @@ import org.apache.hudi.client.HoodieWriteResult; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy; +import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy; import org.apache.hudi.client.validator.SparkPreCommitValidator; import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator; import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator; @@ -1356,7 +1358,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); - testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); + testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @ParameterizedTest @@ -1367,7 +1369,21 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { .withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key") .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); - testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); + testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); + } + + @ParameterizedTest + @MethodSource("populateMetaFieldsAndPreserveMetadataParams") + public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { + // setup clustering config. + HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringSortColumns("begin_lat,begin_lon") + .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName()) + .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName()) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) + .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); + // note that assertSameFileIds is true for this test because of the plan and execution strategy + testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @ParameterizedTest @@ -1409,7 +1425,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { .withClusteringSortColumns("_hoodie_record_key") .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); try { - testInsertAndClustering(clusteringConfig, true, true, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); + testInsertAndClustering(clusteringConfig, true, true, false, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); fail("expected pre-commit clustering validation to fail"); } catch (HoodieValidationException e) { // expected @@ -1422,7 +1438,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); try { - testInsertAndClustering(clusteringConfig, false, true, SqlQueryEqualityPreCommitValidator.class.getName(), "", ""); + testInsertAndClustering(clusteringConfig, false, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), "", ""); fail("expected pre-commit clustering validation to fail because sql query is not configured"); } catch (HoodieValidationException e) { // expected @@ -1435,7 +1451,7 @@ public 
class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
 
-    testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
+    testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(),
         "", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
   }
 
@@ -1446,7 +1462,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
     try {
-      testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
+      testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(),
           "", COUNT_SQL_QUERY_FOR_VALIDATION + "#802");
       fail("expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802");
     } catch (HoodieValidationException e) {
@@ -1455,19 +1471,25 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
-    return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, "", "", "");
+    return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, false, "", "", "");
   }
 
   private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields,
-                                                     boolean completeClustering, String validatorClasses,
+                                                     boolean completeClustering, boolean assertSameFileIds, String validatorClasses,
                                                      String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
-    Pair<List<HoodieRecord>, List<String>> allRecords = testInsertTwoBatches(populateMetaFields);
-    testClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
-    return allRecords.getLeft();
+    Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords = testInsertTwoBatches(populateMetaFields);
+    testClustering(clusteringConfig, populateMetaFields, completeClustering, assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
+    return allRecords.getLeft().getLeft();
   }
 
-  private Pair<List<HoodieRecord>, List<String>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
+  /**
+   * This method returns the following three items:
+   * 1. List of all HoodieRecords written in the two batches of inserts.
+   * 2. Commit instants of the two batches.
+   * 3. Set of new file group ids that were written in the two batches.
+   */
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
     // create config to not update small files.
     HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
         populateMetaFields ?
new Properties() : getPropertiesForKeyGen());
@@ -1482,27 +1504,34 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
     List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields);
     Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
+    Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
+    fileIdsUnion.addAll(fileIds2);
     //verify new files are created for 2nd write
     Set<HoodieFileGroupId> fileIdIntersection = new HashSet<>(fileIds1);
     fileIdIntersection.retainAll(fileIds2);
     assertEquals(0, fileIdIntersection.size());
-    return Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2));
+    return Pair.of(Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2)), fileIdsUnion);
   }
 
-  private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering,
+  private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, boolean assertSameFileIds,
                               String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
-                              Pair<List<HoodieRecord>, List<String>> allRecords) throws IOException {
+                              Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords) throws IOException {
     HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
         .withClusteringConfig(clusteringConfig)
         .withProps(getPropertiesForKeyGen()).build();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata =
-        performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
-
+        performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords.getLeft());
+    if (assertSameFileIds) {
+      Set<HoodieFileGroupId> replacedFileIds = clusterMetadata.getWriteStats().get().stream()
+          .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
+      Set<HoodieFileGroupId> insertedFileIds = allRecords.getRight();
+      assertEquals(insertedFileIds, replacedFileIds);
+    }
     if (completeClustering) {
       String clusteringCommitTime = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
           .getReverseOrderedInstants().findFirst().get().getTimestamp();
-      verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
+      verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft().getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
     }
   }
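The behavioral contract this patch adds boils down to one set equality. A standalone sketch of the check the new test performs, assuming only HoodieFileGroupId from hudi-common (the class and method names here are illustrative, not part of the patch):

import java.util.Set;

import org.apache.hudi.common.model.HoodieFileGroupId;

public final class SameFileGroupInvariant {

  private SameFileGroupInvariant() {
  }

  // With SparkSingleFileSortExecutionStrategy, a clustering commit rewrites data into the
  // very file groups it read, so the replaced ids must equal the newly written ids.
  public static void assertSameFileGroups(Set<HoodieFileGroupId> replaced, Set<HoodieFileGroupId> written) {
    if (!replaced.equals(written)) {
      throw new IllegalStateException("Expected clustering to keep records in the same file groups,"
          + " but replaced=" + replaced + " and written=" + written);
    }
  }
}

SparkSortAndSizeExecutionStrategy keeps the opposite property, replaced and newly written ids stay disjoint, which is why getPartitionToReplacedFileIds only special-cases the single-file strategy.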