diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java index e4224fdd8..57b41ef40 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java @@ -33,7 +33,6 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BootstrapFileMapping; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -280,13 +279,8 @@ public class BootstrapCommitActionExecutor> * @throws IOException */ private Map>>> listAndProcessSourcePartitions() throws IOException { - List>> folders = - BootstrapUtils.getAllLeafFoldersWithFiles(bootstrapSourceFileSystem, - config.getBootstrapSourceBasePath(), path -> { - // TODO: Needs to be abstracted out when supporting different formats - // TODO: Remove hoodieFilter - return path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension()); - }); + List>> folders = BootstrapUtils.getAllLeafFoldersWithFiles( + table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), jsc); LOG.info("Fetching Bootstrap Schema !!"); BootstrapSchemaProvider sourceSchemaProvider = new BootstrapSchemaProvider(config); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java index 67d13651a..cb4bc2d7b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java @@ -18,14 +18,20 @@ package org.apache.hudi.table.action.bootstrap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; +import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.util.ArrayList; @@ -39,39 +45,88 @@ public class BootstrapUtils { /** * Returns leaf folders with files under a path. + * @param metaClient Hoodie table metadata client * @param fs File System - * @param basePathStr Base Path to look for leaf folders - * @param filePathFilter Filters to skip directories/paths + * @param jsc Java spark context * @return list of partition paths with files under them. * @throws IOException */ - public static List>> getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr, - PathFilter filePathFilter) throws IOException { + public static List>> getAllLeafFoldersWithFiles(HoodieTableMetaClient metaClient, + FileSystem fs, String basePathStr, JavaSparkContext jsc) throws IOException { final Path basePath = new Path(basePathStr); + final String baseFileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); final Map> levelToPartitions = new HashMap<>(); final Map> partitionToFiles = new HashMap<>(); - FSUtils.processFiles(fs, basePathStr, (status) -> { - if (status.isFile() && filePathFilter.accept(status.getPath())) { - String relativePath = FSUtils.getRelativePartitionPath(basePath, status.getPath().getParent()); - List statusList = partitionToFiles.get(relativePath); - if (null == statusList) { - Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count(); - List dirs = levelToPartitions.get(level); - if (null == dirs) { - dirs = new ArrayList<>(); - levelToPartitions.put(level, dirs); - } - dirs.add(relativePath); - statusList = new ArrayList<>(); - partitionToFiles.put(relativePath, statusList); - } - statusList.add(FileStatusUtils.fromFileStatus(status)); + PathFilter filePathFilter = getFilePathFilter(baseFileExtension); + PathFilter metaPathFilter = getExcludeMetaPathFilter(); + + FileStatus[] topLevelStatuses = fs.listStatus(basePath); + List subDirectories = new ArrayList<>(); + + List>> result = new ArrayList<>(); + + for (FileStatus topLevelStatus: topLevelStatuses) { + if (topLevelStatus.isFile() && filePathFilter.accept(topLevelStatus.getPath())) { + String relativePath = FSUtils.getRelativePartitionPath(basePath, topLevelStatus.getPath().getParent()); + Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count(); + HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(topLevelStatus); + result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath))); + } else if (metaPathFilter.accept(topLevelStatus.getPath())) { + subDirectories.add(topLevelStatus.getPath().toString()); } - return true; - }, true); + } + + if (subDirectories.size() > 0) { + result.addAll(jsc.parallelize(subDirectories, subDirectories.size()).flatMap(directory -> { + PathFilter pathFilter = getFilePathFilter(baseFileExtension); + Path path = new Path(directory); + FileSystem fileSystem = path.getFileSystem(new Configuration()); + RemoteIterator itr = fileSystem.listFiles(path, true); + List>> res = new ArrayList<>(); + while (itr.hasNext()) { + FileStatus status = itr.next(); + if (pathFilter.accept(status.getPath())) { + String relativePath = FSUtils.getRelativePartitionPath(new Path(basePathStr), status.getPath().getParent()); + Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count(); + HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(status); + res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath))); + } + } + return res.iterator(); + }).collect()); + } + + result.forEach(val -> { + String relativePath = val.getRight().getRight(); + List statusList = partitionToFiles.get(relativePath); + if (null == statusList) { + Integer level = val.getRight().getLeft(); + List dirs = levelToPartitions.get(level); + if (null == dirs) { + dirs = new ArrayList<>(); + levelToPartitions.put(level, dirs); + } + dirs.add(relativePath); + statusList = new ArrayList<>(); + partitionToFiles.put(relativePath, statusList); + } + statusList.add(val.getLeft()); + }); + OptionalInt maxLevelOpt = levelToPartitions.keySet().stream().mapToInt(x -> x).max(); int maxLevel = maxLevelOpt.orElse(-1); return maxLevel >= 0 ? levelToPartitions.get(maxLevel).stream() - .map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>(); + .map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>(); + } + + private static PathFilter getFilePathFilter(String baseFileExtension) { + return (path) -> { + return path.getName().endsWith(baseFileExtension); + }; + } + + private static PathFilter getExcludeMetaPathFilter() { + // Avoid listing and including any folders under the metafolder + return (path) -> !path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java b/hudi-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java index 7303bee1c..64db6cf6d 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java @@ -63,20 +63,15 @@ public class TestBootstrapUtils extends HoodieClientTestBase { } }); - List>> collected = - BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath, (status) -> { - return true; - }); + List>> collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, + metaClient.getFs(), basePath, jsc); assertEquals(3, collected.size()); collected.stream().forEach(k -> { assertEquals(2, k.getRight().size()); }); // Simulate reading from un-partitioned dataset - collected = - BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath + "/" + folders.get(0), (status) -> { - return true; - }); + collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath + "/" + folders.get(0), jsc); assertEquals(1, collected.size()); collected.stream().forEach(k -> { assertEquals(2, k.getRight().size()); diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java index 88f535b17..0aa8ca45b 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java +++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java @@ -171,9 +171,9 @@ public class TestBootstrap extends HoodieClientTestBase { } else { df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath); } - String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), srcPath, - (status) -> status.getName().endsWith(".parquet")).stream().findAny().map(p -> p.getValue().stream().findAny()) - .orElse(null).get().getPath()).toString(); + String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + srcPath, jsc).stream().findAny().map(p -> p.getValue().stream().findAny()) + .orElse(null).get().getPath()).toString(); ParquetFileReader reader = ParquetFileReader.open(metaClient.getHadoopConf(), new Path(filePath)); MessageType schema = reader.getFooter().getFileMetaData().getSchema(); return new AvroSchemaConverter().convert(schema); @@ -266,8 +266,8 @@ public class TestBootstrap extends HoodieClientTestBase { client.rollBackInflightBootstrap(); metaClient.reloadActiveTimeline(); assertEquals(0, metaClient.getCommitsTimeline().countInstants()); - assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath, - (status) -> status.getName().endsWith(".parquet")).stream().flatMap(f -> f.getValue().stream()).count()); + assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, jsc) + .stream().flatMap(f -> f.getValue().stream()).count()); BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient); assertFalse(index.useIndex()); @@ -292,8 +292,8 @@ public class TestBootstrap extends HoodieClientTestBase { String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2"; generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath); JavaRDD updateBatch = - generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), updateSPath, - (status) -> status.getName().endsWith("parquet")), schema); + generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, jsc), + schema); String newInstantTs = client.startCommit(); client.upsert(updateBatch, newInstantTs); checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1, @@ -353,9 +353,8 @@ public class TestBootstrap extends HoodieClientTestBase { bootstrapped.registerTempTable("bootstrapped"); original.registerTempTable("original"); if (checkNumRawFiles) { - List files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), bootstrapBasePath, - (status) -> status.getName().endsWith(".parquet")) - .stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList()); + List files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + bootstrapBasePath, jsc).stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList()); assertEquals(files.size() * numVersions, sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count()); }