1
0

[HUDI-4221] Optimzing getAllPartitionPaths (#6234)

- Levering spark par for dir processing
This commit is contained in:
Sivabalan Narayanan
2022-07-29 03:49:56 -04:00
committed by GitHub
parent ce4330d62b
commit 765dd2eae6

View File

@@ -29,11 +29,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.exception.HoodieMetadataException;
import java.io.IOException;
import java.util.Arrays;
@@ -83,31 +83,39 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
// List all directories in parallel
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
return Pair.of(path, fileSystem.listStatus(path));
return Arrays.stream(fileSystem.listStatus(path));
}, listingParallelism);
pathsToList.clear();
// if current dictionary contains PartitionMetadata, add it to result
// if current dictionary does not contain PartitionMetadata, add it to queue
dirToFileListing.forEach(p -> {
Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
.filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
.findFirst());
// if current dictionary does not contain PartitionMetadata, add it to queue to be processed.
int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
if (!dirToFileListing.isEmpty()) {
// result below holds a list of pair. first entry in the pair optionally holds the deduced list of partitions.
// and second entry holds optionally a directory path to be processed further.
List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
if (fileStatus.isDirectory()) {
if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
} else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
}
} else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
return Pair.of(Option.of(partitionName), Option.empty());
}
return Pair.of(Option.empty(), Option.empty());
}, fileListingParallelism);
if (partitionMetaFile.isPresent()) {
// Is a partition.
String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
partitionPaths.add(partitionName);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
.filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
.map(fileStatus -> fileStatus.getPath())
.collect(Collectors.toList()));
}
});
partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
.collect(Collectors.toList()));
pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
.collect(Collectors.toList()));
}
}
return partitionPaths;
}