1
0

[HUDI-3825] Fixing non-partitioned table Partition Records persistence in MT (#5259)

* Filter out empty string (for non-partitioned table) being added to "__all_partitions__" record

* Instead of filtering, transform empty partition-id to `NON_PARTITIONED_NAME`

* Cleaned up `HoodieBackedTableMetadataWriter`

* Make sure REPLACE_COMMITS are handled as well
This commit is contained in:
Alexey Kudinkin
2022-04-08 03:28:31 -07:00
committed by GitHub
parent 67215abaf0
commit d7cc767dbc
4 changed files with 59 additions and 57 deletions

View File

@@ -87,7 +87,6 @@ import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
@@ -1012,28 +1011,21 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions"); engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient); List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
List<String> partitions = new ArrayList<>(); Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
AtomicLong totalFiles = new AtomicLong(0); .map(p -> {
Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream().map(p -> { String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
final String partitionName = HoodieTableMetadataUtil.getPartition(p.getRelativePath());
partitions.add(partitionName);
totalFiles.addAndGet(p.getTotalFiles());
return Pair.of(partitionName, p.getFileNameToSizeMap()); return Pair.of(partitionName, p.getFileNameToSizeMap());
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); })
final Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
if (partitionTypes.contains(MetadataPartitionType.FILES)) { if (partitionTypes.contains(MetadataPartitionType.FILES)) {
// Record which saves the list of all partitions // Record which saves the list of all partitions
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
// in case of initializing of a fresh table, there won't be any partitions, but we need to make a bootstrap commit
final HoodieData<HoodieRecord> allPartitionRecordsRDD = engineContext.parallelize(
Collections.singletonList(allPartitionRecord), 1);
partitionToRecordsMap.put(MetadataPartitionType.FILES, allPartitionRecordsRDD);
commit(createInstantTime, partitionToRecordsMap, false);
return;
}
HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
@@ -1051,13 +1043,17 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
} }
LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata"); LOG.info("Committing " + partitions.size() + " partitions and " + partitionToFilesMap.values().size() + " files to metadata");
commit(createInstantTime, partitionToRecordsMap, false); commit(createInstantTime, partitionToRecordsMap, false);
} }
private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) { private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) {
HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1); HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
if (!partitionInfoList.isEmpty()) { if (partitionInfoList.isEmpty()) {
return filesPartitionRecords;
}
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> { HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap(); Map<String, Long> fileNameToSizeMap = partitionInfo.getFileNameToSizeMap();
// filter for files that are part of the completed commits // filter for files that are part of the completed commits
@@ -1068,11 +1064,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// Record which saves files within a partition // Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord( return HoodieMetadataPayload.createPartitionFilesRecord(
HoodieTableMetadataUtil.getPartition(partitionInfo.getRelativePath()), Option.of(validFileNameToSizeMap), Option.empty()); HoodieTableMetadataUtil.getPartitionIdentifier(partitionInfo.getRelativePath()), Option.of(validFileNameToSizeMap), Option.empty());
}); });
filesPartitionRecords = filesPartitionRecords.union(fileListRecords);
} return filesPartitionRecords.union(fileListRecords);
return filesPartitionRecords;
} }
/** /**

View File

@@ -1454,7 +1454,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.forEach(partitionWriteStat -> { .forEach(partitionWriteStat -> {
String partitionStatName = partitionWriteStat.getKey(); String partitionStatName = partitionWriteStat.getKey();
List<HoodieWriteStat> writeStats = partitionWriteStat.getValue(); List<HoodieWriteStat> writeStats = partitionWriteStat.getValue();
String partition = HoodieTableMetadataUtil.getPartition(partitionStatName); String partition = HoodieTableMetadataUtil.getPartitionIdentifier(partitionStatName);
if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) { if (!commitToPartitionsToFiles.get(commitTime).containsKey(partition)) {
commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>()); commitToPartitionsToFiles.get(commitTime).put(partition, new ArrayList<>());
} }

View File

@@ -80,7 +80,7 @@ import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument; import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST; import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartition; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
/** /**
@@ -256,7 +256,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
*/ */
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) { public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(); Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, isDeleted))); partitions.forEach(partition -> fileInfo.put(getPartitionIdentifier(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST, HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,

View File

@@ -297,7 +297,7 @@ public class HoodieTableMetadataUtil {
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size()); List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
// Add record bearing added partitions list // Add record bearing added partitions list
List<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); List<String> partitionsAdded = getPartitionsAdded(commitMetadata);
// Add record bearing deleted partitions list // Add record bearing deleted partitions list
List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata); List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);
@@ -312,7 +312,7 @@ public class HoodieTableMetadataUtil {
String partitionStatName = entry.getKey(); String partitionStatName = entry.getKey();
List<HoodieWriteStat> writeStats = entry.getValue(); List<HoodieWriteStat> writeStats = entry.getValue();
String partition = getPartition(partitionStatName); String partition = getPartitionIdentifier(partitionStatName);
HashMap<String, Long> updatedFilesToSizesMapping = HashMap<String, Long> updatedFilesToSizesMapping =
writeStats.stream().reduce(new HashMap<>(writeStats.size()), writeStats.stream().reduce(new HashMap<>(writeStats.size()),
@@ -352,16 +352,26 @@ public class HoodieTableMetadataUtil {
return records; return records;
} }
private static ArrayList<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) { private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetadata) {
return commitMetadata.getPartitionToWriteStats().keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
.map(HoodieTableMetadataUtil::getPartitionIdentifier)
.collect(Collectors.toList());
}
private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
if (commitMetadata instanceof HoodieReplaceCommitMetadata if (commitMetadata instanceof HoodieReplaceCommitMetadata
&& WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) { && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
Map<String, List<String>> partitionToReplaceFileIds = Map<String, List<String>> partitionToReplaceFileIds =
((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds(); ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
if (!partitionToReplaceFileIds.isEmpty()) {
return new ArrayList<>(partitionToReplaceFileIds.keySet()); return partitionToReplaceFileIds.keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
.map(HoodieTableMetadataUtil::getPartitionIdentifier)
.collect(Collectors.toList());
} }
}
return new ArrayList<>(); return Collections.emptyList();
} }
/** /**
@@ -469,7 +479,7 @@ public class HoodieTableMetadataUtil {
int[] fileDeleteCount = {0}; int[] fileDeleteCount = {0};
List<String> deletedPartitions = new ArrayList<>(); List<String> deletedPartitions = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> { cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
final String partition = getPartition(partitionName); final String partition = getPartitionIdentifier(partitionName);
// Files deleted from a partition // Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns(); List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
@@ -776,7 +786,7 @@ public class HoodieTableMetadataUtil {
partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> { partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
fileChangeCount[0] += deletedFiles.size(); fileChangeCount[0] += deletedFiles.size();
final String partition = getPartition(partitionName); final String partition = getPartitionIdentifier(partitionName);
Option<Map<String, Long>> filesAdded = Option.empty(); Option<Map<String, Long>> filesAdded = Option.empty();
if (partitionToAppendedFiles.containsKey(partitionName)) { if (partitionToAppendedFiles.containsKey(partitionName)) {
@@ -789,7 +799,7 @@ public class HoodieTableMetadataUtil {
}); });
partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> { partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
final String partition = getPartition(partitionName); final String partition = getPartitionIdentifier(partitionName);
fileChangeCount[1] += appendedFileMap.size(); fileChangeCount[1] += appendedFileMap.size();
// Validate that no appended file has been deleted // Validate that no appended file has been deleted
@@ -811,12 +821,9 @@ public class HoodieTableMetadataUtil {
/** /**
* Returns partition name for the given path. * Returns partition name for the given path.
*
* @param path
* @return
*/ */
public static String getPartition(@Nonnull String path) { public static String getPartitionIdentifier(@Nonnull String relativePartitionPath) {
return EMPTY_PARTITION_NAME.equals(path) ? NON_PARTITIONED_NAME : path; return EMPTY_PARTITION_NAME.equals(relativePartitionPath) ? NON_PARTITIONED_NAME : relativePartitionPath;
} }
/** /**
@@ -842,7 +849,7 @@ public class HoodieTableMetadataUtil {
return Stream.empty(); return Stream.empty();
} }
final String partition = getPartition(partitionName); final String partition = getPartitionIdentifier(partitionName);
return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord( return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true)); partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
}).iterator(); }).iterator();
@@ -857,7 +864,7 @@ public class HoodieTableMetadataUtil {
HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> { HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
final String partitionName = partitionToAppendedFilesPair.getLeft(); final String partitionName = partitionToAppendedFilesPair.getLeft();
final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight(); final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
final String partition = getPartition(partitionName); final String partition = getPartitionIdentifier(partitionName);
return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> { return appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
final String appendedFile = appendedFileLengthPairEntry.getKey(); final String appendedFile = appendedFileLengthPairEntry.getKey();
if (!FSUtils.isBaseFile(new Path(appendedFile))) { if (!FSUtils.isBaseFile(new Path(appendedFile))) {
@@ -912,7 +919,7 @@ public class HoodieTableMetadataUtil {
HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> { HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
final String partitionName = partitionToDeletedFilesPair.getLeft(); final String partitionName = partitionToDeletedFilesPair.getLeft();
final String partition = getPartition(partitionName); final String partition = getPartitionIdentifier(partitionName);
final List<String> deletedFileList = partitionToDeletedFilesPair.getRight(); final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
return deletedFileList.stream().flatMap(deletedFile -> { return deletedFileList.stream().flatMap(deletedFile -> {
@@ -929,7 +936,7 @@ public class HoodieTableMetadataUtil {
HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> { HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
final String partitionName = partitionToAppendedFilesPair.getLeft(); final String partitionName = partitionToAppendedFilesPair.getLeft();
final String partition = getPartition(partitionName); final String partition = getPartitionIdentifier(partitionName);
final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight(); final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> { return appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthEntry -> {
@@ -1133,7 +1140,7 @@ public class HoodieTableMetadataUtil {
HoodieTableMetaClient datasetMetaClient, HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex, List<String> columnsToIndex,
boolean isDeleted) { boolean isDeleted) {
String partitionName = getPartition(partitionPath); String partitionName = getPartitionIdentifier(partitionPath);
// NOTE: We have to chop leading "/" to make sure Hadoop does not treat it like // NOTE: We have to chop leading "/" to make sure Hadoop does not treat it like
// absolute path // absolute path
String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath; String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;