1
0

[HUDI-999] [RFC-12] Parallelize fetching of source data files/partitions (#1924)

This commit is contained in:
Udit Mehrotra
2020-08-06 23:44:57 -07:00
committed by GitHub
parent b51646dcc7
commit ab453f2623
4 changed files with 95 additions and 52 deletions

View File

@@ -33,7 +33,6 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -280,13 +279,8 @@ public class BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
* @throws IOException
*/
private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
List<Pair<String, List<HoodieFileStatus>>> folders =
BootstrapUtils.getAllLeafFoldersWithFiles(bootstrapSourceFileSystem,
config.getBootstrapSourceBasePath(), path -> {
// TODO: Needs to be abstracted out when supporting different formats
// TODO: Remove hoodieFilter
return path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension());
});
List<Pair<String, List<HoodieFileStatus>>> folders = BootstrapUtils.getAllLeafFoldersWithFiles(
table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), jsc);
LOG.info("Fetching Bootstrap Schema !!");
BootstrapSchemaProvider sourceSchemaProvider = new BootstrapSchemaProvider(config);

View File

@@ -18,14 +18,20 @@
package org.apache.hudi.table.action.bootstrap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,39 +45,88 @@ public class BootstrapUtils {
/**
* Returns leaf folders with files under a path.
* @param metaClient Hoodie table metadata client
* @param fs File System
* @param basePathStr Base Path to look for leaf folders
* @param filePathFilter Filters to skip directories/paths
* @param jsc Java spark context
* @return list of partition paths with files under them.
* @throws IOException
*/
public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr,
PathFilter filePathFilter) throws IOException {
public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(HoodieTableMetaClient metaClient,
FileSystem fs, String basePathStr, JavaSparkContext jsc) throws IOException {
final Path basePath = new Path(basePathStr);
final String baseFileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
final Map<String, List<HoodieFileStatus>> partitionToFiles = new HashMap<>();
FSUtils.processFiles(fs, basePathStr, (status) -> {
if (status.isFile() && filePathFilter.accept(status.getPath())) {
String relativePath = FSUtils.getRelativePartitionPath(basePath, status.getPath().getParent());
List<HoodieFileStatus> statusList = partitionToFiles.get(relativePath);
if (null == statusList) {
Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count();
List<String> dirs = levelToPartitions.get(level);
if (null == dirs) {
dirs = new ArrayList<>();
levelToPartitions.put(level, dirs);
}
dirs.add(relativePath);
statusList = new ArrayList<>();
partitionToFiles.put(relativePath, statusList);
}
statusList.add(FileStatusUtils.fromFileStatus(status));
PathFilter filePathFilter = getFilePathFilter(baseFileExtension);
PathFilter metaPathFilter = getExcludeMetaPathFilter();
FileStatus[] topLevelStatuses = fs.listStatus(basePath);
List<String> subDirectories = new ArrayList<>();
List<Pair<HoodieFileStatus, Pair<Integer, String>>> result = new ArrayList<>();
for (FileStatus topLevelStatus: topLevelStatuses) {
if (topLevelStatus.isFile() && filePathFilter.accept(topLevelStatus.getPath())) {
String relativePath = FSUtils.getRelativePartitionPath(basePath, topLevelStatus.getPath().getParent());
Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count();
HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(topLevelStatus);
result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
} else if (metaPathFilter.accept(topLevelStatus.getPath())) {
subDirectories.add(topLevelStatus.getPath().toString());
}
return true;
}, true);
}
if (subDirectories.size() > 0) {
result.addAll(jsc.parallelize(subDirectories, subDirectories.size()).flatMap(directory -> {
PathFilter pathFilter = getFilePathFilter(baseFileExtension);
Path path = new Path(directory);
FileSystem fileSystem = path.getFileSystem(new Configuration());
RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
List<Pair<HoodieFileStatus, Pair<Integer, String>>> res = new ArrayList<>();
while (itr.hasNext()) {
FileStatus status = itr.next();
if (pathFilter.accept(status.getPath())) {
String relativePath = FSUtils.getRelativePartitionPath(new Path(basePathStr), status.getPath().getParent());
Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count();
HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(status);
res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
}
}
return res.iterator();
}).collect());
}
result.forEach(val -> {
String relativePath = val.getRight().getRight();
List<HoodieFileStatus> statusList = partitionToFiles.get(relativePath);
if (null == statusList) {
Integer level = val.getRight().getLeft();
List<String> dirs = levelToPartitions.get(level);
if (null == dirs) {
dirs = new ArrayList<>();
levelToPartitions.put(level, dirs);
}
dirs.add(relativePath);
statusList = new ArrayList<>();
partitionToFiles.put(relativePath, statusList);
}
statusList.add(val.getLeft());
});
OptionalInt maxLevelOpt = levelToPartitions.keySet().stream().mapToInt(x -> x).max();
int maxLevel = maxLevelOpt.orElse(-1);
return maxLevel >= 0 ? levelToPartitions.get(maxLevel).stream()
.map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
.map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
}
private static PathFilter getFilePathFilter(String baseFileExtension) {
return (path) -> {
return path.getName().endsWith(baseFileExtension);
};
}
private static PathFilter getExcludeMetaPathFilter() {
// Avoid listing and including any folders under the metafolder
return (path) -> !path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME);
}
}

View File

@@ -63,20 +63,15 @@ public class TestBootstrapUtils extends HoodieClientTestBase {
}
});
List<Pair<String, List<HoodieFileStatus>>> collected =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath, (status) -> {
return true;
});
List<Pair<String, List<HoodieFileStatus>>> collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
metaClient.getFs(), basePath, jsc);
assertEquals(3, collected.size());
collected.stream().forEach(k -> {
assertEquals(2, k.getRight().size());
});
// Simulate reading from un-partitioned dataset
collected =
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath + "/" + folders.get(0), (status) -> {
return true;
});
collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath + "/" + folders.get(0), jsc);
assertEquals(1, collected.size());
collected.stream().forEach(k -> {
assertEquals(2, k.getRight().size());

View File

@@ -171,9 +171,9 @@ public class TestBootstrap extends HoodieClientTestBase {
} else {
df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
}
String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), srcPath,
(status) -> status.getName().endsWith(".parquet")).stream().findAny().map(p -> p.getValue().stream().findAny())
.orElse(null).get().getPath()).toString();
String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
srcPath, jsc).stream().findAny().map(p -> p.getValue().stream().findAny())
.orElse(null).get().getPath()).toString();
ParquetFileReader reader = ParquetFileReader.open(metaClient.getHadoopConf(), new Path(filePath));
MessageType schema = reader.getFooter().getFileMetaData().getSchema();
return new AvroSchemaConverter().convert(schema);
@@ -266,8 +266,8 @@ public class TestBootstrap extends HoodieClientTestBase {
client.rollBackInflightBootstrap();
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath,
(status) -> status.getName().endsWith(".parquet")).stream().flatMap(f -> f.getValue().stream()).count());
assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, jsc)
.stream().flatMap(f -> f.getValue().stream()).count());
BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
assertFalse(index.useIndex());
@@ -292,8 +292,8 @@ public class TestBootstrap extends HoodieClientTestBase {
String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
JavaRDD<HoodieRecord> updateBatch =
generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), updateSPath,
(status) -> status.getName().endsWith("parquet")), schema);
generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, jsc),
schema);
String newInstantTs = client.startCommit();
client.upsert(updateBatch, newInstantTs);
checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
@@ -353,9 +353,8 @@ public class TestBootstrap extends HoodieClientTestBase {
bootstrapped.registerTempTable("bootstrapped");
original.registerTempTable("original");
if (checkNumRawFiles) {
List<HoodieFileStatus> files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), bootstrapBasePath,
(status) -> status.getName().endsWith(".parquet"))
.stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList());
List<HoodieFileStatus> files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
bootstrapBasePath, jsc).stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList());
assertEquals(files.size() * numVersions,
sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count());
}