[HUDI-1469] Faster initialization of metadata table using parallelized listing. (#2343)
* [HUDI-1469] Faster initialization of metadata table using parallelized listing which finds partitions and files in a single scan. * MINOR fixes Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
committed by
vinoth chandar
parent
4e64226844
commit
2bd4a68731
@@ -82,6 +82,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
public static final String BULKINSERT_INPUT_DATA_SCHEMA_DDL = "hoodie.bulkinsert.schema.ddl";
|
||||
public static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
|
||||
public static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
|
||||
public static final String FILE_LISTING_PARALLELISM = "hoodie.file.listing.parallelism";
|
||||
public static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
|
||||
public static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
|
||||
public static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
|
||||
@@ -256,6 +257,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
|
||||
}
|
||||
|
||||
public int getFileListingParallelism() {
|
||||
return Math.max(Integer.parseInt(props.getProperty(FILE_LISTING_PARALLELISM)), 1);
|
||||
}
|
||||
|
||||
public boolean shouldRollbackUsingMarkers() {
|
||||
return Boolean.parseBoolean(props.getProperty(ROLLBACK_USING_MARKERS));
|
||||
}
|
||||
@@ -1002,6 +1007,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withFileListingParallelism(int parallelism) {
|
||||
props.setProperty(FILE_LISTING_PARALLELISM, String.valueOf(parallelism));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withUserDefinedBulkInsertPartitionerClass(String className) {
|
||||
props.setProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, className);
|
||||
return this;
|
||||
@@ -1188,6 +1198,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
DEFAULT_PARALLELISM);
|
||||
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
|
||||
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
|
||||
setDefaultOnCondition(props, !props.containsKey(FILE_LISTING_PARALLELISM), FILE_LISTING_PARALLELISM,
|
||||
DEFAULT_PARALLELISM);
|
||||
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
|
||||
DEFAULT_ROLLBACK_PARALLELISM);
|
||||
setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP),
|
||||
|
||||
@@ -274,44 +274,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
initTableMetadata();
|
||||
|
||||
// List all partitions in the basePath of the containing dataset
|
||||
FileSystem fs = datasetMetaClient.getFs();
|
||||
FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetWriteConfig.getBasePath(),
|
||||
datasetWriteConfig.shouldAssumeDatePartitioning());
|
||||
List<String> partitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
|
||||
LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
|
||||
|
||||
// List all partitions in parallel and collect the files in them
|
||||
int parallelism = Math.max(partitions.size(), 1);
|
||||
List<Pair<String, FileStatus[]>> partitionFileList = engineContext.map(partitions, partition -> {
|
||||
FileStatus[] statuses = fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(datasetWriteConfig.getBasePath(), partition));
|
||||
return Pair.of(partition, statuses);
|
||||
}, parallelism);
|
||||
LOG.info("Initializing metadata table by using file listings in " + datasetWriteConfig.getBasePath());
|
||||
Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(datasetMetaClient);
|
||||
|
||||
// Create a HoodieCommitMetadata with writeStats for all discovered files
|
||||
int[] stats = {0};
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
|
||||
partitionFileList.forEach(t -> {
|
||||
final String partition = t.getKey();
|
||||
try {
|
||||
if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
|
||||
return;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieMetadataException("Failed to check partition " + partition, e);
|
||||
}
|
||||
|
||||
partitionToFileStatus.forEach((partition, statuses) -> {
|
||||
// Filter the statuses to only include files which were created before or on createInstantTime
|
||||
Arrays.stream(t.getValue()).filter(status -> {
|
||||
statuses.stream().filter(status -> {
|
||||
String filename = status.getPath().getName();
|
||||
if (filename.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
|
||||
return false;
|
||||
}
|
||||
if (HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
|
||||
createInstantTime)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return !HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
|
||||
createInstantTime);
|
||||
}).forEach(status -> {
|
||||
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||
writeStat.setPath(partition + Path.SEPARATOR + status.getPath().getName());
|
||||
@@ -329,10 +304,56 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
}
|
||||
});
|
||||
|
||||
LOG.info("Committing " + partitionFileList.size() + " partitions and " + stats[0] + " files to metadata");
|
||||
LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
|
||||
update(commitMetadata, createInstantTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to find hoodie partitions and list files in them in parallel.
|
||||
*
|
||||
* @param datasetMetaClient
|
||||
* @return Map of partition names to a list of FileStatus for all the files in the partition
|
||||
*/
|
||||
private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient datasetMetaClient) {
|
||||
List<Path> pathsToList = new LinkedList<>();
|
||||
pathsToList.add(new Path(datasetWriteConfig.getBasePath()));
|
||||
|
||||
Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
|
||||
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
|
||||
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
|
||||
|
||||
while (!pathsToList.isEmpty()) {
|
||||
int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
|
||||
// List all directories in parallel
|
||||
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
|
||||
FileSystem fs = path.getFileSystem(conf.get());
|
||||
return Pair.of(path, fs.listStatus(path));
|
||||
}, listingParallelism);
|
||||
pathsToList.clear();
|
||||
|
||||
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
|
||||
// the results.
|
||||
dirToFileListing.forEach(p -> {
|
||||
List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
|
||||
.filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (p.getRight().length > filesInDir.size()) {
|
||||
// Is a partition. Add all data files to result.
|
||||
partitionToFileStatus.put(p.getLeft().getName(), filesInDir);
|
||||
} else {
|
||||
// Add sub-dirs to the queue
|
||||
pathsToList.addAll(Arrays.stream(p.getRight())
|
||||
.filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
|
||||
.map(fs -> fs.getPath())
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return partitionToFileStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync the Metadata Table from the instants created on the dataset.
|
||||
*
|
||||
@@ -413,7 +434,9 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
writeStats.forEach(hoodieWriteStat -> {
|
||||
String pathWithPartition = hoodieWriteStat.getPath();
|
||||
if (pathWithPartition == null) {
|
||||
throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
|
||||
// Empty partition
|
||||
LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
|
||||
return;
|
||||
}
|
||||
|
||||
int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
|
||||
|
||||
Reference in New Issue
Block a user