1
0

[HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)

This commit is contained in:
Sivabalan Narayanan
2022-06-11 16:17:42 -04:00
committed by GitHub
parent 2b3a85528a
commit 08fe281091

View File

@@ -68,13 +68,14 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
@Override @Override
public List<String> getAllPartitionPaths() throws IOException { public List<String> getAllPartitionPaths() throws IOException {
FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get()); Path basePath = new Path(datasetBasePath);
FileSystem fs = basePath.getFileSystem(hadoopConf.get());
if (assumeDatePartitioning) { if (assumeDatePartitioning) {
return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath); return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
} }
List<Path> pathsToList = new CopyOnWriteArrayList<>(); List<Path> pathsToList = new CopyOnWriteArrayList<>();
pathsToList.add(new Path(datasetBasePath)); pathsToList.add(basePath);
List<String> partitionPaths = new CopyOnWriteArrayList<>(); List<String> partitionPaths = new CopyOnWriteArrayList<>();
while (!pathsToList.isEmpty()) { while (!pathsToList.isEmpty()) {
@@ -82,25 +83,29 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size()); int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
// List all directories in parallel // List all directories in parallel
List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> { List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get()); FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
return fileSystem.listStatus(path); return Pair.of(path, fileSystem.listStatus(path));
}, listingParallelism); }, listingParallelism);
pathsToList.clear(); pathsToList.clear();
// if current dictionary contains PartitionMetadata, add it to result // if current dictionary contains PartitionMetadata, add it to result
// if current dictionary does not contain PartitionMetadata, add it to queue // if current dictionary does not contain PartitionMetadata, add it to queue
dirToFileListing.stream().flatMap(Arrays::stream).parallel() dirToFileListing.forEach(p -> {
.forEach(fileStatus -> { Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
if (fileStatus.isDirectory()) { .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) { .findFirst());
partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath()));
} else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) { if (partitionMetaFile.isPresent()) {
pathsToList.add(fileStatus.getPath()); // Is a partition.
} String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
} else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
partitionPaths.add(partitionName); partitionPaths.add(partitionName);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
.filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
.map(fileStatus -> fileStatus.getPath())
.collect(Collectors.toList()));
} }
}); });
} }