diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java index bf6c82888..dda22ceb0 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java @@ -77,7 +77,7 @@ public class RepairsCommand implements CommandMarker { String latestCommit = HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get() .getTimestamp(); - List partitionPaths = FSUtils.getAllFoldersThreeLevelsDown(HoodieCLI.fs, + List partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath()); Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath()); String[][] rows = new String[partitionPaths.size() + 1][]; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index a97cb5127..ab61e5dc6 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -18,6 +18,7 @@ package com.uber.hoodie.common.util; import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.uber.hoodie.common.model.HoodieFileFormat; import com.uber.hoodie.common.model.HoodieLogFile; @@ -67,6 +68,12 @@ public class FSUtils { private static final long MIN_ROLLBACK_TO_KEEP = 10; private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_"; + private static final PathFilter ALLOW_ALL_FILTER = new PathFilter() { + @Override + public boolean accept(Path file) { + return true; + } + }; public static Configuration prepareHadoopConf(Configuration conf) { conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); @@ -152,16 +159,11 @@ public class FSUtils { /** * Gets all partition paths assuming date partitioning (year, month, day) three levels down. */ - public static List getAllFoldersThreeLevelsDown(FileSystem fs, String basePath) + public static List getAllPartitionFoldersThreeLevelsDown(FileSystem fs, String basePath) throws IOException { List datePartitions = new ArrayList<>(); // Avoid listing and including any folders under the metafolder - PathFilter filter = (path) -> { - if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) { - return false; - } - return true; - }; + PathFilter filter = getExcludeMetaPathFilter(); FileStatus[] folders = fs.globStatus(new Path(basePath + "/*/*/*"), filter); for (FileStatus status : folders) { Path path = status.getPath(); @@ -201,31 +203,53 @@ public class FSUtils { partitions.add(getRelativePartitionPath(basePath, filePath.getParent())); } return true; - }); + }, true); return partitions; } public static final List getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs, String markerDir) throws IOException { List dataFiles = new LinkedList<>(); - FSUtils.processFiles(fs, markerDir, (status) -> { + processFiles(fs, markerDir, (status) -> { String pathStr = status.getPath().toString(); if (pathStr.endsWith(MARKER_EXTN)) { dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs)); } return true; - }); + }, false); return dataFiles; } - private static final void processFiles(FileSystem fs, String basePathStr, - Function consumer) throws IOException { - RemoteIterator allFiles = fs.listFiles(new Path(basePathStr), true); - while (allFiles.hasNext()) { - LocatedFileStatus status = allFiles.next(); - boolean success = consumer.apply(status); - if (!success) { - throw new HoodieException("Failed to process file-status=" + status); + /** + * Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its + * subdirs are skipped + * @param fs File System + * @param basePathStr Base-Path + * @param consumer Callback for processing + * @param excludeMetaFolder Exclude .hoodie folder + * @throws IOException + */ + @VisibleForTesting + static void processFiles(FileSystem fs, String basePathStr, + Function consumer, boolean excludeMetaFolder) throws IOException { + PathFilter pathFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER; + FileStatus[] topLevelStatuses = fs.listStatus(new Path(basePathStr)); + for (int i = 0; i < topLevelStatuses.length; i++) { + FileStatus child = topLevelStatuses[i]; + if (child.isFile()) { + boolean success = consumer.apply(child); + if (!success) { + throw new HoodieException("Failed to process file-status=" + child); + } + } else if (pathFilter.accept(child.getPath())) { + RemoteIterator itr = fs.listFiles(child.getPath(), true); + while (itr.hasNext()) { + FileStatus status = itr.next(); + boolean success = consumer.apply(status); + if (!success) { + throw new HoodieException("Failed to process file-status=" + status); + } + } } } } @@ -234,7 +258,7 @@ public class FSUtils { boolean assumeDatePartitioning) throws IOException { if (assumeDatePartitioning) { - return getAllFoldersThreeLevelsDown(fs, basePathStr); + return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr); } else { return getAllFoldersWithPartitionMetaFile(fs, basePathStr); } @@ -247,6 +271,16 @@ public class FSUtils { return dotIndex == -1 ? "" : fileName.substring(dotIndex); } + private static PathFilter getExcludeMetaPathFilter() { + // Avoid listing and including any folders under the metafolder + return (path) -> { + if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) { + return false; + } + return true; + }; + } + public static String getInstantTime(String name) { return name.replace(getFileExtension(name), ""); } @@ -453,7 +487,6 @@ public class FSUtils { Thread.sleep(1000); } return recovered; - } public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath, diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java index 5c3ba02ee..78fafd9c9 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java @@ -20,8 +20,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.exception.HoodieException; +import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.UUID; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; @@ -30,6 +36,7 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.contrib.java.lang.system.EnvironmentVariables; +import org.junit.rules.TemporaryFolder; public class TestFSUtils { @@ -55,6 +62,71 @@ public class TestFSUtils { .equals("*_" + taskPartitionId + "_" + commitTime + ".parquet")); } + @Test + /** + * Tests if process Files return only paths excluding marker directories + * Cleaner, Rollback and compaction-scheduling logic was recursively processing all subfolders including that + * of ".hoodie" when looking for partition-paths. This causes a race when they try to list all folders (recursively) + * but the marker directory (that of compaction inside of ".hoodie" folder) is deleted underneath by compactor. + * This code tests the fix by ensuring ".hoodie" and their subfolders are never processed. + */ + public void testProcessFiles() throws Exception { + TemporaryFolder tmpFolder = new TemporaryFolder(); + tmpFolder.create(); + // All directories including marker dirs. + List folders = Arrays.asList("2016/04/15", "2016/05/16", ".hoodie/.temp/2/2016/04/15", + ".hoodie/.temp/2/2016/05/16"); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath()); + String basePath = metaClient.getBasePath(); + folders.stream().forEach(f -> { + try { + metaClient.getFs().mkdirs(new Path(new Path(basePath), f)); + } catch (IOException e) { + throw new HoodieException(e); + } + }); + + // Files inside partitions and marker directories + List files = Arrays.asList( + "2016/04/15/1_1-0-1_20190528120000.parquet", + "2016/05/16/2_1-0-1_20190528120000.parquet", + ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000.parquet", + ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000.parquet" + ); + + files.stream().forEach(f -> { + try { + metaClient.getFs().create(new Path(new Path(basePath), f)); + } catch (IOException e) { + throw new HoodieException(e); + } + }); + + // Test excluding meta-folder + final List collected = new ArrayList<>(); + FSUtils.processFiles(metaClient.getFs(), basePath, (status) -> { + collected.add(status.getPath().toString()); + return true; + }, true); + + Assert.assertTrue("Hoodie MetaFolder MUST be skipped but got :" + collected, collected.stream() + .noneMatch(s -> s.contains(HoodieTableMetaClient.METAFOLDER_NAME))); + // Check if only files are listed + Assert.assertEquals(2, collected.size()); + + // Test including meta-folder + final List collected2 = new ArrayList<>(); + FSUtils.processFiles(metaClient.getFs(), basePath, (status) -> { + collected2.add(status.getPath().toString()); + return true; + }, false); + + Assert.assertFalse("Hoodie MetaFolder will be present :" + collected2, collected2.stream() + .noneMatch(s -> s.contains(HoodieTableMetaClient.METAFOLDER_NAME))); + // Check if only files are listed including hoodie.properties + Assert.assertEquals("Collected=" + collected2, 5, collected2.size()); + } + @Test public void testGetCommitTime() { String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());