1
0

[HUDI-2351] Extract common FS and IO utils for marker mechanism (#3529)

This commit is contained in:
Y Ethan Guo
2021-09-09 11:45:28 -07:00
committed by GitHub
parent 57c8113ee1
commit 56d08fbe70
8 changed files with 301 additions and 110 deletions

View File

@@ -40,11 +40,9 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Marker operations of directly accessing the file system to create and delete
@@ -74,31 +72,7 @@ public class DirectWriteMarkers extends WriteMarkers {
* @param parallelism parallelism for deletion.
*/
public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
try {
if (fs.exists(markerDirPath)) {
FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
.map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList());
if (markerDirSubPaths.size() > 0) {
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
parallelism = Math.min(markerDirSubPaths.size(), parallelism);
context.foreach(markerDirSubPaths, subPathStr -> {
Path subPath = new Path(subPathStr);
FileSystem fileSystem = subPath.getFileSystem(conf.get());
fileSystem.delete(subPath, true);
}, parallelism);
}
boolean result = fs.delete(markerDirPath, true);
LOG.info("Removing marker directory at " + markerDirPath);
return result;
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
return false;
return FSUtils.deleteDir(context, fs, markerDirPath, parallelism);
}
/**

View File

@@ -32,13 +32,11 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import com.esotericsoftware.minlog.Log;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -108,7 +106,8 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
// Deletes marker type file
MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir);
// Deletes timeline server based markers
deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
deleteTimelineBasedMarkerFiles(
context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
break;
default:
throw new HoodieException("The marker type \"" + markerTypeOption.get().name()
@@ -117,26 +116,18 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
} else {
// In case of partial failures during downgrade, there is a chance that marker type file was deleted,
// but timeline server based marker files are left. So deletes them if any
deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
deleteTimelineBasedMarkerFiles(
context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
}
}
private void deleteTimelineBasedMarkerFiles(String markerDir, FileSystem fileSystem) throws IOException {
private void deleteTimelineBasedMarkerFiles(HoodieEngineContext context, String markerDir,
FileSystem fileSystem, int parallelism) throws IOException {
// Deletes timeline based marker files if any.
Path dirPath = new Path(markerDir);
FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
Predicate<FileStatus> prefixFilter = fileStatus ->
fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
.filter(prefixFilter)
.map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList());
markerDirSubPaths.forEach(fileToDelete -> {
try {
fileSystem.delete(new Path(fileToDelete), false);
} catch (IOException e) {
Log.warn("Deleting Timeline based marker files failed ", e);
}
});
FSUtils.parallelizeSubPathProcess(context, fileSystem, new Path(markerDir), parallelism,
prefixFilter, pairOfSubPathAndConf ->
FSUtils.deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue(), false));
}
}