diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 9877755b3..bcfd89171 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -29,11 +29,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieMetadataException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.exception.HoodieMetadataException;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -83,31 +83,39 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
 
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return Pair.of(path, fileSystem.listStatus(path));
+        return Arrays.stream(fileSystem.listStatus(path));
       }, listingParallelism);
       pathsToList.clear();
 
       // if current dictionary contains PartitionMetadata, add it to result
-      // if current dictionary does not contain PartitionMetadata, add it to queue
-      dirToFileListing.forEach(p -> {
-        Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
-            .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
-            .findFirst());
+      // if current dictionary does not contain PartitionMetadata, add it to queue to be processed.
+      int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
+      if (!dirToFileListing.isEmpty()) {
+        // result below holds a list of pair. first entry in the pair optionally holds the deduced list of partitions.
+        // and second entry holds optionally a directory path to be processed further.
+        List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
+          FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
+          if (fileStatus.isDirectory()) {
+            if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
+              return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
+            } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+              return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
+            }
+          } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
+            String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
+            return Pair.of(Option.of(partitionName), Option.empty());
+          }
+          return Pair.of(Option.empty(), Option.empty());
+        }, fileListingParallelism);
 
-        if (partitionMetaFile.isPresent()) {
-          // Is a partition.
-          String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
-          partitionPaths.add(partitionName);
-        } else {
-          // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fileStatus -> fileStatus.getPath())
-              .collect(Collectors.toList()));
-        }
-      });
+        partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
+            .collect(Collectors.toList()));
+
+        pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+            .collect(Collectors.toList()));
+      }
     }
     return partitionPaths;
   }