1
0

[HUDI-2444] Fixing delete files corner cases wrt cleaning and rollback when applying changes to metadata (#3678)

This commit is contained in:
Sivabalan Narayanan
2021-09-20 11:05:31 -04:00
committed by GitHub
parent 3354fac42f
commit 5091ab7311
2 changed files with 15 additions and 3 deletions

View File

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@@ -126,6 +127,8 @@ public class BaseRollbackHelper implements Serializable {
String latestBaseInstant = rollbackRequest.getLatestBaseInstant(); String latestBaseInstant = rollbackRequest.getLatestBaseInstant();
FileSystem fs = metaClient.getFs(); FileSystem fs = metaClient.getFs();
// collect all log files that are supposed to be deleted with this rollback // collect all log files that are supposed to be deleted with this rollback
// NOTE(review): what happens if a log file has already been deleted by the time fs.getFileStatus(...) is invoked below?
// Log files are not expected to be deleted, but we should confirm whether this case needs to be handled.
Map<FileStatus, Long> writtenLogFileSizeMap = new HashMap<>(); Map<FileStatus, Long> writtenLogFileSizeMap = new HashMap<>();
for (Map.Entry<String, Long> entry : logFilesToBeDeleted.entrySet()) { for (Map.Entry<String, Long> entry : logFilesToBeDeleted.entrySet()) {
writtenLogFileSizeMap.put(fs.getFileStatus(new Path(entry.getKey())), entry.getValue()); writtenLogFileSizeMap.put(fs.getFileStatus(new Path(entry.getKey())), entry.getValue());
@@ -188,7 +191,12 @@ public class BaseRollbackHelper implements Serializable {
String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent()); String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
boolean isDeleted = true; boolean isDeleted = true;
if (doDelete) { if (doDelete) {
isDeleted = metaClient.getFs().delete(fullDeletePath); try {
isDeleted = metaClient.getFs().delete(fullDeletePath);
} catch (FileNotFoundException e) {
// If the first rollback attempt failed and is being retried, some files may already have been deleted.
isDeleted = true;
}
} }
return HoodieRollbackStat.newBuilder() return HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath) .withPartitionPath(partitionPath)

View File

@@ -195,7 +195,7 @@ public class HoodieTableMetadataUtil {
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
// Files deleted from a partition // Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles(); List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Option.of(new ArrayList<>(deletedFiles))); Option.of(new ArrayList<>(deletedFiles)));
@@ -285,7 +285,7 @@ public class HoodieTableMetadataUtil {
} }
final String partition = pm.getPartitionPath(); final String partition = pm.getPartitionPath();
if (!pm.getSuccessDeleteFiles().isEmpty() && !shouldSkip) { if ((!pm.getSuccessDeleteFiles().isEmpty() || !pm.getFailedDeleteFiles().isEmpty()) && !shouldSkip) {
if (!partitionToDeletedFiles.containsKey(partition)) { if (!partitionToDeletedFiles.containsKey(partition)) {
partitionToDeletedFiles.put(partition, new ArrayList<>()); partitionToDeletedFiles.put(partition, new ArrayList<>());
} }
@@ -293,6 +293,10 @@ public class HoodieTableMetadataUtil {
// Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles() // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles()
List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName()) List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName())
.collect(Collectors.toList()); .collect(Collectors.toList());
if (!pm.getFailedDeleteFiles().isEmpty()) {
deletedFiles.addAll(pm.getFailedDeleteFiles().stream().map(p -> new Path(p).getName())
.collect(Collectors.toList()));
}
partitionToDeletedFiles.get(partition).addAll(deletedFiles); partitionToDeletedFiles.get(partition).addAll(deletedFiles);
} }