diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableMetadata.java index bf91a0f97..0bda301e9 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieTableMetadata.java @@ -110,11 +110,11 @@ public class HoodieTableMetadata implements Serializable { throw new DatasetNotFoundException(this.basePath); } - // create .hoodie folder if it does not exist. this.metadataFolder = new Path(this.basePath, METAFOLDER_NAME); Path propertyPath = new Path(metadataFolder, HOODIE_PROPERTIES_FILE); if (!fs.exists(propertyPath)) { if (initOnMissing) { + // create .hoodie folder if it does not exist. createHoodieProperties(metadataFolder, tableName); } else { throw new InvalidDatasetException(this.basePath); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index b930ae240..df43aa89f 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -31,6 +31,7 @@ import java.util.UUID; public class HoodieTestUtils { public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; + public static final int DEFAULT_TASK_PARTITIONID = 1; public static final void initializeHoodieDirectory(String basePath) throws IOException { new File(basePath + "/" + HoodieTableMetadata.METAFOLDER_NAME).mkdirs(); @@ -78,12 +79,16 @@ public class HoodieTestUtils { public static final String createDataFile(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; new File(folderPath).mkdirs(); - new File(folderPath + FSUtils.makeDataFileName(commitTime, 1, fileID)).createNewFile(); + new File(folderPath + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID)).createNewFile(); return fileID; } + public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { + return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID); + } + public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { - return new File(basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, 1, fileID)).exists(); + return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists(); } public static final boolean doesCommitExist(String basePath, String commitTime) { diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java new file mode 100644 index 000000000..c8f991587 --- /dev/null +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.uber.hoodie.hadoop; + +import com.uber.hoodie.common.model.HoodieTableMetadata; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.InvalidDatasetException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; + +/** + * Given a path is a part of + * - Hoodie dataset => accepts ONLY the latest version of each path + * - Non-Hoodie dataset => then always accept + * + * We can set this filter, on a query engine's Hadoop Config & if it respects path filters, then + * you should be able to query both hoodie & non-hoodie datasets as you would normally do. + * + * hadoopConf.setClass("mapreduce.input.pathFilter.class", + * com.uber.hoodie.hadoop.HoodieROTablePathFilter.class, + * org.apache.hadoop.fs.PathFilter.class) + * + */ +public class HoodieROTablePathFilter implements PathFilter, Serializable { + + public static final Log LOG = LogFactory.getLog(HoodieInputFormat.class); + + /** + * Its quite common, to have all files from a given partition path be passed into accept(), + * cache the check for hoodie metadata for known partition paths & the latest versions of files + */ + private HashMap> hoodiePathCache; + + /** + * Paths that are known to be non-hoodie datasets. + */ + private HashSet nonHoodiePathCache; + + + public HoodieROTablePathFilter() { + hoodiePathCache = new HashMap<>(); + nonHoodiePathCache = new HashSet<>(); + } + + /** + * Obtain the path, two levels from provided path + * + * @return said path if available, null otherwise + */ + private Path safeGetParentsParent(Path path) { + if (path.getParent() != null && path.getParent().getParent() != null && path.getParent().getParent().getParent() != null) { + return path.getParent().getParent().getParent(); + } + return null; + } + + + @Override + public boolean accept(Path path) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Checking acceptance for path " + path); + } + Path folder = null; + try { + FileSystem fs = path.getFileSystem(new Configuration()); + if (fs.isDirectory(path)) { + return true; + } + + // Assumes path is a file + folder = path.getParent(); // get the immediate parent. + // Try to use the caches. + if (nonHoodiePathCache.contains(folder.toString())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Accepting non-hoodie path from cache: " + path); + } + return true; + } + + if (hoodiePathCache.containsKey(folder.toString())) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("%s Hoodie path checked against cache, accept => %s \n", + path, + hoodiePathCache.get(folder.toString()).contains(path))); + } + return hoodiePathCache.get(folder.toString()).contains(path); + } + + // Perform actual checking. + Path baseDir = safeGetParentsParent(folder); + if (baseDir != null) { + try { + HoodieTableMetadata metadata = new HoodieTableMetadata(fs, baseDir.toString()); + FileStatus[] latestFiles = metadata.getLatestVersions(fs.listStatus(folder)); + // populate the cache + if (!hoodiePathCache.containsKey(folder.toString())) { + hoodiePathCache.put(folder.toString(), new HashSet()); + } + LOG.info("Based on hoodie metadata from base path: " + baseDir.toString() + + ", caching " + latestFiles.length+" files under "+ folder); + for (FileStatus lfile: latestFiles) { + hoodiePathCache.get(folder.toString()).add(lfile.getPath()); + } + + // accept the path, if its among the latest files. + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("%s checked after cache population, accept => %s \n", + path, + hoodiePathCache.get(folder.toString()).contains(path))); + } + return hoodiePathCache.get(folder.toString()).contains(path); + } catch (InvalidDatasetException e) { + // Non-hoodie path, accept it. + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("(1) Caching non-hoodie path under %s \n", + folder.toString())); + } + nonHoodiePathCache.add(folder.toString()); + return true; + } + } else { + // files is at < 3 level depth in FS tree, can't be hoodie dataset + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("(2) Caching non-hoodie path under %s \n", folder.toString())); + } + nonHoodiePathCache.add(folder.toString()); + return true; + } + } catch (Exception e) { + String msg = "Error checking path :" + path +", under folder: "+ folder; + LOG.error(msg, e); + throw new HoodieException(msg, e); + } + } +} diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestHoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestHoodieROTablePathFilter.java new file mode 100644 index 000000000..a6ce1c39a --- /dev/null +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestHoodieROTablePathFilter.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.uber.hoodie.hadoop; + +import com.uber.hoodie.common.model.HoodieTestUtils; + +import org.apache.hadoop.fs.Path; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +/** + */ +public class TestHoodieROTablePathFilter { + + @Test + public void testHoodiePaths() throws IOException { + // Create a temp folder as the base path + String basePath = HoodieTestUtils.initializeTempHoodieBasePath(); + + HoodieTestUtils.createCommitFiles(basePath, "001", "002"); + HoodieTestUtils.createInflightCommitFiles(basePath, "003"); + + HoodieTestUtils.createDataFile(basePath, "2017/01/01", "001", "f1"); + HoodieTestUtils.createDataFile(basePath, "2017/01/01", "001", "f2"); + HoodieTestUtils.createDataFile(basePath, "2017/01/01", "001", "f3"); + HoodieTestUtils.createDataFile(basePath, "2017/01/01", "002", "f2"); + HoodieTestUtils.createDataFile(basePath, "2017/01/01", "003", "f3"); + + HoodieROTablePathFilter pathFilter = new HoodieROTablePathFilter(); + Path partitionPath = new Path("file://" + basePath + File.separator + "2017/01/01"); + assertTrue("Directories should be accepted", pathFilter.accept(partitionPath)); + + assertTrue(pathFilter.accept(new Path("file://" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "001", "f1")))); + assertFalse(pathFilter.accept(new Path("file://" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "001", "f2")))); + assertTrue(pathFilter.accept(new Path("file://" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "001", "f3")))); + assertTrue(pathFilter.accept(new Path("file://" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "002", "f2")))); + assertFalse(pathFilter.accept(new Path("file://" + HoodieTestUtils.getDataFilePath(basePath, "2017/01/01", "003", "f3")))); + } + + @Test + public void testNonHoodiePaths() throws IOException { + TemporaryFolder folder = new TemporaryFolder(); + folder.create(); + String basePath = folder.getRoot().getAbsolutePath(); + HoodieROTablePathFilter pathFilter = new HoodieROTablePathFilter(); + + String path = basePath + File.separator + "nonhoodiefolder"; + new File(path).mkdirs(); + assertTrue(pathFilter.accept(new Path("file://" + path))); + + path = basePath + File.separator + "nonhoodiefolder/somefile"; + new File(path).createNewFile(); + assertTrue(pathFilter.accept(new Path("file://" + path))); + } +}