[HUDI-1354] Block updates and replace on file groups in clustering (#2275)

* [HUDI-1354] Block updates and replace on file groups in clustering

lw0090 authored on 2020-12-28 12:30:29 +08:00, committed by GitHub
parent 9e6889a8ce · commit 6cdf59d92b
8 changed files with 339 additions and 6 deletions

SparkRejectUpdateStrategy.java (new file)

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.update.strategy;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

import java.util.HashSet;
import java.util.List;

/**
 * Update strategy that rejects conflicting updates: if any file group under
 * pending clustering receives an update record, throw an exception.
 */
public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
  private static final Logger LOG = LogManager.getLogger(SparkRejectUpdateStrategy.class);

  public SparkRejectUpdateStrategy(HoodieSparkEngineContext engineContext, HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
    super(engineContext, fileGroupsInPendingClustering);
  }

  private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
    List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
        .filter(record -> record.getCurrentLocation() != null)
        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId()))
        .distinct().collect();
    return fileGroupIdsWithUpdates;
  }

  @Override
  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
    fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
      if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
        String msg = String.format("Not allowed to update the clustering file group %s. "
            + "For pending clustering operations, we are not going to support update for now.",
            fileGroupIdWithRecordUpdate.toString());
        LOG.error(msg);
        throw new HoodieClusteringUpdateException(msg);
      }
    });
    return taggedRecordsRDD;
  }
}
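The strategy keys off the tagged record location alone: any record whose current location falls in fileGroupsInPendingClustering counts as a conflicting update. For contrast, a hypothetical lenient strategy against the same UpdateStrategy contract shown above (a sketch, not part of this commit) could drop the conflicting updates instead of failing the whole write:

// Hypothetical alternative strategy; not part of this commit. Assumes the same
// UpdateStrategy contract (constructor args and protected field) as the class above.
package org.apache.hudi.client.clustering.update.strategy;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.spark.api.java.JavaRDD;

import java.util.HashSet;

public class SparkSkipUpdateStrategy<T extends HoodieRecordPayload<T>>
    extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {

  public SparkSkipUpdateStrategy(HoodieSparkEngineContext engineContext,
      HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
    super(engineContext, fileGroupsInPendingClustering);
  }

  @Override
  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
    // Keep inserts (no current location) and updates whose target file group is
    // not under pending clustering; drop the conflicting updates instead of failing.
    return taggedRecordsRDD.filter(record ->
        record.getCurrentLocation() == null
            || !fileGroupsInPendingClustering.contains(
                new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())));
  }
}

Silently dropping updates loses data, which is presumably why this commit fails fast instead; a real alternative would have to surface the skipped records to the caller.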

BaseSparkCommitActionExecutor.java

@@ -28,11 +28,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
@@ -46,6 +48,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -59,11 +62,13 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.stream.Collectors;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.Map;
 
 public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
@@ -88,6 +93,18 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     super(context, config, table, instantTime, operationType, extraMetadata);
   }
 
+  private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    if (config.isClusteringEnabled()) {
+      Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+          table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
+      UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
+          .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
+      return (JavaRDD<HoodieRecord<T>>) updateStrategy.handleUpdate(inputRecordsRDD);
+    } else {
+      return inputRecordsRDD;
+    }
+  }
+
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
@@ -107,9 +124,12 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
 
+    // handle record updates that touch file groups under pending clustering
+    JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD);
+
     // partition using the insert partitioner
     final Partitioner partitioner = getPartitioner(profile);
-    JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, partitioner);
+    JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner);
     JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
       if (WriteOperationType.isChangingRecords(operationType)) {
         return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
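clusteringHandleUpdate loads the strategy reflectively from config.getClusteringUpdatesStrategyClass(), so the conflict behavior is pluggable per write config. A minimal wiring sketch, assuming the property key hoodie.clustering.updates.strategy (inferred from the getter name; verify against HoodieClusteringConfig) and hypothetical path/table names:

import java.util.Properties;

import org.apache.hudi.config.HoodieWriteConfig;

public class ClusteringUpdateStrategyConfigExample {
  public static void main(String[] args) {
    // Assumed config key, inferred from getClusteringUpdatesStrategyClass();
    // not confirmed by this commit.
    Properties props = new Properties();
    props.setProperty("hoodie.clustering.updates.strategy",
        "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy");

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/example_table")  // hypothetical base path
        .forTable("example_table")              // hypothetical table name
        .withProps(props)
        .build();

    System.out.println(writeConfig.getProps().getProperty("hoodie.clustering.updates.strategy"));
  }
}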

UpsertPartitioner.java

@@ -129,6 +129,34 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     return bucket;
   }
 
+  /**
+   * Get the file IDs under pending clustering for each partition path.
+   * @return mapping from partition path to the pending clustering file group IDs in it
+   */
+  private Map<String, Set<String>> getPartitionPathToPendingClusteringFileGroupsId() {
+    Map<String, Set<String>> partitionPathToInPendingClusteringFileId =
+        table.getFileSystemView().getFileGroupsInPendingClustering()
+            .map(fileGroupIdAndInstantPair ->
+                Pair.of(fileGroupIdAndInstantPair.getKey().getPartitionPath(), fileGroupIdAndInstantPair.getKey().getFileId()))
+            .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
+    return partitionPathToInPendingClusteringFileId;
+  }
+
+  /**
+   * Exclude files under pending clustering from small-file handling, since the update path is not supported for them.
+   * @param pendingClusteringFileGroupsId pending clustering file group IDs of the partition
+   * @param smallFiles small files of the partition
+   * @return the small files that are not under pending clustering
+   */
+  private List<SmallFile> filterSmallFilesInClustering(final Set<String> pendingClusteringFileGroupsId, final List<SmallFile> smallFiles) {
+    if (this.config.isClusteringEnabled()) {
+      return smallFiles.stream()
+          .filter(smallFile -> !pendingClusteringFileGroupsId.contains(smallFile.location.getFileId()))
+          .collect(Collectors.toList());
+    } else {
+      return smallFiles;
+    }
+  }
+
   private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
     // for new inserts, compute buckets depending on how many records we have for each partition
     Set<String> partitionPaths = profile.getPartitionPaths();
@@ -140,11 +168,16 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     Map<String, List<SmallFile>> partitionSmallFilesMap =
         getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
 
+    Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId = getPartitionPathToPendingClusteringFileGroupsId();
+
     for (String partitionPath : partitionPaths) {
       WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
       if (pStat.getNumInserts() > 0) {
-        List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+        List<SmallFile> smallFiles =
+            filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath, Collections.emptySet()),
+                partitionSmallFilesMap.get(partitionPath));
 
         this.smallFiles.addAll(smallFiles);
         LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);