diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index 0059c7f8d..e81338207 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -40,11 +40,9 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Marker operations of directly accessing the file system to create and delete
@@ -74,31 +72,7 @@ public class DirectWriteMarkers extends WriteMarkers {
    * @param parallelism parallelism for deletion.
    */
   public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
-    try {
-      if (fs.exists(markerDirPath)) {
-        FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
-        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
-            .map(fileStatus -> fileStatus.getPath().toString())
-            .collect(Collectors.toList());
-
-        if (markerDirSubPaths.size() > 0) {
-          SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
-          parallelism = Math.min(markerDirSubPaths.size(), parallelism);
-          context.foreach(markerDirSubPaths, subPathStr -> {
-            Path subPath = new Path(subPathStr);
-            FileSystem fileSystem = subPath.getFileSystem(conf.get());
-            fileSystem.delete(subPath, true);
-          }, parallelism);
-        }
-
-        boolean result = fs.delete(markerDirPath, true);
-        LOG.info("Removing marker directory at " + markerDirPath);
-        return result;
-      }
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
-    return false;
+    return FSUtils.deleteDir(context, fs, markerDirPath, parallelism);
   }
 
   /**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java
index 7b67de852..621711a3f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java
@@ -32,13 +32,11 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.DirectWriteMarkers;
 
-import com.esotericsoftware.minlog.Log;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -108,7 +106,8 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
           // Deletes marker type file
           MarkerUtils.deleteMarkerTypeFile(fileSystem, markerDir);
           // Deletes timeline server based markers
-          deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
+          deleteTimelineBasedMarkerFiles(
+              context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
           break;
         default:
           throw new HoodieException("The marker type \"" + markerTypeOption.get().name()
@@ -117,26 +116,18 @@ public abstract class BaseTwoToOneDowngradeHandler implements DowngradeHandler {
     } else {
       // In case of partial failures during downgrade, there is a chance that marker type file was deleted,
       // but timeline server based marker files are left. So deletes them if any
-      deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
+      deleteTimelineBasedMarkerFiles(
+          context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
     }
   }
 
-  private void deleteTimelineBasedMarkerFiles(String markerDir, FileSystem fileSystem) throws IOException {
+  private void deleteTimelineBasedMarkerFiles(HoodieEngineContext context, String markerDir,
+                                              FileSystem fileSystem, int parallelism) throws IOException {
     // Deletes timeline based marker files if any.
-    Path dirPath = new Path(markerDir);
-    FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
     Predicate<FileStatus> prefixFilter = fileStatus ->
         fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
-    List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
-        .filter(prefixFilter)
-        .map(fileStatus -> fileStatus.getPath().toString())
-        .collect(Collectors.toList());
-    markerDirSubPaths.forEach(fileToDelete -> {
-      try {
-        fileSystem.delete(new Path(fileToDelete), false);
-      } catch (IOException e) {
-        Log.warn("Deleting Timeline based marker files failed ", e);
-      }
-    });
+    FSUtils.parallelizeSubPathProcess(context, fileSystem, new Path(markerDir), parallelism,
+        prefixFilter, pairOfSubPathAndConf ->
+            FSUtils.deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue(), false));
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 7e39053e1..5c439f51a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import
org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -49,8 +50,10 @@ import org.apache.log4j.Logger;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +62,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -612,4 +616,91 @@ public class FSUtils {
         .filter(fileStatus -> !fileStatus.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME))
         .collect(Collectors.toList());
   }
+
+  /**
+   * Deletes a directory by deleting sub-paths in parallel on the file system.
+   *
+   * @param hoodieEngineContext {@code HoodieEngineContext} instance
+   * @param fs                  file system
+   * @param dirPath             directory path
+   * @param parallelism         parallelism to use for sub-paths
+   * @return {@code true} if the directory is deleted; {@code false} otherwise.
+   */
+  public static boolean deleteDir(
+      HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism) {
+    try {
+      if (fs.exists(dirPath)) {
+        FSUtils.parallelizeSubPathProcess(hoodieEngineContext, fs, dirPath, parallelism, e -> true,
+            pairOfSubPathAndConf -> deleteSubPath(
+                pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue(), true)
+        );
+        boolean result = fs.delete(dirPath, false);
+        if (result) {
+          LOG.info("Removed directory at " + dirPath);
+        } else {
+          LOG.warn("Failed to remove directory at " + dirPath);
+        }
+        return result;
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return false;
+  }
+
+  /**
+   * Processes sub-path in parallel.
+   *
+   * @param hoodieEngineContext {@code HoodieEngineContext} instance
+   * @param fs                  file system
+   * @param dirPath             directory path
+   * @param parallelism         parallelism to use for sub-paths
+   * @param subPathPredicate    predicate to use to filter sub-paths for processing
+   * @param pairFunction        actual processing logic for each sub-path
+   * @param <T>                 type of result to return for each sub-path
+   * @return a map of sub-path to result of the processing
+   */
+  public static <T> Map<String, T> parallelizeSubPathProcess(
+      HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism,
+      Predicate<FileStatus> subPathPredicate, SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction) {
+    Map<String, T> result = new HashMap<>();
+    try {
+      FileStatus[] fileStatuses = fs.listStatus(dirPath);
+      List<String> subPaths = Arrays.stream(fileStatuses)
+          .filter(subPathPredicate)
+          .map(fileStatus -> fileStatus.getPath().toString())
+          .collect(Collectors.toList());
+      if (subPaths.size() > 0) {
+        SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+        int actualParallelism = Math.min(subPaths.size(), parallelism);
+        result = hoodieEngineContext.mapToPair(subPaths,
+            subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
+            actualParallelism);
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+    return result;
+  }
+
+  /**
+   * Deletes a sub-path.
+   *
+   * @param subPathStr sub-path String
+   * @param conf       serializable config
+   * @param recursive  is recursive or not
+   * @return {@code true} if the sub-path is deleted; {@code false} otherwise.
+   */
+  public static boolean deleteSubPath(String subPathStr, SerializableConfiguration conf, boolean recursive) {
+    try {
+      Path subPath = new Path(subPathStr);
+      FileSystem fileSystem = subPath.getFileSystem(conf.get());
+      return fileSystem.delete(subPath, recursive);
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
index 10685a1dc..cb3f103a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileIOUtils.java
@@ -21,18 +21,23 @@ package org.apache.hudi.common.util;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Bunch of utility methods for working with files and byte streams.
@@ -71,6 +76,22 @@ public class FileIOUtils {
     return new String(bos.toByteArray(), StandardCharsets.UTF_8);
   }
 
+  /**
+   * Reads the input stream into String lines.
+   * The reader wrapping the stream is closed before returning, even if reading fails.
+   *
+   * @param input {@code InputStream} instance.
+   * @return String lines in a list.
+   */
+  public static List<String> readAsUTFStringLines(InputStream input) {
+    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
+    try {
+      return bufferedReader.lines().collect(Collectors.toCollection(ArrayList::new));
+    } finally {
+      closeQuietly(bufferedReader);
+    }
+  }
+
   public static void copy(InputStream inputStream, OutputStream outputStream) throws IOException {
     byte[] buffer = new byte[1024];
     int len;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
index f799cb32b..555a036b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java
@@ -21,9 +21,9 @@ package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
-import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -35,20 +35,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
 
@@ -178,43 +173,44 @@ public class MarkerUtils {
     Path dirPath = new
Path(markerDir);
     try {
       if (fileSystem.exists(dirPath)) {
-        FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
         Predicate<FileStatus> prefixFilter = fileStatus ->
             fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
         Predicate<FileStatus> markerTypeFilter = fileStatus ->
             !fileStatus.getPath().getName().equals(MARKER_TYPE_FILENAME);
-        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
-            .filter(prefixFilter.and(markerTypeFilter))
-            .map(fileStatus -> fileStatus.getPath().toString())
-            .collect(Collectors.toList());
-
-        if (markerDirSubPaths.size() > 0) {
-          SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf());
-          int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism);
-          return context.mapToPair(markerDirSubPaths, markersFilePathStr -> {
-            Path markersFilePath = new Path(markersFilePathStr);
-            FileSystem fs = markersFilePath.getFileSystem(conf.get());
-            FSDataInputStream fsDataInputStream = null;
-            BufferedReader bufferedReader = null;
-            Set<String> markers = new HashSet<>();
-            try {
-              LOG.debug("Read marker file: " + markersFilePathStr);
-              fsDataInputStream = fs.open(markersFilePath);
-              bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
-              markers = bufferedReader.lines().collect(Collectors.toSet());
-            } catch (IOException e) {
-              throw new HoodieIOException("Failed to read file " + markersFilePathStr, e);
-            } finally {
-              closeQuietly(bufferedReader);
-              closeQuietly(fsDataInputStream);
-            }
-            return new ImmutablePair<>(markersFilePathStr, markers);
-          }, actualParallelism);
-        }
+        return FSUtils.parallelizeSubPathProcess(
+            context, fileSystem, dirPath, parallelism, prefixFilter.and(markerTypeFilter),
+            pairOfSubPathAndConf -> {
+              String markersFilePathStr = pairOfSubPathAndConf.getKey();
+              SerializableConfiguration conf = pairOfSubPathAndConf.getValue();
+              return readMarkersFromFile(new Path(markersFilePathStr), conf);
+            });
       }
       return new HashMap<>();
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
   }
+
+  /**
+   * Reads the markers stored in the underlying file.
+   *
+   * @param markersFilePath file path for the markers
+   * @param conf            serializable config
+   * @return markers in a {@code Set} of String.
+   */
+  public static Set<String> readMarkersFromFile(Path markersFilePath, SerializableConfiguration conf) {
+    FSDataInputStream fsDataInputStream = null;
+    Set<String> markers = new HashSet<>();
+    try {
+      LOG.debug("Read marker file: " + markersFilePath);
+      FileSystem fs = markersFilePath.getFileSystem(conf.get());
+      fsDataInputStream = fs.open(markersFilePath);
+      markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+    } finally {
+      closeQuietly(fsDataInputStream);
+    }
+    return markers;
+  }
 }
\ No newline at end of file
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index abfeee9c6..c345cc7af 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -18,14 +18,19 @@
 
 package org.apache.hudi.common.fs;
 
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Rule;
 import org.junit.contrib.java.lang.system.EnvironmentVariables;
@@ -37,8 +42,10 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -47,6 +54,7 @@ import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -308,8 +316,139 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     Files.createFile(partitionPath.resolve(log3));
 
     assertEquals(3, (int) FSUtils.getLatestLogVersion(FSUtils.getFs(basePath, new Configuration()),
-            new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime).get().getLeft());
+        new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime).get().getLeft());
     assertEquals(4, FSUtils.computeNextLogVersion(FSUtils.getFs(basePath, new Configuration()),
-            new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime));
+        new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime));
+  }
+
+  private void prepareTestDirectory(FileSystem fileSystem, String rootDir) throws IOException {
+    // Directory structure
+    // .hoodie/.temp/
+    //  - subdir1
+    //    - file1.txt
+    //  - subdir2
+    //    - file2.txt
+    //  - file3.txt
+    Path dirPath = new Path(rootDir);
+    String subDir1 = rootDir + "/subdir1";
+    String file1 = subDir1 + "/file1.txt";
+    String subDir2 = rootDir + "/subdir2";
+    String file2 = subDir2 + "/file2.txt";
+    String file3 = rootDir + "/file3.txt";
+    String[] dirs = new String[]{rootDir, subDir1, subDir2};
+    String[] files = new
String[]{file1, file2, file3};
+    // clean up first
+    cleanUpTestDirectory(fileSystem, rootDir);
+    for (String dir : dirs) {
+      fileSystem.mkdirs(new Path(dir));
+    }
+    for (String filename : files) {
+      fileSystem.create(new Path(filename)).close();
+    }
+  }
+
+  private void cleanUpTestDirectory(FileSystem fileSystem, String rootDir) throws IOException {
+    fileSystem.delete(new Path(rootDir), true);
+  }
+
+  @Test
+  public void testDeleteExistingDir() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    FileSystem fileSystem = metaClient.getFs();
+    prepareTestDirectory(fileSystem, rootDir);
+
+    Path rootDirPath = new Path(rootDir);
+    assertTrue(fileSystem.exists(rootDirPath));
+    assertTrue(FSUtils.deleteDir(
+        new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, rootDirPath, 2));
+    assertFalse(fileSystem.exists(rootDirPath));
+  }
+
+  @Test
+  public void testDeleteNonExistingDir() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    FileSystem fileSystem = metaClient.getFs();
+    cleanUpTestDirectory(fileSystem, rootDir);
+
+    assertFalse(FSUtils.deleteDir(
+        new HoodieLocalEngineContext(metaClient.getHadoopConf()), fileSystem, new Path(rootDir), 2));
+  }
+
+  @Test
+  public void testDeleteSubDirectoryRecursively() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    String subPathStr = rootDir + "/subdir1";
+    FileSystem fileSystem = metaClient.getFs();
+    prepareTestDirectory(fileSystem, rootDir);
+
+    assertTrue(FSUtils.deleteSubPath(
+        subPathStr, new SerializableConfiguration(fileSystem.getConf()), true));
+  }
+
+  @Test
+  public void testDeleteSubDirectoryNonRecursively() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    String subPathStr = rootDir + "/subdir1";
+    FileSystem fileSystem = metaClient.getFs();
+    prepareTestDirectory(fileSystem, rootDir);
+
+    assertThrows(
+        HoodieIOException.class,
+        () -> FSUtils.deleteSubPath(
+            subPathStr, new
SerializableConfiguration(fileSystem.getConf()), false));
+  }
+
+  @Test
+  public void testDeleteSubPathAsFile() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    String subPathStr = rootDir + "/file3.txt";
+    FileSystem fileSystem = metaClient.getFs();
+    prepareTestDirectory(fileSystem, rootDir);
+
+    assertTrue(FSUtils.deleteSubPath(
+        subPathStr, new SerializableConfiguration(fileSystem.getConf()), false));
+  }
+
+  @Test
+  public void testDeleteNonExistingSubDirectory() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    String subPathStr = rootDir + "/subdir10";
+    FileSystem fileSystem = metaClient.getFs();
+    cleanUpTestDirectory(fileSystem, rootDir);
+
+    assertFalse(FSUtils.deleteSubPath(
+        subPathStr, new SerializableConfiguration(fileSystem.getConf()), true));
+  }
+
+  @Test
+  public void testParallelizeSubPathProcessWithExistingDir() throws IOException {
+    String rootDir = basePath + "/.hoodie/.temp";
+    FileSystem fileSystem = metaClient.getFs();
+    prepareTestDirectory(fileSystem, rootDir);
+    Map<String, List<String>> result = FSUtils.parallelizeSubPathProcess(
+        new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem, new Path(rootDir), 2,
+        fileStatus -> !fileStatus.getPath().getName().contains("1"),
+        pairOfSubPathAndConf -> {
+          Path subPath = new Path(pairOfSubPathAndConf.getKey());
+          List<String> listFiles = new ArrayList<>();
+          try {
+            FileSystem fs = subPath.getFileSystem(pairOfSubPathAndConf.getValue().get());
+            FileStatus[] fileStatuses = fs.listStatus(subPath);
+            listFiles = Arrays.stream(fileStatuses)
+                .map(fileStatus -> fileStatus.getPath().getName()).collect(Collectors.toList());
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+          return listFiles;
+        }
+    );
+    assertEquals(2, result.size());
+    for (String subPath : result.keySet()) {
+      if (subPath.contains("subdir2")) {
+        assertEquals(Collections.singletonList("file2.txt"), result.get(subPath));
+      } else if (subPath.contains("file3")) {
+        assertEquals(Collections.singletonList("file3.txt"), result.get(subPath));
+      }
+    }
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestFileIOUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestFileIOUtils.java
index 0c2ac7cf6..762aad704 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestFileIOUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestFileIOUtils.java
@@ -26,6 +26,9 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -63,4 +66,12 @@ public class TestFileIOUtils extends HoodieCommonTestHarness {
     inputStream = new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
     assertEquals(msg.length(), FileIOUtils.readAsByteArray(inputStream).length);
   }
+
+  @Test
+  public void testReadAsUTFStringLines() {
+    String content = "a\nb\nc";
+    List<String> expectedLines = Arrays.stream(new String[]{"a", "b", "c"}).collect(Collectors.toList());
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+    assertEquals(expectedLines, FileIOUtils.readAsUTFStringLines(inputStream));
+  }
 }
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
index 68f98bfe2..8ba9abf0e 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.timeline.service.handlers.marker;
 
-import
org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.util.HoodieTimer;
@@ -31,7 +31,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
@@ -44,7 +43,6 @@ import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -237,31 +235,7 @@ public class MarkerDirState implements Serializable {
    * @return {@code true} if successful; {@code false} otherwise.
    */
   public boolean deleteAllMarkers() {
-    Path dirPath = new Path(markerDirPath);
-    boolean result = false;
-    try {
-      if (fileSystem.exists(dirPath)) {
-        FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
-        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
-            .map(fileStatus -> fileStatus.getPath().toString())
-            .collect(Collectors.toList());
-
-        if (markerDirSubPaths.size() > 0) {
-          SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf());
-          int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism);
-          hoodieEngineContext.foreach(markerDirSubPaths, subPathStr -> {
-            Path subPath = new Path(subPathStr);
-            FileSystem fileSystem = subPath.getFileSystem(conf.get());
-            fileSystem.delete(subPath, true);
-          }, actualParallelism);
-        }
-
-        result = fileSystem.delete(dirPath, false);
-        LOG.info("Removing marker directory at " + dirPath);
-      }
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
-    }
+    boolean result = FSUtils.deleteDir(hoodieEngineContext, fileSystem, new Path(markerDirPath), parallelism);
     allMarkers.clear();
     fileMarkersMap.clear();
     return result;