
[HUDI-1877] Support records staying in same fileId after clustering (#3833)

* [HUDI-1877] Support records staying in same fileId after clustering

Add plan strategy

* Ensure same filegroup id and refactor based on comments
Sagar Sumit
2021-11-10 09:47:50 +05:30
committed by GitHub
parent dfe3b84715
commit bb6a19e7d7
14 changed files with 280 additions and 81 deletions
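A minimal sketch of how the new strategies might be enabled, assuming the standard HoodieWriteConfig builder: the plan/execution strategy classes and sort columns mirror the test added further down in this diff, while the base path is a placeholder.

// Sketch only: pair the single-file-sort plan strategy with its execution strategy
// so each clustered file group keeps its original fileId. Sort columns and base path
// below are illustrative values, not prescriptive defaults.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
    .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
    .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
    .withClusteringSortColumns("begin_lat,begin_lon")
    .withInlineClusteringNumCommits(1)
    .build();

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table")   // assumed table base path
    .withClusteringConfig(clusteringConfig)
    .build();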

View File

@@ -23,7 +23,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
public class CreateHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
import java.io.Serializable;
public class CreateHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> implements Serializable {
private boolean preserveMetadata = false;

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -31,11 +32,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
* <p>
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
public class SingleFileHandleCreateFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
public class SingleFileHandleCreateFactory<T extends HoodieRecordPayload, I, K, O> extends CreateHandleFactory<T, I, K, O> implements Serializable {
private AtomicBoolean isHandleCreated = new AtomicBoolean(false);
private String fileId;
private boolean preserveHoodieMetadata;
private final AtomicBoolean isHandleCreated = new AtomicBoolean(false);
private final String fileId;
private final boolean preserveHoodieMetadata;
public SingleFileHandleCreateFactory(String fileId, boolean preserveHoodieMetadata) {
super();

View File

@@ -23,7 +23,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
public abstract class WriteHandleFactory<T extends HoodieRecordPayload, I, K, O> {
import java.io.Serializable;
public abstract class WriteHandleFactory<T extends HoodieRecordPayload, I, K, O> implements Serializable {
private int numFilesWritten = 0;
public abstract HoodieWriteHandle<T, I, K, O> create(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -44,5 +45,5 @@ public abstract class AbstractBulkInsertHelper<T extends HoodieRecordPayload, I,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean addMetadataFields,
int parallelism,
boolean preserveMetadata);
WriteHandleFactory writeHandleFactory);
}

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.JavaLazyInsertIterable;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.FileIdPrefixProvider;
import org.apache.hudi.table.HoodieTable;
@@ -76,7 +77,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abst
}
// write new files
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -91,7 +92,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abst
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism,
boolean preserveHoodieMetadata) {
WriteHandleFactory writeHandleFactory) {
// De-dupe/merge if needed
List<HoodieRecord<T>> dedupedRecords = inputRecords;

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* In this strategy, clustering group for each partition is built in the same way as {@link SparkSizeBasedClusteringPlanStrategy}.
* The difference is that there is exactly one output file group per clustering group and the file group id remains the same.
*/
public class SparkSingleFileSortPlanStrategy<T extends HoodieRecordPayload<T>>
extends SparkSizeBasedClusteringPlanStrategy<T> {
public SparkSingleFileSortPlanStrategy(HoodieSparkCopyOnWriteTable<T> table, HoodieSparkEngineContext engineContext, HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public SparkSingleFileSortPlanStrategy(HoodieSparkMergeOnReadTable<T> table, HoodieSparkEngineContext engineContext, HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = fileSlices.stream()
.map(fileSlice -> Pair.of(Collections.singletonList(fileSlice), 1)).collect(Collectors.toList());
return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}
}

View File

@@ -71,24 +71,30 @@ public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
long totalSizeSoFar = 0;
HoodieWriteConfig writeConfig = getWriteConfig();
for (FileSlice currentSlice : fileSlices) {
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize();
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+ getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}
currentGroup.add(currentSlice);
// totalSizeSoFar could be 0 when a new group was created in the previous conditional block.
// Reset it to the size of the current slice; otherwise the number of output file groups would become 0 even though the current slice is present.
if (totalSizeSoFar == 0) {
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
}
}
if (!currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+ getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
}
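To make the totalSizeSoFar reset concrete, here is a small standalone sketch of the grouping loop above with made-up sizes: a 600 MB group limit, 120 MB target files, and ceiling division standing in for getNumberOfOutputFileGroups (an assumption, the real method may differ). Without the reset, the trailing 200 MB slice would land in a final group that reports 0 output file groups.

import java.util.ArrayList;
import java.util.List;

public class ClusteringGroupingSketch {
  public static void main(String[] args) {
    long maxBytesInGroup = 600L * 1024 * 1024;    // stand-in for clusteringMaxBytesInGroup
    long targetFileMaxBytes = 120L * 1024 * 1024; // stand-in for clusteringTargetFileMaxBytes
    long[] sliceSizes = {500L * 1024 * 1024, 200L * 1024 * 1024}; // hypothetical base file sizes

    List<Long> currentGroup = new ArrayList<>();
    long totalSizeSoFar = 0;
    for (long sliceSize : sliceSizes) {
      totalSizeSoFar += sliceSize;
      // Flush the current group once the size limit is reached.
      if (totalSizeSoFar >= maxBytesInGroup && !currentGroup.isEmpty()) {
        System.out.println("group of " + currentGroup.size() + " slice(s) -> "
            + outputGroups(totalSizeSoFar, targetFileMaxBytes) + " output file group(s)");
        currentGroup = new ArrayList<>();
        totalSizeSoFar = 0;
      }
      currentGroup.add(sliceSize);
      // The fix: without resetting to the current slice's size, the final group below
      // would be sized from 0 bytes and report 0 output file groups.
      if (totalSizeSoFar == 0) {
        totalSizeSoFar += sliceSize;
      }
    }
    if (!currentGroup.isEmpty()) {
      System.out.println("final group of " + currentGroup.size() + " slice(s) -> "
          + outputGroups(totalSizeSoFar, targetFileMaxBytes) + " output file group(s)");
    }
  }

  // Assumed ceiling-division behavior; the real getNumberOfOutputFileGroups may differ.
  private static long outputGroups(long totalSize, long targetFileSize) {
    return (totalSize + targetFileSize - 1) / targetFileSize;
  }
}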

View File

@@ -34,19 +34,21 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
@@ -69,6 +71,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
/**
* Clustering strategy to submit multiple spark jobs and union the results.
*/
@@ -102,21 +107,50 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
/**
* Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. The number of new
* file groups created is bounded by numOutputGroups.
* Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
* The number of new file groups created is bounded by numOutputGroups.
* Note that the commit is not done as part of the strategy; it is the caller's responsibility.
*
* @param inputRecords RDD of {@link HoodieRecord}.
* @param numOutputGroups Number of output file groups.
* @param instantTime Clustering (replace commit) instant time.
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
* @param fileGroupIdList File group id corresponding to each output group.
* @param preserveHoodieMetadata Whether to preserve commit metadata while clustering.
* @return RDD of {@link WriteStatus}.
*/
public abstract JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
/**
* Create {@link BulkInsertPartitioner} based on strategy params.
*
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
*/
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (getWriteConfig().isLayoutOptimizationEnabled()) {
// sort input records by z-order/hilbert
return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(),
getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
} else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema)));
} else {
return Option.empty();
}
}
/**
* Submit job to execute clustering for the group.
*/
private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
boolean preserveHoodieMetadata, String instantTime) {
CompletableFuture<JavaRDD<WriteStatus>> writeStatusesFuture = CompletableFuture.supplyAsync(() -> {
return CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
JavaRDD<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
@@ -125,8 +159,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
.collect(Collectors.toList());
return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
});
return writeStatusesFuture;
}
/**
@@ -134,7 +166,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
*/
private JavaRDD<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
boolean hasLogFiles = clusteringOps.stream().filter(op -> op.getDeltaFilePaths().size() > 0).findAny().isPresent();
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
if (hasLogFiles) {
// if there are log files, we read all records into memory for a file group and apply updates.
return readRecordsForGroupWithLogs(jsc, clusteringOps, instantTime);
@@ -174,7 +206,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
.build();
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
tableConfig.getPayloadClass(),
tableConfig.getPreCombineField(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),

View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.SingleFileHandleCreateFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* This strategy is similar to {@link SparkSortAndSizeExecutionStrategy} with the difference being that
* there should be only one large file group per clustering group.
*/
public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<T>>
extends MultipleSparkJobExecutionStrategy<T> {
private static final Logger LOG = LogManager.getLogger(SparkSingleFileSortExecutionStrategy.class);
public SparkSingleFileSortExecutionStrategy(HoodieTable table,
HoodieEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(JavaRDD<HoodieRecord<T>> inputRecords,
int numOutputGroups,
String instantTime,
Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean preserveHoodieMetadata) {
if (numOutputGroups != 1 || fileGroupIdList.size() != 1) {
throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
}
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
Properties props = getWriteConfig().getProps();
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
// We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(Long.MAX_VALUE));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
}
}

View File

@@ -18,19 +18,14 @@
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
@@ -43,8 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
/**
* Clustering Strategy based on following.
* 1) Spark execution engine.
@@ -62,8 +55,8 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
@Override
public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
Properties props = getWriteConfig().getProps();
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
@@ -72,22 +65,6 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, preserveHoodieMetadata);
}
/**
* Create BulkInsertPartitioner based on strategy params.
*/
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (getWriteConfig().isLayoutOptimizationEnabled()) {
// sort input records by z-order/hilbert
return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(),
getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
} else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema)));
} else {
return Option.empty();
}
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
}
}

View File

@@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.function.Function2;
@@ -43,25 +43,25 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
private HoodieTable hoodieTable;
private List<String> fileIDPrefixes;
private boolean useWriterSchema;
private boolean preserveMetadata;
private WriteHandleFactory writeHandleFactory;
public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
HoodieWriteConfig config, HoodieTable hoodieTable,
List<String> fileIDPrefixes, boolean useWriterSchema,
boolean preserveMetadata) {
WriteHandleFactory writeHandleFactory) {
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.config = config;
this.hoodieTable = hoodieTable;
this.fileIDPrefixes = fileIDPrefixes;
this.useWriterSchema = useWriterSchema;
this.preserveMetadata = preserveMetadata;
this.writeHandleFactory = writeHandleFactory;
}
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
new CreateHandleFactory(preserveMetadata));
writeHandleFactory);
}
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -48,7 +49,6 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -115,9 +115,13 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
Set<HoodieFileGroupId> newFilesWritten = new HashSet(writeMetadata.getWriteStats().get().stream()
.map(s -> new HoodieFileGroupId(s.getPartitionPath(),s.getFileId()))
.collect(Collectors.toList()));
Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
// for the below execution strategy, new filegroup id would be same as old filegroup id
if (SparkSingleFileSortExecutionStrategy.class.getName().equals(config.getClusteringExecutionStrategyClass())) {
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
.collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
}
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
.filter(fg -> !newFilesWritten.contains(fg))
.collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));

View File

@@ -28,9 +28,12 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
@@ -72,7 +75,8 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
JavaRDD<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
JavaRDD<WriteStatus> writeStatuses =
bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -87,14 +91,14 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism,
boolean preserveMetadata) {
WriteHandleFactory writeHandleFactory) {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
if (performDedupe) {
dedupedRecords = (JavaRDD<HoodieRecord<T>>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
parallelism, table);
parallelism, table);
}
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
@@ -109,7 +113,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime,
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, preserveMetadata), true)
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
.flatMap(List::iterator);
return writeStatusRDD;

View File

@@ -26,6 +26,8 @@ import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
@@ -1356,7 +1358,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
@@ -1367,7 +1369,21 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
@MethodSource("populateMetaFieldsAndPreserveMetadataParams")
public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("begin_lat,begin_lon")
.withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
.withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
// note that assertSameFileIds is true for this test because of the plan and execution strategy
testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
@@ -1409,7 +1425,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withClusteringSortColumns("_hoodie_record_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
try {
testInsertAndClustering(clusteringConfig, true, true, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
testInsertAndClustering(clusteringConfig, true, true, false, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
fail("expected pre-commit clustering validation to fail");
} catch (HoodieValidationException e) {
// expected
@@ -1422,7 +1438,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
try {
testInsertAndClustering(clusteringConfig, false, true, SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
testInsertAndClustering(clusteringConfig, false, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
fail("expected pre-commit clustering validation to fail because sql query is not configured");
} catch (HoodieValidationException e) {
// expected
@@ -1435,7 +1451,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(),
"", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
}
@@ -1446,7 +1462,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
try {
testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
testInsertAndClustering(clusteringConfig, false, true, false, SqlQuerySingleResultPreCommitValidator.class.getName(),
"", COUNT_SQL_QUERY_FOR_VALIDATION + "#802");
fail("expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802");
} catch (HoodieValidationException e) {
@@ -1455,19 +1471,25 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, "", "", "");
return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, false, "", "", "");
}
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields,
boolean completeClustering, String validatorClasses,
boolean completeClustering, boolean assertSameFileIds, String validatorClasses,
String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
Pair<List<HoodieRecord>, List<String>> allRecords = testInsertTwoBatches(populateMetaFields);
testClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
return allRecords.getLeft();
Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords = testInsertTwoBatches(populateMetaFields);
testClustering(clusteringConfig, populateMetaFields, completeClustering, assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
return allRecords.getLeft().getLeft();
}
private Pair<List<HoodieRecord>, List<String>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
/**
* This method returns the following three items:
* 1. List of all HoodieRecord written in the two batches of insert.
* 2. Commit instants of the two batches.
* 3. List of new file group ids that were written in the two batches.
*/
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
@@ -1482,27 +1504,34 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
fileIdsUnion.addAll(fileIds2);
//verify new files are created for 2nd write
Set<HoodieFileGroupId> fileIdIntersection = new HashSet<>(fileIds1);
fileIdIntersection.retainAll(fileIds2);
assertEquals(0, fileIdIntersection.size());
return Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2));
return Pair.of(Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2)), fileIdsUnion);
}
private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering,
private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, boolean assertSameFileIds,
String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
Pair<List<HoodieRecord>, List<String>> allRecords) throws IOException {
Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords) throws IOException {
HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
.withClusteringConfig(clusteringConfig)
.withProps(getPropertiesForKeyGen()).build();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata =
performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords.getLeft());
if (assertSameFileIds) {
Set<HoodieFileGroupId> replacedFileIds = clusterMetadata.getWriteStats().get().stream()
.map(s -> new HoodieFileGroupId(s.getPartitionPath(),s.getFileId())).collect(Collectors.toSet());
Set<HoodieFileGroupId> insertedFileIds = allRecords.getRight();
assertEquals(insertedFileIds, replacedFileIds);
}
if (completeClustering) {
String clusteringCommitTime = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.getReverseOrderedInstants().findFirst().get().getTimestamp();
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft().getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
}
}