
[HUDI-2634] Improved the metadata table bootstrap for very large tables. (#3873)

* [HUDI-2634] Improved the metadata table bootstrap for very large tables.

The following improvements are implemented:
1. Memory overhead reduction:
  - The existing code caches a FileStatus for each file in memory.
  - Created a new class DirectoryInfo which caches a directory's file list with only the parts of the FileStatus that are needed (the filename and file length). This reduces the memory requirements (see the first sketch after this list).

2. Improved parallelism:
  - The existing code collects all the listings to the Driver and then creates the HoodieRecords on the Driver.
  - This takes a long time for large tables (11 million HoodieRecords to be created).
  - Created a new function in SparkRDDWriteClient specifically for the bootstrap commit. In it, the HoodieRecord creation is parallelized across executors so it completes quickly (see the second sketch after this list).

3. Fixed the setting that limits the number of parallel listings:
  - The existing code had a bug wherein 1500 executors were hardcoded to perform the listing. This leads to an exception due to the limit on Spark's result memory.
  - Corrected the code to use the config instead.
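A minimal sketch of the memory-reduction idea from item 1. The actual DirectoryInfo class is in the diff below; the class and field names here are illustrative only. The point is to keep just the file name and length per directory instead of holding full FileStatus objects for every file:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LightweightDirListing implements Serializable {
  private final String relativePath;                       // partition path relative to the table base path
  private final Map<String, Long> fileNameToSize = new HashMap<>();
  private final List<Path> subDirectories = new ArrayList<>();

  LightweightDirListing(String relativePath, FileStatus[] statuses) {
    this.relativePath = relativePath;
    for (FileStatus status : statuses) {
      if (status.isDirectory()) {
        subDirectories.add(status.getPath());              // queued for the next listing round
      } else {
        // keep only the two fields needed for bootstrap, not the whole FileStatus
        fileNameToSize.put(status.getPath().getName(), status.getLen());
      }
    }
  }

  String getRelativePath() { return relativePath; }
  Map<String, Long> getFileNameToSize() { return fileNameToSize; }
  List<Path> getSubDirectories() { return subDirectories; }
}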

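And a minimal, self-contained sketch of the parallelism change from item 2, written against the plain Spark API with illustrative names (it is not the actual Hudi writer code): the driver ships only one lightweight entry per partition, and the per-partition records are built inside map() on the executors, mirroring the engineContext.parallelize(...).map(...) pattern in bootstrapCommit below.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class BootstrapParallelismSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("bootstrap-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Stand-in for the per-partition listings produced by the parallel listing phase
      List<String> partitionPaths = Arrays.asList("2021/11/01", "2021/11/02", "2021/11/03");

      // One Spark task per partition, so records are no longer built one-by-one on the driver
      JavaRDD<String> fileListRecords = jsc.parallelize(partitionPaths, partitionPaths.size())
          .map(partition -> "files-record-for:" + partition); // stand-in for building the metadata payload record

      System.out.println(fileListRecords.count() + " records created in parallel");
    }
  }
}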
Result:
The dataset has 1299 partitions and 12 million files.
  - File listing time: 1.5 minutes
  - HoodieRecord creation time: 13 seconds
  - Deltacommit duration: 2.6 minutes

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
Author: Prashant Wason
Date: 2021-11-10 19:37:48 -08:00
Committed by: GitHub
Commit: 77b0440eb4 (parent: 90f9b4562a)
8 changed files with 188 additions and 76 deletions

File: HoodieBackedTableMetadataWriter.java

@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -39,7 +40,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -51,7 +51,6 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -68,6 +67,8 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -175,7 +176,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
.withAutoCommit(true)
.withAvroSchemaValidate(true)
.withEmbeddedTimelineServerEnabled(false)
@@ -400,92 +401,68 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// List all partitions in the basePath of the containing dataset
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
Map<String, List<FileStatus>> partitionToFileStatus = getPartitionsToFilesMapping(dataMetaClient);
List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
// Create a HoodieCommitMetadata with writeStats for all discovered files
int[] stats = {0};
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
partitionToFileStatus.forEach((partition, statuses) -> {
// Filter the statuses to only include files which were created before or on createInstantTime
statuses.stream().filter(status -> {
String filename = status.getPath().getName();
return !HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(filename), HoodieTimeline.GREATER_THAN,
createInstantTime);
}).forEach(status -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPath((partition.isEmpty() ? "" : partition + Path.SEPARATOR) + status.getPath().getName());
writeStat.setPartitionPath(partition);
writeStat.setTotalWriteBytes(status.getLen());
commitMetadata.addWriteStat(partition, writeStat);
stats[0] += 1;
});
// If the partition has no files then create a writeStat with no file path
if (commitMetadata.getWriteStats(partition) == null) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partition);
commitMetadata.addWriteStat(partition, writeStat);
}
});
LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime, false);
// During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these
// large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
// Hence, we have a special commit just for the bootstrap scenario.
bootstrapCommit(dirInfoList, createInstantTime);
return true;
}
/**
* Function to find hoodie partitions and list files in them in parallel.
*
* @param dataMetaClient
* @param datasetMetaClient data set meta client instance.
* @return Map of partition names to a list of FileStatus for all the files in the partition
*/
private Map<String, List<FileStatus>> getPartitionsToFilesMapping(HoodieTableMetaClient dataMetaClient) {
private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));
Map<String, List<FileStatus>> partitionToFileStatus = new HashMap<>();
List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new SerializableConfiguration(dataMetaClient.getHadoopConf());
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
final String datasetBasePath = datasetMetaClient.getBasePath();
while (!pathsToList.isEmpty()) {
int listingParallelism = Math.min(fileListingParallelism, pathsToList.size());
// In each round we will list a section of directories
int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.getFileSystem(conf.get());
return Pair.of(path, fs.listStatus(path));
}, listingParallelism);
pathsToList.clear();
String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
}, numDirsToList);
pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
// If the listing reveals a directory, add it to queue. If the listing reveals a hoodie partition, add it to
// the results.
dirToFileListing.forEach(p -> {
if (!dirFilterRegex.isEmpty() && p.getLeft().getName().matches(dirFilterRegex)) {
LOG.info("Ignoring directory " + p.getLeft() + " which matches the filter regex " + dirFilterRegex);
return;
for (DirectoryInfo dirInfo : processedDirectories) {
if (!dirFilterRegex.isEmpty()) {
final String relativePath = dirInfo.getRelativePath();
if (!relativePath.isEmpty()) {
Path partitionPath = new Path(datasetBasePath, relativePath);
if (partitionPath.getName().matches(dirFilterRegex)) {
LOG.info("Ignoring directory " + partitionPath + " which matches the filter regex " + dirFilterRegex);
continue;
}
}
}
List<FileStatus> filesInDir = Arrays.stream(p.getRight()).parallel()
.filter(fs -> !fs.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))
.collect(Collectors.toList());
if (p.getRight().length > filesInDir.size()) {
String partitionName = FSUtils.getRelativePartitionPath(new Path(dataMetaClient.getBasePath()), p.getLeft());
// deal with Non-partition table, we should exclude .hoodie
partitionToFileStatus.put(partitionName, filesInDir.stream()
.filter(f -> !f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
if (dirInfo.isHoodiePartition()) {
// Add to result
partitionsToBootstrap.add(dirInfo);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(Arrays.stream(p.getRight())
.filter(fs -> fs.isDirectory() && !fs.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
.map(fs -> fs.getPath())
.collect(Collectors.toList()));
pathsToList.addAll(dirInfo.getSubDirectories());
}
});
}
}
return partitionToFileStatus;
return partitionsToBootstrap;
}
/**
@@ -549,7 +526,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
if (enabled && metadata != null) {
List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
}
}
@@ -611,7 +588,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime,
metadata.getSyncedInstantTime(), wasSynced);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false);
commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, false);
}
}
@@ -624,12 +601,12 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
* @param records The list of records to be written.
* @param records The HoodieData of records to be written.
* @param partitionName The partition to which the records are to be written.
* @param instantTime The timestamp to use for the deltacommit.
* @param canTriggerTableService true if table services can be scheduled and executed. false otherwise.
*/
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
protected abstract void commit(HoodieData<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);
/**
* Perform a compaction on the Metadata Table.
@@ -668,4 +645,96 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// metadata table.
writeClient.clean(instantTime + "002");
}
/**
* This is invoked to bootstrap the metadata table for a dataset. The bootstrap commit has a special handling mechanism due to its scale compared to
* other regular commits.
*
*/
protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {
List<String> partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList());
final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();
// Record which saves the list of all partitions
HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
if (partitions.isEmpty()) {
// in case of bootstrapping a fresh table, there won't be any partitions, but we need to make a bootstrap commit
commit(engineContext.parallelize(Collections.singletonList(allPartitionRecord), 1), MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
return;
}
HoodieData<HoodieRecord> partitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
if (!partitionInfoList.isEmpty()) {
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
// Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord(
partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
});
partitionRecords = partitionRecords.union(fileListRecords);
}
LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
ValidationUtils.checkState(partitionRecords.count() == (partitions.size() + 1));
commit(partitionRecords, MetadataPartitionType.FILES.partitionPath(), createInstantTime, false);
}
/**
* A class which represents a directory and the files and directories inside it.
*
* A {@code DirectoryInfo} object saves the name of the partition and only those properties of each file that are
* required for bootstrapping the metadata table. Saving limited properties reduces the total memory footprint when
* a very large number of files are present in the dataset being bootstrapped.
*/
static class DirectoryInfo implements Serializable {
// Relative path of the directory (relative to the base directory)
private final String relativePath;
// Map of filenames within this partition to their respective sizes
private HashMap<String, Long> filenameToSizeMap;
// List of directories within this partition
private final List<Path> subDirectories = new ArrayList<>();
// Is this a hoodie partition
private boolean isHoodiePartition = false;
public DirectoryInfo(String relativePath, FileStatus[] fileStatus) {
this.relativePath = relativePath;
// Pre-allocate with the maximum length possible
filenameToSizeMap = new HashMap<>(fileStatus.length);
for (FileStatus status : fileStatus) {
if (status.isDirectory()) {
// Ignore .hoodie directory as there cannot be any partitions inside it
if (!status.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
this.subDirectories.add(status.getPath());
}
} else if (status.getPath().getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
// Presence of partition meta file implies this is a HUDI partition
this.isHoodiePartition = true;
} else if (FSUtils.isDataFile(status.getPath())) {
// Regular HUDI data file (base file or log file)
filenameToSizeMap.put(status.getPath().getName(), status.getLen());
}
}
}
String getRelativePath() {
return relativePath;
}
int getTotalFiles() {
return filenameToSizeMap.size();
}
boolean isHoodiePartition() {
return isHoodiePartition;
}
List<Path> getSubDirectories() {
return subDirectories;
}
// Returns a map of filenames mapped to their lengths
Map<String, Long> getFileNameToSizeMap() {
return filenameToSizeMap;
}
}
}

File: FlinkHoodieBackedTableMetadataWriter.java

@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -91,8 +92,9 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
}
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
List<HoodieRecord> records = (List<HoodieRecord>) hoodieDataRecords.get();
List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {

File: HoodieJavaRDD.java

@@ -130,6 +130,11 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
return HoodieJavaRDD.of(rddData.distinct());
}
@Override
public HoodieData<T> union(HoodieData<T> other) {
return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
}
@Override
public List<T> collectAsList() {
return rddData.collect();

File: SparkHoodieBackedTableMetadataWriter.java

@@ -22,6 +22,7 @@ import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
@@ -39,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;
@@ -121,9 +121,9 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
}
}
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) hoodieDataRecords.get();
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
@@ -166,12 +166,11 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
*
* The record is tagged with respective file slice's location based on its record key.
*/
private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
return jsc.parallelize(records, 1).map(r -> {
return recordsRDD.map(r -> {
FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
return r;

File: HoodieData.java

@@ -97,6 +97,13 @@ public abstract class HoodieData<T> implements Serializable {
*/
public abstract HoodieData<T> distinct();
/**
* Unions this {@link HoodieData} with other {@link HoodieData}.
* @param other {@link HoodieData} of interest.
* @return the union of the two as an instance of {@link HoodieData}.
*/
public abstract HoodieData<T> union(HoodieData<T> other);
/**
* @return collected results in {@link List<T>}.
*/

File: HoodieList.java

@@ -132,6 +132,14 @@ public class HoodieList<T> extends HoodieData<T> {
return HoodieList.of(new ArrayList<>(new HashSet<>(listData)));
}
@Override
public HoodieData<T> union(HoodieData<T> other) {
List<T> unionResult = new ArrayList<>();
unionResult.addAll(listData);
unionResult.addAll(other.collectAsList());
return HoodieList.of(unionResult);
}
@Override
public List<T> collectAsList() {
return listData;

File: FSUtils.java

@@ -239,7 +239,7 @@ public class FSUtils {
/**
* Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its subdirs
* are skipped
*
*
* @param fs File System
* @param basePathStr Base-Path
* @param consumer Callback for processing
@@ -431,17 +431,29 @@ public class FSUtils {
public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version,
String writeToken) {
String suffix =
(writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
: String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
String suffix = (writeToken == null)
? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
: String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
return LOG_FILE_PREFIX + suffix;
}
public static boolean isBaseFile(Path path) {
String extension = getFileExtension(path.getName());
return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(extension);
}
public static boolean isLogFile(Path logPath) {
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
return matcher.find() && logPath.getName().contains(".log");
}
/**
* Returns true if the given path is a Base file or a Log file.
*/
public static boolean isDataFile(Path path) {
return isBaseFile(path) || isLogFile(path);
}
/**
* Get the names of all the base and log files in the given partition path.
*/

File: HoodieFileFormat.java

@@ -18,6 +18,11 @@
package org.apache.hudi.common.model;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Hoodie file format.
*/
@@ -27,6 +32,11 @@ public enum HoodieFileFormat {
HFILE(".hfile"),
ORC(".orc");
public static final Set<String> BASE_FILE_EXTENSIONS = Arrays.stream(HoodieFileFormat.values())
.map(HoodieFileFormat::getFileExtension)
.filter(x -> !x.equals(HoodieFileFormat.HOODIE_LOG.getFileExtension()))
.collect(Collectors.toCollection(HashSet::new));
private final String extension;
HoodieFileFormat(String extension) {