From d83b671ada9981780257b10acabfce7b4e5a8403 Mon Sep 17 00:00:00 2001 From: Prasanna Rajaperumal Date: Mon, 13 Mar 2017 23:09:29 -0700 Subject: [PATCH] Implement Savepoints and required metadata timeline - Part 2 --- .../cli/commands/SavepointsCommand.java | 4 +- .../com/uber/hoodie/HoodieWriteClient.java | 55 +++++++++++++++---- .../com/uber/hoodie/io/HoodieCleaner.java | 24 +++++--- .../com/uber/hoodie/table/HoodieTable.java | 37 +++++++++++++ .../com/uber/hoodie/TestHoodieClient.java | 6 +- .../main/avro/HoodieSavePointMetadata.avsc | 15 ++++- .../uber/hoodie/common/util/AvroUtils.java | 23 ++++++++ 7 files changed, 135 insertions(+), 29 deletions(-) diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java index b9dfc0764..d448b7396 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java @@ -97,9 +97,7 @@ public class SavepointsCommand implements CommandMarker { } HoodieWriteClient client = createHoodieClient(null, HoodieCLI.tableMetadata.getBasePath()); - HoodieSavepointMetadata metadata = new HoodieSavepointMetadata(user, - HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()), comments); - if (client.savepoint(commitTime, metadata)) { + if (client.savepoint(commitTime, user, comments)) { // Refresh the current refreshMetaClient(); return String.format("The commit \"%s\" has been savepointed.", commitTime); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 8b584b687..9c6914667 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -26,6 +26,7 @@ import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; @@ -33,8 +34,10 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; @@ -53,9 +56,12 @@ import com.uber.hoodie.metrics.HoodieMetrics; import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.WorkloadProfile; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Accumulator; @@ -63,6 +69,7 @@ import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; @@ -78,6 +85,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -396,16 +404,20 @@ public class HoodieWriteClient implements Seriali } /** - * Savepoint the latest commit. The data files and commit files for that commit will never be rolledback, - * cleaned or archived. This gives an option to rollback the state to the savepoint anytime. + * Savepoint a specific commit. Latest version of data files as of the passed in commitTime + * will be referenced in the savepoint and will never be cleaned. The savepointed commit + * will never be rolledback or archived. + * + * This gives an option to rollback the state to the savepoint anytime. * Savepoint needs to be manually created and deleted. * - * Savepoint should be on a commit that is not cleaned. + * Savepoint should be on a commit that could not have been cleaned. * - * @param savePointMetadata - metadata about the savepoint + * @param user - User creating the savepoint + * @param comment - Comment for the savepoint * @return true if the savepoint was created successfully */ - public boolean savepoint(HoodieSavepointMetadata savePointMetadata) { + public boolean savepoint(String user, String comment) { HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); if (table.getCompletedCommitTimeline().empty()) { @@ -414,20 +426,25 @@ public class HoodieWriteClient implements Seriali String latestCommit = table.getCompletedCommitTimeline().lastInstant().get().getTimestamp(); logger.info("Savepointing latest commit " + latestCommit); - return savepoint(latestCommit, savePointMetadata); + return savepoint(latestCommit, user, comment); } /** - * Savepoint a specific commit. The data files and commit files for that commit will never be rolledback, - * cleaned or archived. This gives an option to rollback the state to the savepoint anytime. + * Savepoint a specific commit. Latest version of data files as of the passed in commitTime + * will be referenced in the savepoint and will never be cleaned. The savepointed commit + * will never be rolledback or archived. + * + * This gives an option to rollback the state to the savepoint anytime. * Savepoint needs to be manually created and deleted. * - * Savepoint should be on a commit that is not cleaned. + * Savepoint should be on a commit that could not have been cleaned. * - * @param savePointMetadata - metadata about the savepoint + * @param commitTime - commit that should be savepointed + * @param user - User creating the savepoint + * @param comment - Comment for the savepoint * @return true if the savepoint was created successfully */ - public boolean savepoint(String commitTime, HoodieSavepointMetadata savePointMetadata) { + public boolean savepoint(String commitTime, String user, String comment) { HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); Optional cleanInstant = table.getCompletedCleanTimeline().lastInstant(); @@ -455,10 +472,24 @@ public class HoodieWriteClient implements Seriali "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + lastCommitRetained); + Map> latestFilesMap = jsc.parallelize( + FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath())) + .mapToPair((PairFunction>) partitionPath -> { + // Scan all partitions files with this commit time + logger.info("Collecting latest files in partition path " + partitionPath); + TableFileSystemView view = table.getFileSystemView(); + List latestFiles = + view.getLatestVersionInPartition(partitionPath, commitTime) + .map(HoodieDataFile::getFileName).collect(Collectors.toList()); + return new Tuple2>(partitionPath, latestFiles); + }).collectAsMap(); + + HoodieSavepointMetadata metadata = + AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap); // Nothing to save in the savepoint table.getActiveTimeline().saveAsComplete( new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, commitTime), - AvroUtils.serializeSavepointMetadata(savePointMetadata)); + AvroUtils.serializeSavepointMetadata(metadata)); logger.info("Savepoint " + commitTime + " created"); return true; } catch (IOException e) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java index ced3025db..62e0d414a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java @@ -21,6 +21,7 @@ import com.google.common.collect.Maps; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; @@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.FlatMapFunction; import java.io.IOException; import java.util.ArrayList; @@ -39,7 +41,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Cleaner is responsible for garbage collecting older files in a given partition path, such that @@ -50,16 +54,16 @@ import java.util.stream.Collectors; *

* TODO: Should all cleaning be done based on {@link com.uber.hoodie.common.model.HoodieCommitMetadata} */ -public class HoodieCleaner { +public class HoodieCleaner> { private static Logger logger = LogManager.getLogger(HoodieCleaner.class); private final TableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; - private HoodieTable hoodieTable; + private HoodieTable hoodieTable; private HoodieWriteConfig config; private FileSystem fs; - public HoodieCleaner(HoodieTable hoodieTable, HoodieWriteConfig config) { + public HoodieCleaner(HoodieTable hoodieTable, HoodieWriteConfig config) { this.hoodieTable = hoodieTable; this.fileSystemView = hoodieTable.getCompactedFileSystemView(); this.commitTimeline = hoodieTable.getCompletedCommitTimeline(); @@ -85,7 +89,9 @@ public class HoodieCleaner { fileSystemView.getEveryVersionInPartition(partitionPath) .collect(Collectors.toList()); List deletePaths = new ArrayList<>(); - List savepoints = hoodieTable.getSavepoints(); + // Collect all the datafiles savepointed by all the savepoints + List savepointedFiles = hoodieTable.getSavepoints().stream() + .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList()); for (List versionsForFileId : fileVersions) { int keepVersions = config.getCleanerFileVersionsRetained(); @@ -93,8 +99,8 @@ public class HoodieCleaner { while (commitItr.hasNext() && keepVersions > 0) { // Skip this most recent version HoodieDataFile next = commitItr.next(); - if(savepoints.contains(next.getCommitTime())) { - // do not clean datafiles that are savepointed + if(savepointedFiles.contains(next.getFileName())) { + // do not clean up a savepoint data file continue; } keepVersions--; @@ -134,7 +140,9 @@ public class HoodieCleaner { "Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); List deletePaths = new ArrayList<>(); - List savepoints = hoodieTable.getSavepoints(); + // Collect all the datafiles savepointed by all the savepoints + List savepointedFiles = hoodieTable.getSavepoints().stream() + .flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList()); // determine if we have enough commits, to start cleaning. if (commitTimeline.countInstants() > commitsRetained) { @@ -152,7 +160,7 @@ public class HoodieCleaner { // i.e always spare the last commit. for (HoodieDataFile afile : fileList) { String fileCommitTime = afile.getCommitTime(); - if(savepoints.contains(fileCommitTime)) { + if(savepointedFiles.contains(afile.getFileName())) { // do not clean up a savepoint data file continue; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 984ef155b..7c8e53e76 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -17,12 +17,15 @@ package com.uber.hoodie.table; import com.google.common.collect.Sets; +import com.uber.hoodie.avro.model.HoodieSavepointMetadata; +import com.uber.hoodie.avro.model.HoodieSavepointPartitionMetadata; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; @@ -30,13 +33,17 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieSavepointException; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.Partitioner; +import java.io.IOException; import java.io.Serializable; import java.util.Iterator; import java.util.List; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Abstract implementation of a HoodieTable @@ -138,11 +145,41 @@ public abstract class HoodieTable implements Seri return getActiveTimeline().getSavePointTimeline().filterCompletedInstants(); } + /** + * Get the list of savepoints in this table + * @return + */ public List getSavepoints() { return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); } + /** + * Get the list of data file names savepointed + * + * @param savepointTime + * @return + * @throws IOException + */ + public Stream getSavepointedDataFiles(String savepointTime) { + if (!getSavepoints().contains(savepointTime)) { + throw new HoodieSavepointException( + "Could not get data files for savepoint " + savepointTime + ". No such savepoint."); + } + HoodieInstant instant = + new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); + HoodieSavepointMetadata metadata = null; + try { + metadata = AvroUtils.deserializeHoodieSavepointMetadata( + getActiveTimeline().getInstantDetails(instant).get()); + } catch (IOException e) { + throw new HoodieSavepointException( + "Could not get savepointed data files for savepoint " + savepointTime, e); + } + return metadata.getPartitionMetadata().values().stream() + .flatMap(s -> s.getSavepointDataFile().stream()); + } + public HoodieActiveTimeline getActiveTimeline() { return metaClient.getActiveTimeline(); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index a46182b50..701da91d5 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -366,8 +366,7 @@ public class TestHoodieClient implements Serializable { // Verify there are no errors assertNoWriteErrors(statuses); - client.savepoint(new HoodieSavepointMetadata("hoodie-unit-test", - HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()), "test")); + client.savepoint("hoodie-unit-test", "test"); try { client.rollback(newCommitTime); fail("Rollback of a savepoint was allowed " + newCommitTime); @@ -454,8 +453,7 @@ public class TestHoodieClient implements Serializable { // Verify there are no errors assertNoWriteErrors(statuses); - client.savepoint(new HoodieSavepointMetadata("hoodie-unit-test", - HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()), "test")); + client.savepoint("hoodie-unit-test", "test"); /** * Write 3 (updates) diff --git a/hoodie-common/src/main/avro/HoodieSavePointMetadata.avsc b/hoodie-common/src/main/avro/HoodieSavePointMetadata.avsc index 25c36591e..6fc92246a 100644 --- a/hoodie-common/src/main/avro/HoodieSavePointMetadata.avsc +++ b/hoodie-common/src/main/avro/HoodieSavePointMetadata.avsc @@ -3,7 +3,18 @@ "name": "HoodieSavepointMetadata", "fields": [ {"name": "savepointedBy", "type": "string"}, - {"name": "savepointedAt", "type": "string"}, - {"name": "comments", "type": "string"} + {"name": "savepointedAt", "type": "long"}, + {"name": "comments", "type": "string"}, + {"name": "partitionMetadata", "type": { + "type" : "map", "values" : { + "type": "record", + "name": "HoodieSavepointPartitionMetadata", + "fields": [ + {"name": "partitionPath", "type": "string"}, + {"name": "savepointDataFile", "type": {"type": "array", "items": "string"}} + ] + } + } + } ] } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java index 8196abab5..611c16b44 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java @@ -24,9 +24,11 @@ import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata; import com.uber.hoodie.avro.model.HoodieRollbackMetadata; import com.uber.hoodie.avro.model.HoodieRollbackPartitionMetadata; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; +import com.uber.hoodie.avro.model.HoodieSavepointPartitionMetadata; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieAvroPayload; +import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.exception.HoodieIOException; @@ -57,7 +59,9 @@ import java.io.ByteArrayOutputStream; import java.io.FileWriter; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; public class AvroUtils { @@ -129,6 +133,20 @@ public class AvroUtils { totalDeleted, commits, partitionMetadataBuilder.build()); } + public static HoodieSavepointMetadata convertSavepointMetadata(String user, String comment, + Map> latestFiles) { + ImmutableMap.Builder partitionMetadataBuilder = + ImmutableMap.builder(); + for (Map.Entry> stat : latestFiles.entrySet()) { + HoodieSavepointPartitionMetadata metadata = + new HoodieSavepointPartitionMetadata(stat.getKey(), stat.getValue()); + partitionMetadataBuilder.put(stat.getKey(), metadata); + } + return new HoodieSavepointMetadata(user, System.currentTimeMillis(), comment, + partitionMetadataBuilder.build()); + } + + public static Optional serializeCleanMetadata(HoodieCleanMetadata metadata) throws IOException { return serializeAvroMetadata(metadata, HoodieCleanMetadata.class); @@ -160,6 +178,11 @@ public class AvroUtils { return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class); } + public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[] bytes) + throws IOException { + return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class); + } + public static T deserializeAvroMetadata(byte[] bytes, Class clazz) throws IOException { DatumReader reader = new SpecificDatumReader<>(clazz);