[HUDI-3135] Make delete partitions lazy to be executed by the cleaner (#4489)
As of now, delete-partitions ensures all file groups are deleted, but the partition itself is not removed. So, listing all partitions may return the deleted partitions as well, although no data is served from them since all of their file groups have been deleted. This patch fixes that by letting the cleaner take care of deleting a partition once all file groups pertaining to that partition are deleted. - Fixed CleanPlanActionExecutor to return meta info about the list of partitions to be deleted. If there are no valid file groups for a partition, the clean planner will include that partition in the delete list. - Fixed the HoodieCleanPlan avro schema to include the list of partitions to be deleted. - CleanActionExecutor is fixed to delete partitions, if any, as per the clean plan. - The same info is added to HoodieCleanMetadata. - When applying clean metadata, the metadata table will check for partitions to be deleted and will update the "all_partitions" record for the deleted partitions. Co-authored-by: sivabalan <n.siva.b@gmail.com>
This commit is contained in:
@@ -24,6 +24,7 @@
|
||||
{"name": "policy", "type": "string"},
|
||||
{"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
|
||||
{"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
|
||||
{"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
|
||||
{"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}},
|
||||
{"name": "isPartitionDeleted", "type":["null", "boolean"], "default": null }
|
||||
]
|
||||
}
|
||||
|
||||
@@ -92,6 +92,14 @@
|
||||
}
|
||||
}}],
|
||||
"default" : null
|
||||
},
|
||||
{
|
||||
"name": "partitionsToBeDeleted",
|
||||
"doc": "partitions to be deleted",
|
||||
"type":["null",
|
||||
{ "type":"array", "items":"string"}
|
||||
],
|
||||
"default": null
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -47,19 +47,22 @@ public class HoodieCleanStat implements Serializable {
|
||||
private final List<String> failedDeleteBootstrapBaseFiles;
|
||||
// Earliest commit that was retained in this clean
|
||||
private final String earliestCommitToRetain;
|
||||
// set to true if partition is deleted
|
||||
private final boolean isPartitionDeleted;
|
||||
|
||||
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
|
||||
List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) {
|
||||
this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
|
||||
CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
|
||||
CollectionUtils.createImmutableList());
|
||||
CollectionUtils.createImmutableList(), false);
|
||||
}
|
||||
|
||||
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
|
||||
List<String> successDeleteFiles, List<String> failedDeleteFiles,
|
||||
String earliestCommitToRetain, List<String> deleteBootstrapBasePathPatterns,
|
||||
List<String> successDeleteBootstrapBaseFiles,
|
||||
List<String> failedDeleteBootstrapBaseFiles) {
|
||||
List<String> failedDeleteBootstrapBaseFiles,
|
||||
boolean isPartitionDeleted) {
|
||||
this.policy = policy;
|
||||
this.partitionPath = partitionPath;
|
||||
this.deletePathPatterns = deletePathPatterns;
|
||||
@@ -69,6 +72,7 @@ public class HoodieCleanStat implements Serializable {
|
||||
this.deleteBootstrapBasePathPatterns = deleteBootstrapBasePathPatterns;
|
||||
this.successDeleteBootstrapBaseFiles = successDeleteBootstrapBaseFiles;
|
||||
this.failedDeleteBootstrapBaseFiles = failedDeleteBootstrapBaseFiles;
|
||||
this.isPartitionDeleted = isPartitionDeleted;
|
||||
}
|
||||
|
||||
public HoodieCleaningPolicy getPolicy() {
|
||||
@@ -107,6 +111,10 @@ public class HoodieCleanStat implements Serializable {
|
||||
return earliestCommitToRetain;
|
||||
}
|
||||
|
||||
public boolean isPartitionDeleted() {
|
||||
return isPartitionDeleted;
|
||||
}
|
||||
|
||||
public static HoodieCleanStat.Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
@@ -125,6 +133,7 @@ public class HoodieCleanStat implements Serializable {
|
||||
private List<String> deleteBootstrapBasePathPatterns;
|
||||
private List<String> successDeleteBootstrapBaseFiles;
|
||||
private List<String> failedDeleteBootstrapBaseFiles;
|
||||
private boolean isPartitionDeleted;
|
||||
|
||||
public Builder withPolicy(HoodieCleaningPolicy policy) {
|
||||
this.policy = policy;
|
||||
@@ -172,10 +181,15 @@ public class HoodieCleanStat implements Serializable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder isPartitionDeleted(boolean isPartitionDeleted) {
|
||||
this.isPartitionDeleted = isPartitionDeleted;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieCleanStat build() {
|
||||
return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles,
|
||||
earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles,
|
||||
failedDeleteBootstrapBaseFiles);
|
||||
failedDeleteBootstrapBaseFiles, isPartitionDeleted);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,7 +204,8 @@ public class HoodieCleanStat implements Serializable {
|
||||
+ ", earliestCommitToRetain='" + earliestCommitToRetain
|
||||
+ ", deleteBootstrapBasePathPatterns=" + deleteBootstrapBasePathPatterns
|
||||
+ ", successDeleteBootstrapBaseFiles=" + successDeleteBootstrapBaseFiles
|
||||
+ ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles + '\''
|
||||
+ ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles
|
||||
+ ", isPartitionDeleted=" + isPartitionDeleted + '\''
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.common.table.timeline.versioning.clean;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
@@ -61,6 +62,6 @@ public class CleanPlanV1MigrationHandler extends AbstractMigratorBase<HoodieClea
|
||||
.collect(Collectors.toList()));
|
||||
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
||||
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), filesPerPartition, VERSION,
|
||||
new HashMap<>());
|
||||
new HashMap<>(), new ArrayList<>());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -53,7 +54,7 @@ public class CleanPlanV2MigrationHandler extends AbstractMigratorBase<HoodieClea
|
||||
new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), e.getKey()), v).toString(), false))
|
||||
.collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
||||
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), new HashMap<>(), VERSION,
|
||||
filePathsPerPartition);
|
||||
filePathsPerPartition, new ArrayList<>());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -64,13 +64,13 @@ public class CleanerUtils {
|
||||
for (HoodieCleanStat stat : cleanStats) {
|
||||
HoodieCleanPartitionMetadata metadata =
|
||||
new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(),
|
||||
stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles());
|
||||
stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), stat.isPartitionDeleted());
|
||||
partitionMetadataMap.put(stat.getPartitionPath(), metadata);
|
||||
if ((null != stat.getDeleteBootstrapBasePathPatterns())
|
||||
&& (!stat.getDeleteBootstrapBasePathPatterns().isEmpty())) {
|
||||
HoodieCleanPartitionMetadata bootstrapMetadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(),
|
||||
stat.getPolicy().name(), stat.getDeleteBootstrapBasePathPatterns(), stat.getSuccessDeleteBootstrapBaseFiles(),
|
||||
stat.getFailedDeleteBootstrapBaseFiles());
|
||||
stat.getFailedDeleteBootstrapBaseFiles(), stat.isPartitionDeleted());
|
||||
partitionBootstrapMetadataMap.put(stat.getPartitionPath(), bootstrapMetadata);
|
||||
}
|
||||
totalDeleted += stat.getSuccessDeleteFiles().size();
|
||||
|
||||
@@ -18,6 +18,13 @@
|
||||
|
||||
package org.apache.hudi.metadata;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
|
||||
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
|
||||
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
|
||||
@@ -35,14 +42,6 @@ import org.apache.hudi.common.util.hash.PartitionIndexID;
|
||||
import org.apache.hudi.exception.HoodieMetadataException;
|
||||
import org.apache.hudi.io.storage.HoodieHFileReader;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
@@ -222,8 +221,17 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
|
||||
* @param partitions The list of partitions
|
||||
*/
|
||||
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
|
||||
return createPartitionListRecord(partitions, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and return a {@code HoodieMetadataPayload} to save list of partitions.
|
||||
*
|
||||
* @param partitions The list of partitions
|
||||
*/
|
||||
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) {
|
||||
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
|
||||
partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, false)));
|
||||
partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
|
||||
|
||||
HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath());
|
||||
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
|
||||
|
||||
@@ -18,6 +18,11 @@
|
||||
|
||||
package org.apache.hudi.metadata;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
@@ -53,17 +58,10 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieMetadataException;
|
||||
import org.apache.hudi.io.storage.HoodieFileReader;
|
||||
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
@@ -162,10 +160,10 @@ public class HoodieTableMetadataUtil {
|
||||
String instantTime) {
|
||||
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
|
||||
|
||||
// Add record bearing partitions list
|
||||
ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
|
||||
// Add record bearing added partitions list
|
||||
ArrayList<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
|
||||
|
||||
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
|
||||
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
|
||||
|
||||
// Update files listing records for each individual partition
|
||||
List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
|
||||
@@ -318,6 +316,7 @@ public class HoodieTableMetadataUtil {
|
||||
String instantTime) {
|
||||
List<HoodieRecord> records = new LinkedList<>();
|
||||
int[] fileDeleteCount = {0};
|
||||
List<String> deletedPartitions = new ArrayList<>();
|
||||
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
|
||||
final String partition = getPartition(partitionName);
|
||||
// Files deleted from a partition
|
||||
@@ -327,8 +326,16 @@ public class HoodieTableMetadataUtil {
|
||||
|
||||
records.add(record);
|
||||
fileDeleteCount[0] += deletedFiles.size();
|
||||
boolean isPartitionDeleted = partitionMetadata.getIsPartitionDeleted();
|
||||
if (isPartitionDeleted) {
|
||||
deletedPartitions.add(partitionName);
|
||||
}
|
||||
});
|
||||
|
||||
if (!deletedPartitions.isEmpty()) {
|
||||
// if there are partitions to be deleted, add them to delete list
|
||||
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
|
||||
}
|
||||
LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
|
||||
+ ", #files_deleted=" + fileDeleteCount[0]);
|
||||
return records;
|
||||
|
||||
Reference in New Issue
Block a user