diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java index 8104de96a..a0b9810c0 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java @@ -80,7 +80,7 @@ public class RepairsCommand implements CommandMarker { final boolean dryRun) throws IOException { String latestCommit = HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); - List partitionPaths = FSUtils.getAllPartitionPaths(HoodieCLI.fs, + List partitionPaths = FSUtils.getAllFoldersThreeLevelsDown(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath()); Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath()); String[][] rows = new String[partitionPaths.size() + 1][]; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index 701da91d5..30035990f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -18,31 +18,30 @@ package com.uber.hoodie; import com.google.common.collect.Iterables; -import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import 
com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; -import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ParquetUtils; -import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; +import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; - import com.uber.hoodie.table.HoodieTable; + import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -60,7 +59,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -122,6 +120,15 @@ public class TestHoodieClient implements Serializable { } } + private void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException { + for (String partitionPath: partitionPaths) { + assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath))); + HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath)); + pmeta.readFromFS(); + assertEquals(3, pmeta.getPartitionDepth()); + } + } + private void checkTaggedRecords(List taggedRecords, String commitTime) { for (HoodieRecord rec : taggedRecords) { assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown()); @@ -203,6 +210,9 @@ public class TestHoodieClient implements Serializable { List statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); + // check the partition 
metadata is written out + assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs); + // verify that there is a commit HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext); assertEquals("Expecting a single commit.", readClient.listCommitsSince("000").size(), 1); @@ -348,6 +358,7 @@ public class TestHoodieClient implements Serializable { .build()).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); FileSystem fs = FSUtils.getFs(); + HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath); /** * Write 1 (only inserts) @@ -433,6 +444,7 @@ public class TestHoodieClient implements Serializable { .build()).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); FileSystem fs = FSUtils.getFs(); + HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath); /** * Write 1 (only inserts) @@ -712,6 +724,10 @@ public class TestHoodieClient implements Serializable { String commitTime2 = "20160502020601"; String commitTime3 = "20160506030611"; new File(basePath + "/.hoodie").mkdirs(); + HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(), + new String[] {"2016/05/01", "2016/05/02", "2016/05/06"}, + basePath); + // Only first two have commit files HoodieTestUtils.createCommitFiles(basePath, commitTime1, commitTime2); @@ -801,6 +817,9 @@ public class TestHoodieClient implements Serializable { String commitTime2 = "20160502020601"; String commitTime3 = "20160506030611"; new File(basePath + "/.hoodie").mkdirs(); + HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(), + new String[] {"2016/05/01", "2016/05/02", "2016/05/06"}, + basePath); // One good commit HoodieTestUtils.createCommitFiles(basePath, commitTime1); @@ -984,7 +1003,6 @@ public class TestHoodieClient implements Serializable { // setup the small file handling params HoodieWriteConfig config = 
getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH}); - HoodieWriteClient client = new HoodieWriteClient(jsc, config); // Inserts => will write file1 @@ -995,6 +1013,7 @@ public class TestHoodieClient implements Serializable { List statuses= client.insert(insertRecordsRDD1, commitTime1).collect(); assertNoWriteErrors(statuses); + assertPartitionMetadata(new String[]{TEST_PARTITION_PATH}, FSUtils.getFs()); assertEquals("Just 1 file needs to be added.", 1, statuses.size()); String file1 = statuses.get(0).getFileId(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index e2b50c544..e9ed3b1ae 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -18,6 +18,7 @@ package com.uber.hoodie.common; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieKey; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -63,11 +64,19 @@ public class HoodieTestDataGenerator { // based on examination of sample file, the schema produces the following per record size public static final int SIZE_PER_RECORD = 50 * 1024; + public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"}; + + + public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) { + for (String partitionPath: partitionPaths) { + new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath)).trySave(0); + } + } private List existingKeysList = new 
ArrayList<>(); public static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA)); private static Random rand = new Random(46474747); - private String[] partitionPaths = {"2016/03/15", "2015/03/16", "2015/03/17"}; + private String[] partitionPaths = DEFAULT_PARTITION_PATHS; public HoodieTestDataGenerator(String[] partitionPaths) { this.partitionPaths = partitionPaths; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 0709a1b06..f40aa6ee1 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -60,7 +60,6 @@ import static org.junit.Assert.assertTrue; public class TestHoodieCompactor { private transient JavaSparkContext jsc = null; - private transient SQLContext sqlContext; private String basePath = null; private HoodieCompactor compactor; private transient HoodieTestDataGenerator dataGen = null; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java index 76c76a59b..68f3ec421 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodiePartitionMetadata.java @@ -71,7 +71,7 @@ public class HoodiePartitionMetadata { } public int getPartitionDepth() { - if (!props.contains(PARTITION_DEPTH_KEY)) { + if (!props.containsKey(PARTITION_DEPTH_KEY)) { throw new HoodieException("Could not find partitionDepth in partition metafile"); } return Integer.parseInt(props.getProperty(PARTITION_DEPTH_KEY)); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java index 
f5451058f..3d1fad843 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java @@ -19,6 +19,7 @@ package com.uber.hoodie.common.util; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; + import com.uber.hoodie.avro.model.HoodieCleanMetadata; import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata; import com.uber.hoodie.avro.model.HoodieRollbackMetadata; @@ -28,10 +29,10 @@ import com.uber.hoodie.avro.model.HoodieSavepointPartitionMetadata; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieAvroPayload; -import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.exception.HoodieIOException; + import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; @@ -40,29 +41,20 @@ import org.apache.avro.file.SeekableByteArrayInput; import org.apache.avro.file.SeekableInput; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; import org.apache.avro.mapred.FsInput; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecordBase; -import org.apache.hadoop.fs.AvroFSInput; -import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.ByteArrayOutputStream; 
-import java.io.FileWriter; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; public class AvroUtils { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 936b663d5..35308171f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -18,14 +18,19 @@ package com.uber.hoodie.common.util; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.table.log.HoodieLogFile; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.InvalidHoodiePathException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -75,7 +80,7 @@ public class FSUtils { fs = FileSystem.get(conf); } catch (IOException e) { throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), - e); + e); } LOG.info(String.format("Hadoop Configuration: fs.defaultFS: [%s], Config:[%s], FileSystem: [%s]", conf.getRaw("fs.defaultFS"), conf.toString(), fs.toString())); @@ -111,25 +116,43 @@ public class FSUtils { return fs.listStatus(path)[0].getLen(); } - // TODO (weiy): rename the function for better readability public static String getFileId(String fullFileName) { return fullFileName.split("_")[0]; } + /** - * Obtain all the partition paths, that are 
present in this table. + * Gets all partition paths assuming date partitioning (year, month, day) three levels down. */ - public static List getAllPartitionPaths(FileSystem fs, String basePath) - throws IOException { - List partitionsToClean = new ArrayList<>(); - // TODO(vc): For now, assume partitions are two levels down from base path. + public static List getAllFoldersThreeLevelsDown(FileSystem fs, String basePath) throws IOException { + List datePartitions = new ArrayList<>(); FileStatus[] folders = fs.globStatus(new Path(basePath + "/*/*/*")); for (FileStatus status : folders) { Path path = status.getPath(); - partitionsToClean.add(String.format("%s/%s/%s", path.getParent().getParent().getName(), - path.getParent().getName(), path.getName())); + datePartitions.add(String.format("%s/%s/%s", path.getParent().getParent().getName(), + path.getParent().getName(), path.getName())); } - return partitionsToClean; + return datePartitions; + } + + /** + * Obtain all the partition paths, that are present in this table, denoted by presence of {@link + * com.uber.hoodie.common.model.HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE} + */ + public static List getAllPartitionPaths(FileSystem fs, String basePathStr) + throws IOException { + List partitions = new ArrayList<>(); + Path basePath = new Path(basePathStr); + RemoteIterator allFiles = fs.listFiles(new Path(basePathStr), true); + while (allFiles.hasNext()) { + Path filePath = allFiles.next().getPath(); + if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + String partitionFullPath = filePath.getParent().toString(); + int partitionStartIndex = partitionFullPath.lastIndexOf(basePath.getName()) ; + partitions.add(partitionFullPath.substring(partitionStartIndex + basePath.getName().length() + 1)); + } + } + return partitions; } public static String getFileExtension(String fullName) { @@ -146,42 +169,34 @@ public class FSUtils { /** * Get the file extension from the log file - * @param 
logPath - * @return */ public static String getFileExtensionFromLog(Path logPath) { Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); - if(!matcher.find()) { + if (!matcher.find()) { throw new InvalidHoodiePathException(logPath, "LogFile"); } return matcher.group(3) + "." + matcher.group(4); } /** - * Get the first part of the file name in the log file. That will be the fileId. - * Log file do not have commitTime in the file name. - * - * @param path - * @return + * Get the first part of the file name in the log file. That will be the fileId. Log files do not + * have commitTime in the file name. */ public static String getFileIdFromLogPath(Path path) { Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); - if(!matcher.find()) { + if (!matcher.find()) { throw new InvalidHoodiePathException(path, "LogFile"); } return matcher.group(1); } /** - * Get the first part of the file name in the log file. That will be the fileId. - * Log file do not have commitTime in the file name. - * - * @param path - * @return + * Get the second part of the file name in the log file. That will be the base commit time + * of the data file this log file belongs to. */ public static String getBaseCommitTimeFromLogPath(Path path) { Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); - if(!matcher.find()) { + if (!matcher.find()) { throw new InvalidHoodiePathException(path, "LogFile"); } return matcher.group(2); @@ -189,20 +204,17 @@ public class FSUtils { /** * Get the last part of the file name in the log file and convert to int. 
- * - * @param logPath - * @return */ public static int getFileVersionFromLog(Path logPath) { Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); - if(!matcher.find()) { + if (!matcher.find()) { throw new InvalidHoodiePathException(logPath, "LogFile"); } return Integer.parseInt(matcher.group(5)); } public static String makeLogFileName(String fileId, String logFileExtension, - String baseCommitTime, int version) { + String baseCommitTime, int version) { return String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version); } @@ -213,46 +225,30 @@ public class FSUtils { /** * Get the latest log file written from the list of log files passed in - * - * @param logFiles - * @return */ public static Optional getLatestLogFile(Stream logFiles) { return logFiles.sorted(Comparator - .comparing(s -> s.getLogVersion(), - Comparator.reverseOrder())).findFirst(); + .comparing(s -> s.getLogVersion(), + Comparator.reverseOrder())).findFirst(); } /** * Get all the log files for the passed in FileId in the partition path - * - * @param fs - * @param partitionPath - * @param fileId - * @param logFileExtension - * @return */ public static Stream getAllLogFiles(FileSystem fs, Path partitionPath, - final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { + final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { return Arrays.stream(fs.listStatus(partitionPath, - path -> path.getName().startsWith(fileId) && path.getName().contains(logFileExtension))) - .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); + path -> path.getName().startsWith(fileId) && path.getName().contains(logFileExtension))) + .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); } /** * Get the latest log version for the fileId in the partition path - * - * @param fs - * @param partitionPath - * @param fileId - * @param logFileExtension - * 
@return - * @throws IOException */ public static Optional getLatestLogVersion(FileSystem fs, Path partitionPath, - final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { + final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { Optional latestLogFile = - getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)); + getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)); if (latestLogFile.isPresent()) { return Optional.of(latestLogFile.get().getLogVersion()); } @@ -260,26 +256,20 @@ public class FSUtils { } public static int getCurrentLogVersion(FileSystem fs, Path partitionPath, - final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { + final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { Optional currentVersion = - getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); + getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); // handle potential overflow return (currentVersion.isPresent()) ? currentVersion.get() : 1; } /** * computes the next log version for the specified fileId in the partition path - * - * @param fs - * @param partitionPath - * @param fileId - * @return - * @throws IOException */ public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId, - final String logFileExtension, final String baseCommitTime) throws IOException { + final String logFileExtension, final String baseCommitTime) throws IOException { Optional currentVersion = - getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); + getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); // handle potential overflow return (currentVersion.isPresent()) ? 
currentVersion.get() + 1 : 1; } @@ -297,17 +287,13 @@ public class FSUtils { } /** - * When a file was opened and the task died without closing the stream, another task executor cannot open because the existing lease will be active. - * We will try to recover the lease, from HDFS. If a data node went down, it takes about 10 minutes for the lease to be rocovered. - * But if the client dies, this should be instant. - * - * @param dfs - * @param p - * @return - * @throws IOException + * When a file was opened and the task died without closing the stream, another task executor + * cannot open because the existing lease will be active. We will try to recover the lease, from + * HDFS. If a data node went down, it takes about 10 minutes for the lease to be recovered. But + * if the client dies, this should be instant. */ public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p) - throws IOException, InterruptedException { + throws IOException, InterruptedException { LOG.info("Recover lease on dfs file " + p); // initiate the recovery boolean recovered = false; @@ -324,7 +310,7 @@ public class FSUtils { } public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath, - Stream instants) { + Stream instants) { //TODO - this should be archived when archival is made general for all meta-data // skip MIN_CLEAN_TO_KEEP and delete rest instants.skip(MIN_CLEAN_TO_KEEP).map(s -> { @@ -332,13 +318,13 @@ public class FSUtils { return fs.delete(new Path(metaPath, s.getFileName()), false); } catch (IOException e) { throw new HoodieIOException("Could not delete clean meta files" + s.getFileName(), - e); + e); } }); } public static void deleteOlderRollbackMetaFiles(FileSystem fs, String metaPath, - Stream instants) { + Stream instants) { //TODO - this should be archived when archival is made general for all meta-data // skip MIN_ROLLBACK_TO_KEEP and delete rest instants.skip(MIN_ROLLBACK_TO_KEEP).map(s -> { @@ -346,7 +332,7 @@ public class 
FSUtils { return fs.delete(new Path(metaPath, s.getFileName()), false); } catch (IOException e) { throw new HoodieIOException( - "Could not delete rollback meta files " + s.getFileName(), e); + "Could not delete rollback meta files " + s.getFileName(), e); } }); } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index 7234491e1..ce7ec2b3b 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -22,6 +22,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -50,6 +51,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup. @@ -71,13 +73,14 @@ public class HoodieSnapshotCopier implements Serializable { final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata, tableMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); // Get the latest commit - final Optional - latestCommit = tableMetadata.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + Optional latestCommit = tableMetadata.getActiveTimeline() + .getCommitTimeline().filterCompletedInstants().lastInstant(); if(!latestCommit.isPresent()) { logger.warn("No commits present. 
Nothing to snapshot"); - } else { - logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %targetBasePath.", latestCommit.get())); + return; } + final String latestCommitTimestamp = latestCommit.get().getTimestamp(); + logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp)); List partitions = FSUtils.getAllPartitionPaths(fs, baseDir); if (partitions.size() > 0) { @@ -90,51 +93,47 @@ public class HoodieSnapshotCopier implements Serializable { fs.delete(new Path(outputDir), true); } - jsc.parallelize(partitions, partitions.size()).flatMap(new FlatMapFunction>() { - @Override - public Iterator> call(String partition) throws Exception { - // Only take latest version files <= latestCommit. - FileSystem fs = FSUtils.getFs(); - List> filePaths = new ArrayList<>(); - for (HoodieDataFile hoodieDataFile : fsView - .getLatestVersionInPartition(partition, latestCommit.get().getTimestamp()) - .collect(Collectors.toList())) { - filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath())); - } - return filePaths.iterator(); - } - }).foreach(new VoidFunction>() { - @Override - public void call(Tuple2 tuple) throws Exception { - String partition = tuple._1(); - Path sourceFilePath = new Path(tuple._2()); - Path toPartitionPath = new Path(outputDir, partition); - FileSystem fs = FSUtils.getFs(); + jsc.parallelize(partitions, partitions.size()) + .flatMap(partition -> { + // Only take latest version files <= latestCommit. 
+ FileSystem fs1 = FSUtils.getFs(); + List> filePaths = new ArrayList<>(); + Stream dataFiles = fsView.getLatestVersionInPartition(partition, latestCommitTimestamp); + dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath()))); - if (!fs.exists(toPartitionPath)) { - fs.mkdirs(toPartitionPath); - } - FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), - false, fs.getConf()); - } + // also need to copy over partition metadata + Path partitionMetaFile = new Path(new Path(baseDir, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE); + if (fs1.exists(partitionMetaFile)) { + filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString())); + } + + return filePaths.iterator(); + }).foreach(tuple -> { + String partition = tuple._1(); + Path sourceFilePath = new Path(tuple._2()); + Path toPartitionPath = new Path(outputDir, partition); + FileSystem fs1 = FSUtils.getFs(); + + if (!fs1.exists(toPartitionPath)) { + fs1.mkdirs(toPartitionPath); + } + FileUtil.copy(fs1, sourceFilePath, fs1, + new Path(toPartitionPath, sourceFilePath.getName()), false, fs1.getConf()); }); // Also copy the .commit files - logger.info(String.format("Copying .commit files which are no-late-than %targetBasePath.", latestCommit.get())); + logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp)); FileStatus[] commitFilesToCopy = fs.listStatus( - new Path(baseDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME), new PathFilter() { - @Override - public boolean accept(Path commitFilePath) { - if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) { - return true; - } else { - String commitTime = - FSUtils.getCommitFromCommitFile(commitFilePath.getName()); - return tableMetadata.getActiveTimeline().getCommitTimeline() - .compareTimestamps(commitTime, latestCommit.get().getTimestamp(), HoodieTimeline.GREATER); - } - } - }); + new Path(baseDir + 
"/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> { + if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) { + return true; + } else { + String commitTime = + FSUtils.getCommitFromCommitFile(commitFilePath.getName()); + return tableMetadata.getActiveTimeline().getCommitTimeline() + .compareTimestamps(commitTime, latestCommitTimestamp, HoodieTimeline.LESSER_OR_EQUAL); + } + }); for (FileStatus commitStatus : commitFilesToCopy) { Path targetFilePath = new Path( outputDir + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java index 868d4617b..64e568105 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java @@ -16,6 +16,7 @@ package com.uber.hoodie.utilities; +import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.util.FSUtils; import org.apache.hadoop.fs.FileSystem; @@ -64,77 +65,79 @@ public class TestHoodieSnapshotCopier { HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); copier.snapshot(jsc, basePath, outputPath); - // Nothing changed except _SUCCESS + // Nothing changed; we just bail out assertEquals(fs.listStatus(new Path(basePath)).length, 1); - assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS"))); + assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS"))); } //TODO - uncomment this after fixing test failures -// @Test -// public void testSnapshotCopy() throws Exception { -// // Generate some commits and corresponding parquets -// String commitTime1 = "20160501010101"; -// String commitTime2 = "20160502020601"; -// String commitTime3 = "20160506030611"; -// new File(basePath + 
"/.hoodie").mkdirs(); -// new File(basePath + "/.hoodie/hoodie.properties").createNewFile(); -// // Only first two have commit files -// new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); -// new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); -// new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile(); -// -// // Some parquet files -// new File(basePath + "/2016/05/01/").mkdirs(); -// new File(basePath + "/2016/05/02/").mkdirs(); -// new File(basePath + "/2016/05/06/").mkdirs(); -// -// // Make commit1 -// File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, 1, "id11")); -// file11.createNewFile(); -// File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, 1, "id12")); -// file12.createNewFile(); -// File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, 1, "id13")); -// file13.createNewFile(); -// -// // Make commit2 -// File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, 1, "id21")); -// file21.createNewFile(); -// File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, 1, "id22")); -// file22.createNewFile(); -// File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, 1, "id23")); -// file23.createNewFile(); -// -// // Make commit3 -// File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, 1, "id31")); -// file31.createNewFile(); -// File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, 1, "id32")); -// file32.createNewFile(); -// File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, 1, "id33")); -// file33.createNewFile(); -// -// // Do a snapshot copy -// HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); -// copier.snapshot(jsc, basePath, outputPath); -// -// // Check results 
-// assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName()))); -// assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName()))); -// assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName()))); -// assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName()))); -// assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName()))); -// assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName()))); -// assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName()))); -// assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName()))); -// assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName()))); -// -// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit"))); -// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit"))); -// assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit"))); -// assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight"))); -// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties"))); -// -// assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS"))); -// } + @Test + public void testSnapshotCopy() throws Exception { + // Generate some commits and corresponding parquets + String commitTime1 = "20160501010101"; + String commitTime2 = "20160502020601"; + String commitTime3 = "20160506030611"; + new File(basePath + "/.hoodie").mkdirs(); + new File(basePath + "/.hoodie/hoodie.properties").createNewFile(); + // Only first two have commit files + new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); + new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); + new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile(); + + // Some parquet files + new File(basePath + "/2016/05/01/").mkdirs(); + new 
File(basePath + "/2016/05/02/").mkdirs(); + new File(basePath + "/2016/05/06/").mkdirs(); + HoodieTestDataGenerator.writePartitionMetadata(fs, + new String[] {"2016/05/01", "2016/05/02", "2016/05/06"}, + basePath); + // Make commit1 + File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, 1, "id11")); + file11.createNewFile(); + File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, 1, "id12")); + file12.createNewFile(); + File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, 1, "id13")); + file13.createNewFile(); + + // Make commit2 + File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, 1, "id21")); + file21.createNewFile(); + File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, 1, "id22")); + file22.createNewFile(); + File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, 1, "id23")); + file23.createNewFile(); + + // Make commit3 + File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, 1, "id31")); + file31.createNewFile(); + File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, 1, "id32")); + file32.createNewFile(); + File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, 1, "id33")); + file33.createNewFile(); + + // Do a snapshot copy + HoodieSnapshotCopier copier = new HoodieSnapshotCopier(); + copier.snapshot(jsc, basePath, outputPath); + + // Check results + assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName()))); + assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName()))); + assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName()))); + assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName()))); + assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + 
file22.getName()))); + assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName()))); + assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName()))); + assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName()))); + assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName()))); + + assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit"))); + assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit"))); + assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit"))); + assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight"))); + assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties"))); + + assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS"))); + } @After public void cleanup() {