[HUDI-3281][Performance]Tuning performance of getAllPartitionPaths API in FileSystemBackedTableMetadata (#4643)
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -31,12 +31,11 @@ import org.apache.hadoop.fs.FileSystem;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
|
public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
|
||||||
@@ -64,45 +63,41 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<String> getAllPartitionPaths() throws IOException {
|
public List<String> getAllPartitionPaths() throws IOException {
|
||||||
|
FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
|
||||||
if (assumeDatePartitioning) {
|
if (assumeDatePartitioning) {
|
||||||
FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
|
|
||||||
return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
|
return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Path> pathsToList = new LinkedList<>();
|
List<Path> pathsToList = new CopyOnWriteArrayList<>();
|
||||||
pathsToList.add(new Path(datasetBasePath));
|
pathsToList.add(new Path(datasetBasePath));
|
||||||
List<String> partitionPaths = new ArrayList<>();
|
List<String> partitionPaths = new CopyOnWriteArrayList<>();
|
||||||
|
|
||||||
while (!pathsToList.isEmpty()) {
|
while (!pathsToList.isEmpty()) {
|
||||||
// TODO: Get the parallelism from HoodieWriteConfig
|
// TODO: Get the parallelism from HoodieWriteConfig
|
||||||
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
|
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
|
||||||
|
|
||||||
// List all directories in parallel
|
// List all directories in parallel
|
||||||
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
|
List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> {
|
||||||
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
|
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
|
||||||
return Pair.of(path, fileSystem.listStatus(path));
|
return fileSystem.listStatus(path);
|
||||||
}, listingParallelism);
|
}, listingParallelism);
|
||||||
pathsToList.clear();
|
pathsToList.clear();
|
||||||
|
|
||||||
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
|
// if current dictionary contains PartitionMetadata, add it to result
|
||||||
// the results.
|
// if current dictionary does not contain PartitionMetadata, add it to queue
|
||||||
dirToFileListing.forEach(p -> {
|
dirToFileListing.stream().flatMap(Arrays::stream).parallel()
|
||||||
Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
|
.forEach(fileStatus -> {
|
||||||
.filter(fs -> fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
|
if (fileStatus.isDirectory()) {
|
||||||
.findFirst());
|
if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) {
|
||||||
|
partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath()));
|
||||||
if (partitionMetaFile.isPresent()) {
|
} else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
|
||||||
// Is a partition.
|
pathsToList.add(fileStatus.getPath());
|
||||||
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), p.getLeft());
|
}
|
||||||
partitionPaths.add(partitionName);
|
} else if (fileStatus.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
|
||||||
} else {
|
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
|
||||||
// Add sub-dirs to the queue
|
partitionPaths.add(partitionName);
|
||||||
pathsToList.addAll(Arrays.stream(p.getRight())
|
}
|
||||||
.filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
|
});
|
||||||
.map(fs -> fs.getPath())
|
|
||||||
.collect(Collectors.toList()));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
return partitionPaths;
|
return partitionPaths;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user