HUDI-135 - Skip Meta folder when looking for partitions
This commit is contained in:
committed by
vinoth chandar
parent
33f5208c1e
commit
93f8f12a30
@@ -18,6 +18,7 @@ package com.uber.hoodie.common.util;
|
||||
|
||||
import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.uber.hoodie.common.model.HoodieFileFormat;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
@@ -67,6 +68,12 @@ public class FSUtils {
|
||||
private static final long MIN_ROLLBACK_TO_KEEP = 10;
|
||||
private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
|
||||
|
||||
private static final PathFilter ALLOW_ALL_FILTER = new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path file) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
public static Configuration prepareHadoopConf(Configuration conf) {
|
||||
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||
@@ -152,16 +159,11 @@ public class FSUtils {
|
||||
/**
|
||||
* Gets all partition paths assuming date partitioning (year, month, day) three levels down.
|
||||
*/
|
||||
public static List<String> getAllFoldersThreeLevelsDown(FileSystem fs, String basePath)
|
||||
public static List<String> getAllPartitionFoldersThreeLevelsDown(FileSystem fs, String basePath)
|
||||
throws IOException {
|
||||
List<String> datePartitions = new ArrayList<>();
|
||||
// Avoid listing and including any folders under the metafolder
|
||||
PathFilter filter = (path) -> {
|
||||
if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
PathFilter filter = getExcludeMetaPathFilter();
|
||||
FileStatus[] folders = fs.globStatus(new Path(basePath + "/*/*/*"), filter);
|
||||
for (FileStatus status : folders) {
|
||||
Path path = status.getPath();
|
||||
@@ -201,31 +203,53 @@ public class FSUtils {
|
||||
partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}, true);
|
||||
return partitions;
|
||||
}
|
||||
|
||||
public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
|
||||
String markerDir) throws IOException {
|
||||
List<String> dataFiles = new LinkedList<>();
|
||||
FSUtils.processFiles(fs, markerDir, (status) -> {
|
||||
processFiles(fs, markerDir, (status) -> {
|
||||
String pathStr = status.getPath().toString();
|
||||
if (pathStr.endsWith(MARKER_EXTN)) {
|
||||
dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs));
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}, false);
|
||||
return dataFiles;
|
||||
}
|
||||
|
||||
private static final void processFiles(FileSystem fs, String basePathStr,
|
||||
Function<LocatedFileStatus, Boolean> consumer) throws IOException {
|
||||
RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
|
||||
while (allFiles.hasNext()) {
|
||||
LocatedFileStatus status = allFiles.next();
|
||||
boolean success = consumer.apply(status);
|
||||
if (!success) {
|
||||
throw new HoodieException("Failed to process file-status=" + status);
|
||||
/**
|
||||
* Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its
|
||||
* subdirs are skipped
|
||||
* @param fs File System
|
||||
* @param basePathStr Base-Path
|
||||
* @param consumer Callback for processing
|
||||
* @param excludeMetaFolder Exclude .hoodie folder
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static void processFiles(FileSystem fs, String basePathStr,
|
||||
Function<FileStatus, Boolean> consumer, boolean excludeMetaFolder) throws IOException {
|
||||
PathFilter pathFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER;
|
||||
FileStatus[] topLevelStatuses = fs.listStatus(new Path(basePathStr));
|
||||
for (int i = 0; i < topLevelStatuses.length; i++) {
|
||||
FileStatus child = topLevelStatuses[i];
|
||||
if (child.isFile()) {
|
||||
boolean success = consumer.apply(child);
|
||||
if (!success) {
|
||||
throw new HoodieException("Failed to process file-status=" + child);
|
||||
}
|
||||
} else if (pathFilter.accept(child.getPath())) {
|
||||
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(child.getPath(), true);
|
||||
while (itr.hasNext()) {
|
||||
FileStatus status = itr.next();
|
||||
boolean success = consumer.apply(status);
|
||||
if (!success) {
|
||||
throw new HoodieException("Failed to process file-status=" + status);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -234,7 +258,7 @@ public class FSUtils {
|
||||
boolean assumeDatePartitioning)
|
||||
throws IOException {
|
||||
if (assumeDatePartitioning) {
|
||||
return getAllFoldersThreeLevelsDown(fs, basePathStr);
|
||||
return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
|
||||
} else {
|
||||
return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
|
||||
}
|
||||
@@ -247,6 +271,16 @@ public class FSUtils {
|
||||
return dotIndex == -1 ? "" : fileName.substring(dotIndex);
|
||||
}
|
||||
|
||||
private static PathFilter getExcludeMetaPathFilter() {
|
||||
// Avoid listing and including any folders under the metafolder
|
||||
return (path) -> {
|
||||
if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
}
|
||||
|
||||
public static String getInstantTime(String name) {
|
||||
return name.replace(getFileExtension(name), "");
|
||||
}
|
||||
@@ -453,7 +487,6 @@ public class FSUtils {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
return recovered;
|
||||
|
||||
}
|
||||
|
||||
public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath,
|
||||
|
||||
Reference in New Issue
Block a user