[HUDI-3322][HUDI-3343] Fixing Metadata Table Records Duplication Issues (#4716)
This change addresses issues regarding the Metadata Table ingesting duplicated records, which led to it persisting incorrect file sizes for the files referred to by those records. Multiple issues contributed to this: - [HUDI-3322] Incorrect Rollback Plan generation: the Rollback Plan generated for MOR tables was over-broadly listing all log-files with the latest base-instant as having been affected by the rollback, leading to invalid MT records being ingested that referred to those files. - [HUDI-3343] Metadata Table including Uncommitted Log Files during Bootstrap: since the MT is bootstrapped at the end of the commit operation's execution (after FS activity, but before committing to the timeline), it was incorrectly ingesting some files that were part of the intermediate state of the operation being committed. This change will unblock the stack of PRs based on #4556.
This commit is contained in:
@@ -38,14 +38,6 @@
|
||||
"type": "long",
|
||||
"doc": "Size of this file in bytes"
|
||||
}
|
||||
}], "default":null },
|
||||
{"name": "writtenLogFiles", "type": ["null", {
|
||||
"type": "map",
|
||||
"doc": "Log files written that were expected to be rolledback",
|
||||
"values": {
|
||||
"type": "long",
|
||||
"doc": "Size of this file in bytes"
|
||||
}
|
||||
}], "default":null }
|
||||
]
|
||||
}}},
|
||||
|
||||
@@ -38,16 +38,13 @@ public class HoodieRollbackStat implements Serializable {
|
||||
private final List<String> failedDeleteFiles;
|
||||
// Count of HoodieLogFile to commandBlocks written for a particular rollback
|
||||
private final Map<FileStatus, Long> commandBlocksCount;
|
||||
// all log files with same base instant as instant to be rolledback
|
||||
private final Map<FileStatus, Long> writtenLogFileSizeMap;
|
||||
|
||||
public HoodieRollbackStat(String partitionPath, List<String> successDeleteFiles, List<String> failedDeleteFiles,
|
||||
Map<FileStatus, Long> commandBlocksCount, Map<FileStatus, Long> writtenLogFileSizeMap) {
|
||||
Map<FileStatus, Long> commandBlocksCount) {
|
||||
this.partitionPath = partitionPath;
|
||||
this.successDeleteFiles = successDeleteFiles;
|
||||
this.failedDeleteFiles = failedDeleteFiles;
|
||||
this.commandBlocksCount = commandBlocksCount;
|
||||
this.writtenLogFileSizeMap = writtenLogFileSizeMap;
|
||||
}
|
||||
|
||||
public Map<FileStatus, Long> getCommandBlocksCount() {
|
||||
@@ -66,10 +63,6 @@ public class HoodieRollbackStat implements Serializable {
|
||||
return failedDeleteFiles;
|
||||
}
|
||||
|
||||
public Map<FileStatus, Long> getWrittenLogFileSizeMap() {
|
||||
return writtenLogFileSizeMap;
|
||||
}
|
||||
|
||||
public static HoodieRollbackStat.Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
@@ -82,7 +75,6 @@ public class HoodieRollbackStat implements Serializable {
|
||||
private List<String> successDeleteFiles;
|
||||
private List<String> failedDeleteFiles;
|
||||
private Map<FileStatus, Long> commandBlocksCount;
|
||||
private Map<FileStatus, Long> writtenLogFileSizeMap;
|
||||
private String partitionPath;
|
||||
|
||||
public Builder withDeletedFileResults(Map<FileStatus, Boolean> deletedFiles) {
|
||||
@@ -108,11 +100,6 @@ public class HoodieRollbackStat implements Serializable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withWrittenLogFileSizeMap(Map<FileStatus, Long> writtenLogFileSizeMap) {
|
||||
this.writtenLogFileSizeMap = writtenLogFileSizeMap;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withPartitionPath(String partitionPath) {
|
||||
this.partitionPath = partitionPath;
|
||||
return this;
|
||||
@@ -128,10 +115,7 @@ public class HoodieRollbackStat implements Serializable {
|
||||
if (commandBlocksCount == null) {
|
||||
commandBlocksCount = Collections.EMPTY_MAP;
|
||||
}
|
||||
if (writtenLogFileSizeMap == null) {
|
||||
writtenLogFileSizeMap = Collections.EMPTY_MAP;
|
||||
}
|
||||
return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap);
|
||||
return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -495,24 +495,25 @@ public class FSUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the latest log file written from the list of log files passed in.
|
||||
* Get the latest log file for the passed in file-id in the partition path
|
||||
*/
|
||||
public static Option<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
|
||||
return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator()));
|
||||
public static Option<HoodieLogFile> getLatestLogFile(FileSystem fs, Path partitionPath, String fileId,
|
||||
String logFileExtension, String baseCommitTime) throws IOException {
|
||||
return getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the log files for the passed in FileId in the partition path.
|
||||
* Get all the log files for the passed in file-id in the partition path.
|
||||
*/
|
||||
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId,
|
||||
final String logFileExtension, final String baseCommitTime) throws IOException {
|
||||
try {
|
||||
return Arrays
|
||||
.stream(fs.listStatus(partitionPath,
|
||||
path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
|
||||
.map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
|
||||
PathFilter pathFilter = path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension);
|
||||
return Arrays.stream(fs.listStatus(partitionPath, pathFilter))
|
||||
.map(HoodieLogFile::new)
|
||||
.filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
|
||||
} catch (FileNotFoundException e) {
|
||||
return Stream.<HoodieLogFile>builder().build();
|
||||
return Stream.of();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -787,4 +788,8 @@ public class FSUtils {
|
||||
|
||||
public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
|
||||
}
|
||||
|
||||
private static Option<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
|
||||
return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,10 +18,6 @@
|
||||
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
|
||||
@@ -29,6 +25,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
|
||||
@@ -77,10 +77,8 @@ public class TimelineMetadataUtils {
|
||||
for (HoodieRollbackStat stat : rollbackStats) {
|
||||
Map<String, Long> rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream()
|
||||
.collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
|
||||
Map<String, Long> probableLogFiles = stat.getWrittenLogFileSizeMap().keySet().stream()
|
||||
.collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen));
|
||||
HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(),
|
||||
stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles, probableLogFiles);
|
||||
stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles);
|
||||
partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
|
||||
totalDeleted += stat.getSuccessDeleteFiles().size();
|
||||
}
|
||||
|
||||
@@ -194,7 +194,8 @@ public class HoodieTableMetadataUtil {
|
||||
* @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition.
|
||||
* @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes.
|
||||
*/
|
||||
private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata,
|
||||
private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline,
|
||||
HoodieRollbackMetadata rollbackMetadata,
|
||||
Map<String, List<String>> partitionToDeletedFiles,
|
||||
Map<String, Map<String, Long>> partitionToAppendedFiles,
|
||||
Option<String> lastSyncTs) {
|
||||
@@ -264,17 +265,6 @@ public class HoodieTableMetadataUtil {
|
||||
partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn);
|
||||
});
|
||||
}
|
||||
|
||||
if (pm.getWrittenLogFiles() != null && !pm.getWrittenLogFiles().isEmpty()) {
|
||||
if (!partitionToAppendedFiles.containsKey(partition)) {
|
||||
partitionToAppendedFiles.put(partition, new HashMap<>());
|
||||
}
|
||||
|
||||
// Extract appended file name from the absolute paths saved in getWrittenLogFiles()
|
||||
pm.getWrittenLogFiles().forEach((path, size) -> {
|
||||
partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user