diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodiePrompt.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodiePrompt.java index 66e189488..268ec1721 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodiePrompt.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodiePrompt.java @@ -28,18 +28,19 @@ public class HoodiePrompt extends DefaultPromptProvider { @Override public String getPrompt() { - String tableName = HoodieCLI.tableMetadata.getTableConfig().getTableName(); - switch (HoodieCLI.state) { - case INIT: - return "hoodie->"; - case DATASET: - return "hoodie:" + tableName + "->"; - case SYNC: - return "hoodie:" + tableName + " <==> " - + HoodieCLI.syncTableMetadata.getTableConfig().getTableName() + "->"; - } - if (HoodieCLI.tableMetadata != null) + if (HoodieCLI.tableMetadata != null) { + String tableName = HoodieCLI.tableMetadata.getTableConfig().getTableName(); + switch (HoodieCLI.state) { + case INIT: + return "hoodie->"; + case DATASET: + return "hoodie:" + tableName + "->"; + case SYNC: + return "hoodie:" + tableName + " <==> " + + HoodieCLI.syncTableMetadata.getTableConfig().getTableName() + "->"; + } return "hoodie:" + tableName + "->"; + } return "hoodie->"; } diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java new file mode 100644 index 000000000..160b9f3c6 --- /dev/null +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.uber.hoodie.cli.commands; + +import com.uber.hoodie.avro.model.HoodieCleanMetadata; +import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata; +import com.uber.hoodie.cli.HoodieCLI; +import com.uber.hoodie.cli.HoodiePrintHelper; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.AvroUtils; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliAvailabilityIndicator; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Component +public class CleansCommand implements CommandMarker { + @CliAvailabilityIndicator({"cleans show"}) + public boolean isShowAvailable() { + return HoodieCLI.tableMetadata != null; + } + + @CliAvailabilityIndicator({"cleans refresh"}) + public boolean isRefreshAvailable() { + return HoodieCLI.tableMetadata != null; + } + + @CliAvailabilityIndicator({"clean showpartitions"}) + public boolean isShowPartitionsAvailable() { + return HoodieCLI.tableMetadata != null; + } + + @CliCommand(value = "cleans show", help = "Show the cleans") + public String showCleans() throws IOException { + HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants(); + List<HoodieInstant> cleans = timeline.getInstants().collect(Collectors.toList()); + String[][] rows = new String[cleans.size()][]; + Collections.reverse(cleans); + for (int i = 0; i < cleans.size(); i++) { + HoodieInstant clean = cleans.get(i); + HoodieCleanMetadata cleanMetadata = + AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get()); + rows[i] = new String[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), + String.valueOf(cleanMetadata.getTotalFilesDeleted()), + String.valueOf(cleanMetadata.getTimeTakenInMillis())}; + } + return HoodiePrintHelper.print( + new String[] {"CleanTime", "EarliestCommitRetained", "Total Files Deleted", + "Total Time Taken"}, rows); + } + + @CliCommand(value = "cleans refresh", help = "Refresh the cleans") + public String refreshCleans() throws IOException { + HoodieTableMetaClient metadata = + new HoodieTableMetaClient(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath()); + HoodieCLI.setTableMetadata(metadata); + return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed."; + } + + @CliCommand(value = "clean showpartitions", help = "Show partition level details of a clean") + public String showCleanPartitions( + @CliOption(key = {"clean"}, help = "clean to show") + final String commitTime) throws Exception { + HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants(); + HoodieInstant cleanInstant = + new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, commitTime); + + if (!timeline.containsInstant(cleanInstant)) { + return "Clean " + commitTime + " not found in metadata " + timeline; + } + HoodieCleanMetadata cleanMetadata = +
AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(cleanInstant).get()); + List<String[]> rows = new ArrayList<>(); + for (Map.Entry<String, HoodieCleanPartitionMetadata> entry : cleanMetadata.getPartitionMetadata().entrySet()) { + String path = entry.getKey(); + HoodieCleanPartitionMetadata stats = entry.getValue(); + String policy = stats.getPolicy(); + String totalSuccessDeletedFiles = String.valueOf(stats.getSuccessDeleteFiles().size()); + String totalFailedDeletedFiles = String.valueOf(stats.getFailedDeleteFiles().size()); + rows.add(new String[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles}); + } + return HoodiePrintHelper.print( + new String[] {"Partition Path", "Cleaning policy", "Total Files Successfully Deleted", + "Total Failed Deletions"}, rows.toArray(new String[rows.size()][])); + } +} diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java new file mode 100644 index 000000000..b9dfc0764 --- /dev/null +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.uber.hoodie.cli.commands; + +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.avro.model.HoodieSavepointMetadata; +import com.uber.hoodie.cli.HoodieCLI; +import com.uber.hoodie.cli.HoodiePrintHelper; +import com.uber.hoodie.cli.utils.InputStreamConsumer; +import com.uber.hoodie.cli.utils.SparkUtil; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.HoodieIndex; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.launcher.SparkLauncher; +import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliAvailabilityIndicator; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +@Component +public class SavepointsCommand implements CommandMarker { + @CliAvailabilityIndicator({"savepoints show"}) + public boolean isShowAvailable() { + return HoodieCLI.tableMetadata != null; + } + + @CliAvailabilityIndicator({"savepoints refresh"}) + public boolean isRefreshAvailable() { + return HoodieCLI.tableMetadata != null; + } + + + @CliAvailabilityIndicator({"savepoint create"}) + public boolean isCreateSavepointAvailable() { + return HoodieCLI.tableMetadata != null; + } + +
@CliAvailabilityIndicator({"savepoint rollback"}) + public boolean isRollbackToSavepointAvailable() { + return HoodieCLI.tableMetadata != null && !HoodieCLI.tableMetadata.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty(); + } + + @CliCommand(value = "savepoints show", help = "Show the savepoints") + public String showSavepoints() throws IOException { + HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTimeline timeline = activeTimeline.getSavePointTimeline().filterCompletedInstants(); + List<HoodieInstant> commits = timeline.getInstants().collect(Collectors.toList()); + String[][] rows = new String[commits.size()][]; + Collections.reverse(commits); + for (int i = 0; i < commits.size(); i++) { + HoodieInstant commit = commits.get(i); + rows[i] = new String[] {commit.getTimestamp()}; + } + return HoodiePrintHelper.print(new String[] {"SavepointTime"}, rows); + } + + @CliCommand(value = "savepoint create", help = "Savepoint a commit") + public String savepoint( + @CliOption(key = {"commit"}, help = "Commit to savepoint") + final String commitTime, + @CliOption(key = {"user"}, help = "User who is creating the savepoint") + final String user, + @CliOption(key = {"comments"}, help = "Comments for creating the savepoint") + final String comments) throws Exception { + HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); + HoodieInstant + commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); + + if (!timeline.containsInstant(commitInstant)) { + return "Commit " + commitTime + " not found in Commits " + timeline; + } + + HoodieWriteClient client = createHoodieClient(null, HoodieCLI.tableMetadata.getBasePath()); + HoodieSavepointMetadata metadata = new HoodieSavepointMetadata(user, + HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()), comments); + if (client.savepoint(commitTime, metadata)) { + // Refresh the current table metadata + refreshMetaClient(); + return String.format("The commit \"%s\" has been savepointed.", commitTime); + } + return String.format("Failed: Could not savepoint commit \"%s\".", commitTime); + } + + @CliCommand(value = "savepoint rollback", help = "Rollback to a savepoint") + public String rollbackToSavepoint( + @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") + final String commitTime, + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") + final String sparkPropertiesPath) throws Exception { + HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); + HoodieInstant + commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); + + if (!timeline.containsInstant(commitInstant)) { + return "Commit " + commitTime + " not found in Commits " + timeline; + } + + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), + commitTime, + HoodieCLI.tableMetadata.getBasePath()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + // Refresh the current table metadata + refreshMetaClient(); + if (exitCode != 0) { + return "Savepoint " + commitTime + " failed to roll back"; + } + return "Savepoint " + commitTime + " rolled back"; + } + + +
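Together, these commands give operators a complete savepoint workflow from the hoodie shell. A hypothetical session (table name, timestamp, and file path are illustrative only; the responses are the strings the commands above return):

```
hoodie:trips->savepoint create --commit 20170412183025 --user data-ops --comments "before backfill"
The commit "20170412183025" has been savepointed.
hoodie:trips->savepoint rollback --savepoint 20170412183025 --sparkProperties /path/to/spark.properties
Savepoint 20170412183025 rolled back
```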
"savepoints refresh", help = "Refresh the savepoints") + public String refreshMetaClient() throws IOException { + HoodieTableMetaClient metadata = + new HoodieTableMetaClient(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath()); + HoodieCLI.setTableMetadata(metadata); + return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed."; + } + + private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) + throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .build(); + return new HoodieWriteClient(jsc, config, false); + } + + + +} diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java index c6f4c1327..227226f00 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java @@ -28,6 +28,8 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import java.util.Date; + public class SparkMain { protected final static Logger LOG = Logger.getLogger(SparkMain.class); @@ -38,7 +40,8 @@ public class SparkMain { */ enum SparkCommand { ROLLBACK, - DEDUPLICATE + DEDUPLICATE, + ROLLBACK_TO_SAVEPOINT, SAVEPOINT } public static void main(String[] args) throws Exception { @@ -55,6 +58,9 @@ public class SparkMain { } else if(SparkCommand.DEDUPLICATE.equals(cmd)) { assert (args.length == 4); returnCode = deduplicatePartitionPath(jsc, args[1], args[2], args[3]); + } else if(SparkCommand.ROLLBACK_TO_SAVEPOINT.equals(cmd)) { + assert (args.length == 3); + returnCode = rollbackToSavepoint(jsc, args[1], args[2]); } System.exit(returnCode); @@ -76,11 +82,23 @@ public class SparkMain { HoodieWriteClient client = createHoodieClient(jsc, basePath); if (client.rollback(commitTime)) { LOG.info(String.format("The commit \"%s\" rolled back.", commitTime)); - return -1; + return 0; } else { LOG.info(String.format("The commit \"%s\" failed to roll back.", commitTime)); + return -1; + } + } + + private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) + throws Exception { + HoodieWriteClient client = createHoodieClient(jsc, basePath); + if (client.rollbackToSavepoint(savepointTime)) { + LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime)); + return 0; + } else { + LOG.info(String.format("The commit \"%s\" failed to roll back.", savepointTime)); + return -1; } - return 0; } private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index b87e84c7d..8b584b687 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -17,6 +17,14 @@ package com.uber.hoodie; import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.uber.hoodie.avro.model.HoodieCleanMetadata; +import com.uber.hoodie.avro.model.HoodieRollbackMetadata; +import com.uber.hoodie.avro.model.HoodieSavepointMetadata; +import 
com.uber.hoodie.common.HoodieCleanStat; +import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; @@ -27,12 +35,15 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCommitException; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.exception.HoodieRollbackException; +import com.uber.hoodie.exception.HoodieSavepointException; import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.func.BulkInsertMapFunction; import com.uber.hoodie.index.HoodieIndex; @@ -62,14 +73,19 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.text.ParseException; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.spark.util.AccumulatorV2; +import org.apache.spark.util.LongAccumulator; import scala.Option; import scala.Tuple2; @@ -359,7 +375,7 @@ public class HoodieWriteClient implements Seriali // We cannot have unbounded commit files. Archive commits if we have to archive archiveLog.archiveIfRequired(); // Call clean to cleanup if there is anything to cleanup after the commit, - clean(); + clean(commitTime); if (writeContext != null) { long durationInMs = metrics.getDurationInMs(writeContext.stop()); metrics.updateCommitMetrics( @@ -379,6 +395,143 @@ public class HoodieWriteClient implements Seriali return true; } + /** + * Savepoint the latest commit. The data files and commit files for that commit will never be rolled back, + * cleaned or archived. This gives an option to roll back the state to the savepoint anytime. + * Savepoint needs to be manually created and deleted. + * + * Savepoint should be on a commit that is not cleaned. + * + * @param savePointMetadata - metadata about the savepoint + * @return true if the savepoint was created successfully + */ + public boolean savepoint(HoodieSavepointMetadata savePointMetadata) { + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + if (table.getCompletedCommitTimeline().empty()) { + throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty"); + } + + String latestCommit = table.getCompletedCommitTimeline().lastInstant().get().getTimestamp(); + logger.info("Savepointing latest commit " + latestCommit); + return savepoint(latestCommit, savePointMetadata); + } + + /** + * Savepoint a specific commit. The data files and commit files for that commit will never be rolled back, + * cleaned or archived. This gives an option to roll back the state to the savepoint anytime. + * Savepoint needs to be manually created and deleted. + * + * Savepoint should be on a commit that is not cleaned.
+ * + * @param commitTime - commit to savepoint + * @param savePointMetadata - metadata about the savepoint + * @return true if the savepoint was created successfully + */ + public boolean savepoint(String commitTime, HoodieSavepointMetadata savePointMetadata) { + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + Optional<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant(); + + HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); + if (!table.getCompletedCommitTimeline().containsInstant(commitInstant)) { + throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant); + } + + try { + // Check the last commit that was not cleaned and check if savepoint time is > that commit + String lastCommitRetained; + if (cleanInstant.isPresent()) { + HoodieCleanMetadata cleanMetadata = AvroUtils.deserializeHoodieCleanMetadata( + table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get()); + lastCommitRetained = cleanMetadata.getEarliestCommitToRetain(); + } else { + lastCommitRetained = + table.getCompletedCommitTimeline().firstInstant().get().getTimestamp(); + } + + // Cannot allow savepoint time on a commit that could have been cleaned + Preconditions.checkArgument(table.getActiveTimeline() + .compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL), + "Could not savepoint commit " + commitTime + " as this is beyond the lookup window " + + lastCommitRetained); + + // Nothing to save in the savepoint + table.getActiveTimeline().saveAsComplete( + new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, commitTime), + AvroUtils.serializeSavepointMetadata(savePointMetadata)); + logger.info("Savepoint " + commitTime + " created"); + return true; + } catch (IOException e) { + throw new HoodieSavepointException("Failed to savepoint " + commitTime, e); + } + } + + /** + * Delete a savepoint that was created. Once the savepoint is deleted, the commit can be rolled back + * and the cleaner may clean up data files. + * + * @param savepointTime - the savepoint to delete + */ + public void deleteSavepoint(String savepointTime) { + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + + HoodieInstant savePoint = + new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); + boolean isSavepointPresent = + table.getCompletedSavepointTimeline().containsInstant(savePoint); + if (!isSavepointPresent) { + logger.warn("No savepoint present " + savepointTime); + return; + } + + activeTimeline.revertToInflight(savePoint); + activeTimeline.deleteInflight( + new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime)); + logger.info("Savepoint " + savepointTime + " deleted"); + }
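Because rollback (below) refuses savepointed commits outright, a caller that genuinely wants to discard such a commit must release its savepoint first. A minimal sketch of that sequence (`forceRollback` is a hypothetical helper; `client` and `table` are assumed to be built for the same dataset):

```java
// Roll back a commit even if it is savepointed: delete the savepoint, then roll back.
// Without the deleteSavepoint call, rollback(commitTime) throws HoodieRollbackException.
static void forceRollback(HoodieWriteClient client, HoodieTable table, String commitTime) {
  if (table.getSavepoints().contains(commitTime)) {
    client.deleteSavepoint(commitTime);
  }
  client.rollback(commitTime);
}
```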
+ + /** + * Roll back the state to the savepoint. + * WARNING: This rolls back recent commits and deletes data files. Queries accessing the files + * will mostly fail. This should be done during a downtime. + * + * @param savepointTime - savepoint time to roll back to + * @return true if the state was successfully rolled back to the savepoint + */ + public boolean rollbackToSavepoint(String savepointTime) { + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieTimeline commitTimeline = table.getCompletedCommitTimeline(); + + HoodieInstant savePoint = + new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); + boolean isSavepointPresent = + table.getCompletedSavepointTimeline().containsInstant(savePoint); + if (!isSavepointPresent) { + throw new HoodieRollbackException("No savepoint for commitTime " + savepointTime); + } + + List<String> commitsToRollback = + commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants() + .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + logger.info("Rolling back commits " + commitsToRollback); + + rollback(commitsToRollback); + + // Make sure the rollback was successful + Optional<HoodieInstant> lastInstant = + activeTimeline.reload().getCommitTimeline().filterCompletedInstants().lastInstant(); + Preconditions.checkArgument(lastInstant.isPresent()); + Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime), + savepointTime + " is not the last commit after rolling back " + commitsToRollback + + ", last commit was " + lastInstant.get().getTimestamp()); + return true; + } + /** * Rollback the (inflight/committed) record changes with the given commit time. * Three steps: @@ -388,81 +541,135 @@ public class HoodieWriteClient implements Seriali * (4) Finally delete .commit or .inflight file, */ public boolean rollback(final String commitTime) throws HoodieRollbackException { + rollback(Lists.newArrayList(commitTime)); + return true; + } + + + private void rollback(List<String> commits) { + if (commits.isEmpty()) { + logger.info("List of commits to rollback is empty"); + return; + } + + final Timer.Context context = metrics.getRollbackCtx(); + String startRollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + + // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieTimeline inflightTimeline = activeTimeline.getCommitTimeline().filterInflights(); - HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); + HoodieTimeline inflightTimeline = table.getInflightCommitTimeline(); + HoodieTimeline commitTimeline = table.getCompletedCommitTimeline(); + + // Check if any of the commits is a savepoint - do not allow rollback on those commits + List<String> savepoints = + table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + commits.forEach(s -> { + if (savepoints.contains(s)) { + throw new HoodieRollbackException( + "Could not rollback a savepointed commit. 
Delete savepoint first before rolling back: " + + s); + } + }); try { - if (commitTimeline.lastInstant().isPresent() - && !commitTimeline.findInstantsAfter(commitTime, Integer.MAX_VALUE).empty()) { - throw new HoodieRollbackException("Found commits after time :" + commitTime + + if (commitTimeline.empty() && inflightTimeline.empty()) { + // nothing to rollback + logger.info("No commits to rollback " + commits); + } + + // Make sure only the last n commits are being rolled back + // If there is a commit in-between or after that is not rolled back, then abort + String lastCommit = commits.get(commits.size() - 1); + if (!commitTimeline.empty() && !commitTimeline + .findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) { + throw new HoodieRollbackException("Found commits after time :" + lastCommit + ", please rollback greater commits first"); } List<String> inflights = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); - if (!inflights.isEmpty() && inflights.indexOf(commitTime) != inflights.size() - 1) { - throw new HoodieRollbackException( - "Found in-flight commits after time :" + commitTime + - ", please rollback greater commits first"); + if (!inflights.isEmpty() && inflights.indexOf(lastCommit) != inflights.size() - 1) { + throw new HoodieRollbackException( + "Found in-flight commits after time :" + lastCommit + + ", please rollback greater commits first"); } - if (inflights.contains(commitTime) || (commitTimeline.lastInstant().isPresent() - && commitTimeline.lastInstant().get().getTimestamp().equals(commitTime))) { - // 1. Atomically unpublish this commit - if(!inflights.contains(commitTime)) { - // This is completed commit, first revert it to inflight to unpublish data - activeTimeline.revertToInflight( - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime)); - } - // 2. Revert the index changes - logger.info("Clean out index changes at time: " + commitTime); - if (!index.rollbackCommit(commitTime)) { - throw new HoodieRollbackException( - "Clean out index changes failed, for time :" + commitTime); - } + // Atomically unpublish all the commits + commits.stream().filter(s -> !inflights.contains(s)) + .map(s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s)) + .forEach(activeTimeline::revertToInflight); + logger.info("Unpublished " + commits); - // 3. 
Delete the new generated parquet files - logger.info("Clean out all parquet files generated at time: " + commitTime); - final Accumulator<Integer> numFilesDeletedAccu = jsc.accumulator(0); - jsc.parallelize( - FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath())) - .foreach((VoidFunction<String>) partitionPath -> { - // Scan all partitions files with this commit time - FileSystem fs1 = FSUtils.getFs(); - FileStatus[] toBeDeleted = - fs1.listStatus(new Path(config.getBasePath(), partitionPath), - path -> { - return commitTime - .equals(FSUtils.getCommitTime(path.getName())); - }); - for (FileStatus file : toBeDeleted) { - boolean success = fs1.delete(file.getPath(), false); - logger.info("Delete file " + file.getPath() + "\t" + success); - if (success) { - numFilesDeletedAccu.add(1); + // cleanup index entries + commits.stream().forEach(s -> { + if (!index.rollbackCommit(s)) { + throw new HoodieRollbackException( + "Clean out index changes failed, for time :" + s); + } + }); + logger.info("Index rolled back for commits " + commits); + + // delete all the data files for all these commits + logger.info("Clean out all parquet files generated for commits: " + commits); + final LongAccumulator numFilesDeletedCounter = jsc.sc().longAccumulator(); + List<HoodieRollbackStat> stats = jsc.parallelize( + FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath())) + .map((Function<String, HoodieRollbackStat>) partitionPath -> { + // Scan all partitions files with this commit time + logger.info("Cleaning path " + partitionPath); + FileSystem fs1 = FSUtils.getFs(); + FileStatus[] toBeDeleted = + fs1.listStatus(new Path(config.getBasePath(), partitionPath), path -> { + if (!path.toString().contains(".parquet")) { + return false; } + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commits.contains(fileCommitTime); + }); + Map<FileStatus, Boolean> results = Maps.newHashMap(); + for (FileStatus file : toBeDeleted) { + boolean success = fs1.delete(file.getPath(), false); + results.put(file, success); + logger.info("Delete file " + file.getPath() + "\t" + success); + if (success) { + numFilesDeletedCounter.add(1); } - }); - // 4. 
Remove commit - logger.info("Clean out metadata files at time: " + commitTime); - activeTimeline.deleteInflight( - new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime)); + } + return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) + .withDeletedFileResults(results).build(); + }).collect(); - if (context != null) { - long durationInMs = metrics.getDurationInMs(context.stop()); - int numFilesDeleted = numFilesDeletedAccu.value(); - metrics.updateRollbackMetrics(durationInMs, numFilesDeleted); - } + // Remove the rolled back inflight commits + commits.stream().map(s -> new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, s)) + .forEach(activeTimeline::deleteInflight); + logger.info("Deleted inflight commits " + commits); + + Optional<Long> durationInMs = Optional.empty(); + if (context != null) { + durationInMs = Optional.of(metrics.getDurationInMs(context.stop())); + Long numFilesDeleted = numFilesDeletedCounter.value(); + metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); + } + HoodieRollbackMetadata rollbackMetadata = + AvroUtils.convertRollbackMetadata(startRollbackTime, durationInMs, commits, stats); + table.getActiveTimeline().saveAsComplete( + new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), + AvroUtils.serializeRollbackMetadata(rollbackMetadata)); + logger.info("Commits " + commits + " rollback is complete"); + + if (!table.getActiveTimeline().getCleanerTimeline().empty()) { + logger.info("Cleaning up older rollback meta files"); + // Cleanup of older rollback meta files + // TODO - make the commit archival generic and archive rollback metadata + FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), + table.getActiveTimeline().getRollbackTimeline().getInstants()); } - return true; } catch (IOException e) { throw new HoodieRollbackException("Failed to rollback " + - config.getBasePath() + " at commit time" + commitTime, e); + config.getBasePath() + " commits " + commits, e); } } @@ -476,37 +683,58 @@ public class HoodieWriteClient implements Seriali /** * Clean up any stale/old files/data lying around (either on file storage or index storage) */ - private void clean() throws HoodieIOException { + private void clean(String startCleanTime) throws HoodieIOException { try { logger.info("Cleaner started"); final Timer.Context context = metrics.getCleanCtx(); + // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); - List<String> partitionsToClean = FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()); + List<String> partitionsToClean = + FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()); // shuffle to distribute cleaning work across partitions evenly Collections.shuffle(partitionsToClean); - logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy()); - if(partitionsToClean.isEmpty()) { + logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config + .getCleanerPolicy()); + if (partitionsToClean.isEmpty()) { logger.info("Nothing to clean here mom. 
It is already clean"); return; } int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); - int numFilesDeleted = jsc.parallelize(partitionsToClean, cleanerParallelism) - .map((Function) partitionPathToClean -> { + List cleanStats = jsc.parallelize(partitionsToClean, cleanerParallelism) + .map((Function) partitionPathToClean -> { HoodieCleaner cleaner = new HoodieCleaner(table, config); return cleaner.clean(partitionPathToClean); }) - .reduce((Function2) (v1, v2) -> v1 + v2); + .collect(); - logger.info("Cleaned " + numFilesDeleted + " files"); // Emit metrics (duration, numFilesDeleted) if needed + Optional durationInMs = Optional.empty(); if (context != null) { - long durationInMs = metrics.getDurationInMs(context.stop()); - logger.info("cleanerElaspsedTime (Minutes): " + durationInMs / (1000 * 60)); - metrics.updateCleanMetrics(durationInMs, numFilesDeleted); + durationInMs = Optional.of(metrics.getDurationInMs(context.stop())); + logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); + } + + // Create the metadata and save it + HoodieCleanMetadata metadata = + AvroUtils.convertCleanMetadata(startCleanTime, durationInMs, cleanStats); + logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"); + metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), + metadata.getTotalFilesDeleted()); + + table.getActiveTimeline().saveAsComplete( + new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, startCleanTime), + AvroUtils.serializeCleanMetadata(metadata)); + logger.info("Marked clean started on " + startCleanTime + " as complete"); + + if (!table.getActiveTimeline().getCleanerTimeline().empty()) { + // Cleanup of older cleaner meta files + // TODO - make the commit archival generic and archive clean metadata + FSUtils.deleteOlderCleanMetaFiles(fs, table.getMetaClient().getMetaPath(), + table.getActiveTimeline().getCleanerTimeline().getInstants()); } } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java index b6be693b8..a431adf4a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java @@ -17,7 +17,7 @@ package com.uber.hoodie.config; import com.google.common.base.Preconditions; -import com.uber.hoodie.io.HoodieCleaner; +import com.uber.hoodie.common.model.HoodieCleaningPolicy; import javax.annotation.concurrent.Immutable; import java.io.File; @@ -32,7 +32,7 @@ import java.util.Properties; public class HoodieCompactionConfig extends DefaultHoodieConfig { public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy"; private static final String DEFAULT_CLEANER_POLICY = - HoodieCleaner.CleaningPolicy.KEEP_LATEST_COMMITS.name(); + HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(); public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions.retained"; @@ -94,7 +94,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { } } - public Builder withCleanerPolicy(HoodieCleaner.CleaningPolicy policy) { + public Builder withCleanerPolicy(HoodieCleaningPolicy policy) { props.setProperty(CLEANER_POLICY_PROP, policy.name()); return this; } @@ -164,7 +164,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig { 
setDefaultOnCondition(props, !props.containsKey(CLEANER_PARALLELISM), CLEANER_PARALLELISM, DEFAULT_CLEANER_PARALLELISM); - HoodieCleaner.CleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); + HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP)); Preconditions.checkArgument( Integer.parseInt(props.getProperty(MAX_COMMITS_TO_KEEP)) > Integer .parseInt(props.getProperty(MIN_COMMITS_TO_KEEP))); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index b95094676..f6058ab5f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -18,6 +18,7 @@ package com.uber.hoodie.config; import com.google.common.base.Preconditions; +import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.HoodieCleaner; import com.uber.hoodie.metrics.MetricsReporterType; @@ -97,8 +98,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { /** * compaction properties **/ - public HoodieCleaner.CleaningPolicy getCleanerPolicy() { - return HoodieCleaner.CleaningPolicy + public HoodieCleaningPolicy getCleanerPolicy() { + return HoodieCleaningPolicy .valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_POLICY_PROP)); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieSavepointException.java b/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieSavepointException.java new file mode 100644 index 000000000..83e1bd134 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/exception/HoodieSavepointException.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.exception; + +public class HoodieSavepointException extends HoodieException { + + public HoodieSavepointException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieSavepointException(String msg) { + super(msg); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java index 07d1780b5..ced3025db 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleaner.java @@ -16,6 +16,10 @@ package com.uber.hoodie.io; +import com.clearspring.analytics.util.Lists; +import com.google.common.collect.Maps; +import com.uber.hoodie.common.HoodieCleanStat; +import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; @@ -33,6 +37,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -47,13 +53,6 @@ import java.util.stream.Collectors; public class HoodieCleaner { private static Logger logger = LogManager.getLogger(HoodieCleaner.class); - - public enum CleaningPolicy { - KEEP_LATEST_FILE_VERSIONS, - KEEP_LATEST_COMMITS - } - - private final TableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; private HoodieTable hoodieTable; @@ -86,13 +85,18 @@ public class HoodieCleaner { fileSystemView.getEveryVersionInPartition(partitionPath) .collect(Collectors.toList()); List<String> deletePaths = new ArrayList<>(); + List<String> savepoints = hoodieTable.getSavepoints(); for (List<HoodieDataFile> versionsForFileId : fileVersions) { int keepVersions = config.getCleanerFileVersionsRetained(); Iterator<HoodieDataFile> commitItr = versionsForFileId.iterator(); while (commitItr.hasNext() && keepVersions > 0) { // Skip this most recent version - commitItr.next(); + HoodieDataFile next = commitItr.next(); + if (savepoints.contains(next.getCommitTime())) { + // do not clean data files that are savepointed + continue; + } keepVersions--; } // Delete the remaining files @@ -130,6 +134,8 @@ public class HoodieCleaner { "Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); List<String> deletePaths = new ArrayList<>(); + List<String> savepoints = hoodieTable.getSavepoints(); + // determine if we have enough commits, to start cleaning. if (commitTimeline.countInstants() > commitsRetained) { HoodieInstant earliestCommitToRetain = @@ -146,6 +152,10 @@ // i.e always spare the last commit. for (HoodieDataFile afile : fileList) { String fileCommitTime = afile.getCommitTime(); + if (savepoints.contains(fileCommitTime)) { + // do not clean up a savepoint data file + continue; + } // Dont delete the latest commit and also the last commit before the earliest commit we are retaining // The window of commit retain == max query run time. So a query could be running which still // uses this file. 
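Both policies above now share the same guard: a data file whose commit time appears on the savepoint list is skipped unconditionally, and only files past the retention cutoff are deleted. Isolated from the file-system walking, the decision reduces to a check like this sketch (Hoodie commit times are lexicographically ordered yyyyMMddHHmmss strings, so plain string comparison orders them):

```java
// Sketch of the cleaning guard: a file version may be deleted only if its commit
// is not savepointed and it falls before the earliest commit being retained.
static boolean isCleanable(String fileCommitTime, List<String> savepoints,
    String earliestCommitToRetain) {
  if (savepoints.contains(fileCommitTime)) {
    return false; // savepointed data files are never cleaned
  }
  return fileCommitTime.compareTo(earliestCommitToRetain) < 0;
}
```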
@@ -196,30 +206,42 @@ public class HoodieCleaner { * * @throws IllegalArgumentException if unknown cleaning policy is provided */ - public int clean(String partitionPath) throws IOException { - CleaningPolicy policy = config.getCleanerPolicy(); + public HoodieCleanStat clean(String partitionPath) throws IOException { + HoodieCleaningPolicy policy = config.getCleanerPolicy(); List<String> deletePaths; - if (policy == CleaningPolicy.KEEP_LATEST_COMMITS) { + Optional<HoodieInstant> earliestCommitToRetain = Optional.empty(); + if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath); - } else if (policy == CleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { + int commitsRetained = config.getCleanerCommitsRetained(); + if (commitTimeline.countInstants() > commitsRetained) { + earliestCommitToRetain = + commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); + } + } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath); } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } // perform the actual deletes + Map<FileStatus, Boolean> deletedFiles = Maps.newHashMap(); for (String deletePath : deletePaths) { logger.info("Working on delete path :" + deletePath); FileStatus[] deleteVersions = fs.globStatus(new Path(deletePath)); if (deleteVersions != null) { for (FileStatus deleteVersion : deleteVersions) { - if (fs.delete(deleteVersion.getPath(), false)) { - logger.info("Cleaning file at path :" + deleteVersion.getPath()); + boolean deleteResult = fs.delete(deleteVersion.getPath(), false); + deletedFiles.put(deleteVersion, deleteResult); + if (deleteResult) { + logger.info("Cleaned file at path :" + deleteVersion.getPath()); } } } } - logger.info(deletePaths.size() + " files deleted for partition path:" + partitionPath); - return deletePaths.size(); + + logger.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath); + return HoodieCleanStat.newBuilder().withPolicy(policy).withDeletePathPattern(deletePaths) + .withPartitionPath(partitionPath).withEarliestCommitRetained(earliestCommitToRetain) + .withDeletedFileResults(deletedFiles).build(); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index dfe8f8d10..2579d8a10 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -25,6 +25,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.common.file.HoodieAppendLog; import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieIOException; +import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -71,17 +72,17 @@ public class HoodieCommitArchiveLog { } private Stream<HoodieInstant> getCommitsToArchive() { + int maxCommitsToKeep = config.getMaxCommitsToKeep(); int minCommitsToKeep = config.getMinCommitsToKeep(); - HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(fs, config.getBasePath(), true); - HoodieTimeline commitTimeline = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + HoodieTimeline 
commitTimeline = table.getCompletedCommitTimeline(); + List<String> savepoints = table.getSavepoints(); if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) { // Actually do the commits - return commitTimeline.getInstants() + return commitTimeline.getInstants().filter(s -> !savepoints.contains(s.getTimestamp())) .limit(commitTimeline.countInstants() - minCommitsToKeep); } return Stream.empty(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java index 066064119..c0dee102d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java @@ -98,7 +98,7 @@ public class HoodieMetrics { } } - public void updateRollbackMetrics(long durationInMs, int numFilesDeleted) { + public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { if (config.isMetricsOn()) { logger.info(String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 274e0c9ab..984ef155b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -21,6 +21,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; @@ -35,6 +36,7 @@ import org.apache.spark.Partitioner; import java.io.Serializable; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; /** * Abstract implementation of a HoodieTable @@ -111,6 +113,36 @@ public abstract class HoodieTable implements Seri return getCommitTimeline().filterCompletedInstants(); } + /** + * Get only the inflight (not yet completed) commit timeline + * @return the in-flight commit timeline + */ + public HoodieTimeline getInflightCommitTimeline() { + return getCommitTimeline().filterInflights(); + } + + + /** + * Get only the completed (no inflights) clean timeline + * @return the completed clean timeline + */ + public HoodieTimeline getCompletedCleanTimeline() { + return getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); + } + + /** + * Get only the completed (no inflights) savepoint timeline + * @return the completed savepoint timeline + */ + public HoodieTimeline getCompletedSavepointTimeline() { + return getActiveTimeline().getSavePointTimeline().filterCompletedInstants(); + } + + public List<String> getSavepoints() { + return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + } + public HoodieActiveTimeline getActiveTimeline() { return metaClient.getActiveTimeline(); }
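These accessors let call sites express savepoint-awareness in a line or two. For example, counting the completed commits that the archiver may still touch could look like this sketch (assuming `metaClient` and `config` are already constructed):

```java
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
List<String> savepoints = table.getSavepoints();
// Completed commits not pinned by a savepoint remain candidates for archiving.
long archivable = table.getCompletedCommitTimeline().getInstants()
    .filter(instant -> !savepoints.contains(instant.getTimestamp()))
    .count();
```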
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index aa83eec89..a46182b50 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -18,8 +18,10 @@ package com.uber.hoodie; import com.google.common.collect.Iterables; +import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; @@ -29,6 +31,7 @@ import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ParquetUtils; @@ -38,11 +41,9 @@ import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.io.HoodieCleaner; import com.uber.hoodie.table.HoodieTable; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; @@ -58,6 +59,8 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -71,6 +74,7 @@ import java.util.stream.Stream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestHoodieClient implements Serializable { private transient JavaSparkContext jsc = null; @@ -335,12 +339,206 @@ public class TestHoodieClient implements Serializable { readClient.readSince("000").count()); } + + @Test + public void testCreateSavepoint() throws Exception { + HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig( + HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1) + .build()).build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + FileSystem fs = FSUtils.getFs(); + + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200); + List<WriteStatus> statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(statuses); + + /** + * Write 2 (updates) + */ + newCommitTime = "002"; + records = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + client.savepoint(new HoodieSavepointMetadata("hoodie-unit-test", + HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()), "test")); + try { + client.rollback(newCommitTime); + fail("Rollback of a savepoint was allowed " + newCommitTime); + } catch (HoodieRollbackException e) { + // this is good + } + + /** + * Write 3 (updates) + */ + newCommitTime = "003"; + records = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + 
/** + * Write 4 (updates) + */ + newCommitTime = "004"; + records = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + final TableFileSystemView view = table.getFileSystemView(); + List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> { + Stream<List<HoodieDataFile>> files = view.getEveryVersionInPartition(s); + return files.flatMap(Collection::stream).filter(f -> f.getCommitTime().equals("002")); + }).collect(Collectors.toList()); + + assertEquals("The data files for commit 002 should not be cleaned", 3, dataFiles.size()); + + // Delete savepoint + assertFalse(table.getCompletedSavepointTimeline().empty()); + client.deleteSavepoint( + table.getCompletedSavepointTimeline().getInstants().findFirst().get().getTimestamp()); + // rollback and reupsert 004 + client.rollback(newCommitTime); + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = new HoodieTableMetaClient(fs, basePath); + table = HoodieTable.getHoodieTable(metaClient, getConfig()); + final TableFileSystemView view1 = table.getFileSystemView(); + dataFiles = partitionPaths.stream().flatMap(s -> { + Stream<List<HoodieDataFile>> files = view1.getEveryVersionInPartition(s); + return files.flatMap(Collection::stream).filter(f -> f.getCommitTime().equals("002")); + }).collect(Collectors.toList()); + + assertEquals("The data files for commit 002 should be cleaned now", 0, dataFiles.size()); + } + + + @Test + public void testRollbackToSavepoint() throws Exception { + HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig( + HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1) + .build()).build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + FileSystem fs = FSUtils.getFs(); + + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1); + + List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + /** + * Write 2 (updates) + */ + newCommitTime = "002"; + records = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + client.savepoint(new HoodieSavepointMetadata("hoodie-unit-test", + HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()), "test")); + + /** + * Write 3 (updates) + */ + newCommitTime = "003"; + records = dataGen.generateUpdates(newCommitTime, records); + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + final TableFileSystemView view1 = table.getFileSystemView(); + + List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> { + Stream<List<HoodieDataFile>> files = 
+
+  @Test
+  public void testRollbackToSavepoint() throws Exception {
+    HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
+        HoodieCompactionConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1)
+            .build()).build();
+    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    FileSystem fs = FSUtils.getFs();
+
+    /**
+     * Write 1 (only inserts)
+     */
+    String newCommitTime = "001";
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+    assertNoWriteErrors(statuses);
+
+    /**
+     * Write 2 (updates)
+     */
+    newCommitTime = "002";
+    records = dataGen.generateUpdates(newCommitTime, records);
+    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+    // Verify there are no errors
+    assertNoWriteErrors(statuses);
+
+    client.savepoint(new HoodieSavepointMetadata("hoodie-unit-test",
+        HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()), "test"));
+
+    /**
+     * Write 3 (updates)
+     */
+    newCommitTime = "003";
+    records = dataGen.generateUpdates(newCommitTime, records);
+    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+    // Verify there are no errors
+    assertNoWriteErrors(statuses);
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
+    final TableFileSystemView view1 = table.getFileSystemView();
+
+    List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> {
+      Stream<List<HoodieDataFile>> files = view1.getEveryVersionInPartition(s);
+      return files.flatMap(Collection::stream).filter(f -> f.getCommitTime().equals("003"));
+    }).collect(Collectors.toList());
+    assertEquals("The data files for commit 003 should be present", 3, dataFiles.size());
+
+
+    /**
+     * Write 4 (updates)
+     */
+    newCommitTime = "004";
+    records = dataGen.generateUpdates(newCommitTime, records);
+    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+    // Verify there are no errors
+    assertNoWriteErrors(statuses);
+
+    metaClient = new HoodieTableMetaClient(fs, basePath);
+    table = HoodieTable.getHoodieTable(metaClient, getConfig());
+    final TableFileSystemView view2 = table.getFileSystemView();
+
+    dataFiles = partitionPaths.stream().flatMap(s -> {
+      Stream<List<HoodieDataFile>> files = view2.getEveryVersionInPartition(s);
+      return files.flatMap(Collection::stream).filter(f -> f.getCommitTime().equals("004"));
+    }).collect(Collectors.toList());
+    assertEquals("The data files for commit 004 should be present", 3, dataFiles.size());
+
+
+    // rolling back to a non existent savepoint must not succeed
+    try {
+      client.rollbackToSavepoint("001");
+      fail("Rolling back to non-existent savepoint should not be allowed");
+    } catch (HoodieRollbackException e) {
+      // this is good
+    }
+
+    // rollback to savepoint 002
+    HoodieInstant savepoint =
+        table.getCompletedSavepointTimeline().getInstants().findFirst().get();
+    client.rollbackToSavepoint(savepoint.getTimestamp());
+
+    metaClient = new HoodieTableMetaClient(fs, basePath);
+    table = HoodieTable.getHoodieTable(metaClient, getConfig());
+    final TableFileSystemView view3 = table.getFileSystemView();
+    dataFiles = partitionPaths.stream().flatMap(s -> {
+      Stream<List<HoodieDataFile>> files = view3.getEveryVersionInPartition(s);
+      return files.flatMap(Collection::stream).filter(f -> f.getCommitTime().equals("002"));
+    }).collect(Collectors.toList());
+    assertEquals("The data files for commit 002 should be available", 3, dataFiles.size());
+
+    dataFiles = partitionPaths.stream().flatMap(s -> {
+      Stream<List<HoodieDataFile>> files = view3.getEveryVersionInPartition(s);
+      return files.flatMap(Collection::stream).filter(f -> f.getCommitTime().equals("003"));
+    }).collect(Collectors.toList());
+    assertEquals("The data files for commit 003 should be rolled back", 0, dataFiles.size());
+
+    dataFiles = partitionPaths.stream().flatMap(s -> {
+      Stream<List<HoodieDataFile>> files = view3.getEveryVersionInPartition(s);
+      return files.flatMap(Collection::stream).filter(f -> f.getCommitTime().equals("004"));
+    }).collect(Collectors.toList());
+    assertEquals("The data files for commit 004 should be rolled back", 0, dataFiles.size());
+  }
+
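// ---- editor's note: illustrative sketch, not part of the patch ----
// The semantics testRollbackToSavepoint above pins down, with a savepoint
// taken at commit 002 (client and table as in the test):
HoodieInstant savepoint =
    table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.rollbackToSavepoint(savepoint.getTimestamp());
// timeline before: 001, 002 (savepointed), 003, 004
// timeline after : 001, 002 -- files written by 003/004 are deleted, while
// every file version needed to reconstruct 002 is retained.
// ---- end editor's note ----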
+
   @Test
   public void testInsertAndCleanByVersions() throws Exception {
     int maxVersions = 2; // keep upto 2 versions for each file
     HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
         HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaner.CleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
             .retainFileVersions(maxVersions).build()).build();
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
     HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
@@ -365,6 +563,13 @@ public class TestHoodieClient implements Serializable {
     // Should have 100 records in table (check using Index), all in locations marked at commit
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
+    assertFalse(table.getCompletedCommitTimeline().empty());
+    String commitTime =
+        table.getCompletedCommitTimeline().getInstants().findFirst().get().getTimestamp();
+    assertFalse(table.getCompletedCleanTimeline().empty());
+    assertEquals("The clean instant should be the same as the commit instant", commitTime,
+        table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
+
     List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table).collect();
     checkTaggedRecords(taggedRecords, newCommitTime);
@@ -425,7 +630,7 @@ public class TestHoodieClient implements Serializable {
     int maxCommits = 3; // keep upto 3 commits from the past
     HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
         HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaner.CleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
             .retainCommits(maxCommits).build()).build();
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
     HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
@@ -450,6 +655,13 @@ public class TestHoodieClient implements Serializable {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
+    assertFalse(table.getCompletedCommitTimeline().empty());
+    String commitTime =
+        table.getCompletedCommitTimeline().getInstants().findFirst().get().getTimestamp();
+    assertFalse(table.getCompletedCleanTimeline().empty());
+    assertEquals("The clean instant should be the same as the commit instant", commitTime,
+        table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
+
     List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table).collect();
     checkTaggedRecords(taggedRecords, newCommitTime);
@@ -843,6 +1055,7 @@ public class TestHoodieClient implements Serializable {
+
   @After
   public void clean() {
     if (basePath != null) {
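// ---- editor's note: illustrative sketch, not part of the patch ----
// What the new timeline assertions above check: after the first write, the
// cleaner has run and left a completed clean instant stamped with the same
// instant time as the commit (metaClient and config assumed as in the tests).
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
String firstCommit =
    table.getCompletedCommitTimeline().getInstants().findFirst().get().getTimestamp();
String firstClean =
    table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp();
assert firstCommit.equals(firstClean); // the two instants coincide in these tests
// ---- end editor's note ----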
assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0])); - assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1])); + assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); + assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size()); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0)); @@ -76,8 +82,8 @@ public class TestHoodieCleaner { table = HoodieTable.getHoodieTable(metadata, config); cleaner = new HoodieCleaner(table, config); - assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[0])); - assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[1])); + assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); + assertEquals("Must clean 1 file" , 1, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size()); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "001", file2P1C1)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0)); @@ -93,14 +99,14 @@ public class TestHoodieCleaner { table = HoodieTable.getHoodieTable(metadata, config); cleaner = new HoodieCleaner(table, config); - assertEquals("Must clean two files" , 2, cleaner.clean(partitionPaths[0])); + assertEquals("Must clean two files" , 2, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size()); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); // No cleaning on partially written file, with no commit. 
     HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file3P0C2); // update
-    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]));
+    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size());
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2));
   }

@@ -109,7 +115,7 @@ public class TestHoodieCleaner {
   public void testKeepLatestCommits() throws IOException {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaner.CleaningPolicy.KEEP_LATEST_COMMITS)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .retainCommits(2).build()).build();

@@ -123,8 +129,8 @@ public class TestHoodieCleaner {
     HoodieTable table = HoodieTable.getHoodieTable(metadata, config);
     HoodieCleaner cleaner = new HoodieCleaner(table, config);

-    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]));
-    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]));
+    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size());
+    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size());
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "000", file1P1C0));
@@ -139,8 +145,8 @@ public class TestHoodieCleaner {
     table = HoodieTable.getHoodieTable(metadata, config);
     cleaner = new HoodieCleaner(table, config);

-    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]));
-    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]));
+    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size());
+    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[1]).getSuccessDeleteFiles().size());
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[1], "001", file2P1C1));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0));
@@ -158,7 +164,7 @@ public class TestHoodieCleaner {
     cleaner = new HoodieCleaner(table, config);
     assertEquals(
         "Must not clean any file. We have to keep 1 version before the latest commit time to keep",
-        0, cleaner.clean(partitionPaths[0]));
+        0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size());

     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0));

@@ -173,7 +179,7 @@ public class TestHoodieCleaner {
     cleaner = new HoodieCleaner(table, config);
     assertEquals(
-        "Must not clean one old file", 1, cleaner.clean(partitionPaths[0]));
+        "Must clean one old file", 1, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size());
     assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0C0));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0));

@@ -185,7 +191,7 @@ public class TestHoodieCleaner {
     // No cleaning on partially written file, with no commit.
     HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "004", file3P0C2); // update
-    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]));
+    assertEquals("Must not clean any files" , 0, cleaner.clean(partitionPaths[0]).getSuccessDeleteFiles().size());
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file1P0C0));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "001", file2P0C1));
   }
diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml
index bb1929813..6ec8c07a6 100644
--- a/hoodie-common/pom.xml
+++ b/hoodie-common/pom.xml
@@ -47,6 +47,10 @@
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+      </plugin>
diff --git a/hoodie-common/src/main/avro/HoodieCleanMetadata.avsc b/hoodie-common/src/main/avro/HoodieCleanMetadata.avsc
new file mode 100644
index 000000000..3e55a97e9
--- /dev/null
+++ b/hoodie-common/src/main/avro/HoodieCleanMetadata.avsc
@@ -0,0 +1,24 @@
+{"namespace": "com.uber.hoodie.avro.model",
+ "type": "record",
+ "name": "HoodieCleanMetadata",
+ "fields": [
+   {"name": "startCleanTime", "type": "string"},
+   {"name": "timeTakenInMillis", "type": "long"},
+   {"name": "totalFilesDeleted", "type": "int"},
+   {"name": "earliestCommitToRetain", "type": "string"},
+   {"name": "partitionMetadata", "type": {
+     "type" : "map", "values" : {
+       "type": "record",
+       "name": "HoodieCleanPartitionMetadata",
+       "fields": [
+         {"name": "partitionPath", "type": "string"},
+         {"name": "policy", "type": "string"},
+         {"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
+         {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
+         {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
+       ]
+     }
+    }
+   }
+ ]
+}
diff --git a/hoodie-common/src/main/avro/HoodieRollbackMetadata.avsc b/hoodie-common/src/main/avro/HoodieRollbackMetadata.avsc
new file mode 100644
index 000000000..60d2d63a3
--- /dev/null
+++ b/hoodie-common/src/main/avro/HoodieRollbackMetadata.avsc
@@ -0,0 +1,22 @@
+{"namespace": "com.uber.hoodie.avro.model",
+ "type": "record",
+ "name": "HoodieRollbackMetadata",
+ "fields": [
+   {"name": "startRollbackTime", "type": "string"},
+   {"name": "timeTakenInMillis", "type": "long"},
+   {"name": "totalFilesDeleted", "type": "int"},
+   {"name": "commitsRollback", "type": {"type": "array", "items": "string"}},
+   {"name": "partitionMetadata", "type": {
+     "type" : "map", "values" : {
+       "type": "record",
+       "name": "HoodieRollbackPartitionMetadata",
+       "fields": [
+         {"name": "partitionPath", "type": "string"},
+         {"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
+         {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
+       ]
+     }
+    }
+   }
+ ]
+}
diff --git a/hoodie-common/src/main/avro/HoodieSavePointMetadata.avsc b/hoodie-common/src/main/avro/HoodieSavePointMetadata.avsc
new file mode 100644
index 000000000..25c36591e
--- /dev/null
+++ b/hoodie-common/src/main/avro/HoodieSavePointMetadata.avsc
@@ -0,0 +1,9 @@
+{"namespace": "com.uber.hoodie.avro.model",
+ "type": "record",
+ "name": "HoodieSavepointMetadata",
+ "fields": [
+   {"name": "savepointedBy", "type": "string"},
+   {"name": "savepointedAt", "type": "string"},
+   {"name": "comments", "type": "string"}
+ ]
+}
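// ---- editor's note: illustrative sketch, not part of the patch ----
// The avro-maven-plugin (wired into the build at the end of this patch) turns
// these .avsc files into SpecificRecord classes under
// com.uber.hoodie.avro.model, with string fields mapped to java.lang.String
// (stringType=String). For example, per the HoodieSavepointMetadata schema:
HoodieSavepointMetadata savepoint =
    new HoodieSavepointMetadata("hoodie-unit-test", savepointTime, "test");
// savepointTime is a hypothetical timeline-formatted timestamp string.
// ---- end editor's note ----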
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieCleanStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieCleanStat.java
new file mode 100644
index 000000000..f0e2a9bdf
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieCleanStat.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common;
+
+import com.uber.hoodie.common.model.HoodieCleaningPolicy;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import org.apache.hadoop.fs.FileStatus;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Collects stats about a single partition clean operation
+ */
+public class HoodieCleanStat implements Serializable {
+  // Policy used
+  private final HoodieCleaningPolicy policy;
+  // Partition path cleaned
+  private final String partitionPath;
+  // The patterns that were generated for the delete operation
+  private final List<String> deletePathPatterns;
+  // Files that were deleted successfully
+  private final List<String> successDeleteFiles;
+  // Files that could not be deleted
+  private final List<String> failedDeleteFiles;
+  // Earliest commit that was retained in this clean
+  private final String earliestCommitToRetain;
+
+  public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath,
+      List<String> deletePathPatterns, List<String> successDeleteFiles,
+      List<String> failedDeleteFiles, String earliestCommitToRetain) {
+    this.policy = policy;
+    this.partitionPath = partitionPath;
+    this.deletePathPatterns = deletePathPatterns;
+    this.successDeleteFiles = successDeleteFiles;
+    this.failedDeleteFiles = failedDeleteFiles;
+    this.earliestCommitToRetain = earliestCommitToRetain;
+  }
+
+  public HoodieCleaningPolicy getPolicy() {
+    return policy;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public List<String> getDeletePathPatterns() {
+    return deletePathPatterns;
+  }
+
+  public List<String> getSuccessDeleteFiles() {
+    return successDeleteFiles;
+  }
+
+  public List<String> getFailedDeleteFiles() {
+    return failedDeleteFiles;
+  }
+
+  public String getEarliestCommitToRetain() {
+    return earliestCommitToRetain;
+  }
+
+  public static HoodieCleanStat.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private HoodieCleaningPolicy policy;
+    private List<String> deletePathPatterns;
+    private List<String> successDeleteFiles;
+    private List<String> failedDeleteFiles;
+    private String partitionPath;
+    private String earliestCommitToRetain;
+
+    public Builder withPolicy(HoodieCleaningPolicy policy) {
+      this.policy = policy;
+      return this;
+    }
+
+    public Builder withDeletePathPattern(List<String> deletePathPatterns) {
+      this.deletePathPatterns = deletePathPatterns;
+      return this;
+    }
+
+    public Builder withDeletedFileResults(Map<FileStatus, Boolean> deletedFiles) {
+      //noinspection Convert2MethodRef
+      successDeleteFiles = deletedFiles.entrySet().stream().filter(s -> s.getValue())
+          .map(s -> s.getKey().getPath().toString()).collect(Collectors.toList());
+      failedDeleteFiles = deletedFiles.entrySet().stream().filter(s -> !s.getValue())
+          .map(s -> s.getKey().getPath().toString()).collect(Collectors.toList());
+      return this;
+    }
+
+    public Builder withPartitionPath(String partitionPath) {
+      this.partitionPath = partitionPath;
+      return this;
+    }
+
+    public Builder withEarliestCommitRetained(Optional<HoodieInstant> earliestCommitToRetain) {
+      this.earliestCommitToRetain = (earliestCommitToRetain.isPresent()) ?
+          earliestCommitToRetain.get().getTimestamp() :
+          "-1";
+      return this;
+    }
+
+    public HoodieCleanStat build() {
+      return new HoodieCleanStat(policy, partitionPath, deletePathPatterns,
+          successDeleteFiles, failedDeleteFiles, earliestCommitToRetain);
+    }
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieRollbackStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieRollbackStat.java
new file mode 100644
index 000000000..c809538fd
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieRollbackStat.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common;
+
+import org.apache.hadoop.fs.FileStatus;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Collects stats about a single partition rollback operation
+ */
+public class HoodieRollbackStat implements Serializable {
+  // Partition path
+  private final String partitionPath;
+  // Files that were deleted successfully
+  private final List<String> successDeleteFiles;
+  // Files that could not be deleted
+  private final List<String> failedDeleteFiles;
+
+  public HoodieRollbackStat(String partitionPath, List<String> successDeleteFiles,
+      List<String> failedDeleteFiles) {
+    this.partitionPath = partitionPath;
+    this.successDeleteFiles = successDeleteFiles;
+    this.failedDeleteFiles = failedDeleteFiles;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public List<String> getSuccessDeleteFiles() {
+    return successDeleteFiles;
+  }
+
+  public List<String> getFailedDeleteFiles() {
+    return failedDeleteFiles;
+  }
+
+  public static HoodieRollbackStat.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private List<String> successDeleteFiles;
+    private List<String> failedDeleteFiles;
+    private String partitionPath;
+
+    public Builder withDeletedFileResults(Map<FileStatus, Boolean> deletedFiles) {
+      //noinspection Convert2MethodRef
+      successDeleteFiles = deletedFiles.entrySet().stream().filter(s -> s.getValue())
+          .map(s -> s.getKey().getPath().toString()).collect(Collectors.toList());
+      failedDeleteFiles = deletedFiles.entrySet().stream().filter(s -> !s.getValue())
+          .map(s -> s.getKey().getPath().toString()).collect(Collectors.toList());
+      return this;
+    }
+
+    public Builder withPartitionPath(String partitionPath) {
+      this.partitionPath = partitionPath;
+      return this;
+    }
+
+    public HoodieRollbackStat build() {
+      return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles);
+    }
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCleaningPolicy.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCleaningPolicy.java
new file mode 100644
index 000000000..c351ef1b9
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCleaningPolicy.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.model;
+
+public enum HoodieCleaningPolicy {
+  KEEP_LATEST_FILE_VERSIONS,
+  KEEP_LATEST_COMMITS
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
index 67f36121b..1844ca8a1 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
@@ -41,6 +41,7 @@ public interface HoodieTimeline extends Serializable {
   String COMMIT_ACTION = "commit";
   String DELTA_COMMIT_ACTION = "deltacommit";
   String CLEAN_ACTION = "clean";
+  String ROLLBACK_ACTION = "rollback";
   String SAVEPOINT_ACTION = "savepoint";
   String COMPACTION_ACTION = "compaction";
   String INFLIGHT_EXTENSION = ".inflight";
@@ -48,12 +49,14 @@ public interface HoodieTimeline extends Serializable {
   String COMMIT_EXTENSION = "." + COMMIT_ACTION;
   String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
   String CLEAN_EXTENSION = "." + CLEAN_ACTION;
+  String ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION;
   String SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION;
   String COMPACTION_EXTENSION = "." + COMPACTION_ACTION;
   //this is to preserve backwards compatibility on commit in-flight filenames
   String INFLIGHT_COMMIT_EXTENSION = INFLIGHT_EXTENSION;
   String INFLIGHT_DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION + INFLIGHT_EXTENSION;
   String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION;
+  String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION;
   String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
   String INFLIGHT_COMPACTION_EXTENSION = "." + COMPACTION_ACTION + INFLIGHT_EXTENSION;
@@ -191,6 +194,14 @@ public interface HoodieTimeline extends Serializable {
     return instant + HoodieTimeline.INFLIGHT_CLEAN_EXTENSION;
   }

+  static String makeRollbackFileName(String instant) {
+    return instant + HoodieTimeline.ROLLBACK_EXTENSION;
+  }
+
+  static String makeInflightRollbackFileName(String instant) {
+    return instant + HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION;
+  }
+
   static String makeInflightSavePointFileName(String commitTime) {
     return commitTime + HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION;
   }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
index 1ffed5930..2345d8326 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
@@ -87,8 +87,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   public HoodieActiveTimeline(FileSystem fs, String metaPath) {
     this(fs, metaPath,
         new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
-            INFLIGHT_DELTA_COMMIT_EXTENSION, COMPACTION_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
-            CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, COMPACTION_EXTENSION});
+            INFLIGHT_DELTA_COMMIT_EXTENSION, COMPACTION_EXTENSION,
+            INFLIGHT_COMPACTION_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
+            CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION});
   }

   /**
@@ -160,6 +161,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
         (Function<HoodieInstant, Optional<byte[]>> & Serializable) this::getInstantDetails);
   }

+  /**
+   * Get only the rollback action (inflight and completed) in the active timeline
+   *
+   * @return the rollback timeline
+   */
+  public HoodieTimeline getRollbackTimeline() {
+    return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
+        (Function<HoodieInstant, Optional<byte[]>> & Serializable) this::getInstantDetails);
+  }
+
   /**
    * Get only the save point action (inflight and completed) in the active timeline
    *
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java
index 24a666194..504a7ed1c 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java
@@ -89,6 +89,10 @@ public class HoodieInstant implements Serializable {
       return isInflight ?
           HoodieTimeline.makeInflightCleanerFileName(timestamp) :
           HoodieTimeline.makeCleanerFileName(timestamp);
+    } else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) {
+      return isInflight ?
+          HoodieTimeline.makeInflightRollbackFileName(timestamp) :
+          HoodieTimeline.makeRollbackFileName(timestamp);
     } else if (HoodieTimeline.SAVEPOINT_ACTION.equals(action)) {
       return isInflight ?
           HoodieTimeline.makeInflightSavePointFileName(timestamp) :
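// ---- editor's note: illustrative sketch, not part of the patch ----
// With the additions above, a rollback instant maps onto timeline file names
// in the table's meta folder the same way commits and cleans do:
HoodieInstant inflight = new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, "005");
HoodieInstant done = new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, "005");
// inflight.getFileName() -> "005.rollback.inflight"
// done.getFileName()     -> "005.rollback"
// ---- end editor's note ----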
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
index 5aa39070e..8196abab5 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java
@@ -16,24 +16,45 @@
 package com.uber.hoodie.common.util;

+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.uber.hoodie.avro.model.HoodieCleanMetadata;
+import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata;
+import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
+import com.uber.hoodie.avro.model.HoodieRollbackPartitionMetadata;
+import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
+import com.uber.hoodie.common.HoodieCleanStat;
+import com.uber.hoodie.common.HoodieRollbackStat;
 import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
-import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.exception.HoodieIOException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableByteArrayInput;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hadoop.fs.AvroFSInput;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

+import java.io.ByteArrayOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
@@ -68,4 +89,85 @@ public class AvroUtils {
     });
     return loadedRecords;
   }
+
+
+  public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
+      Optional<Long> durationInMs, List<HoodieCleanStat> cleanStats) {
+    ImmutableMap.Builder<String, HoodieCleanPartitionMetadata> partitionMetadataBuilder =
+        ImmutableMap.builder();
+    int totalDeleted = 0;
+    String earliestCommitToRetain = null;
+    for (HoodieCleanStat stat : cleanStats) {
+      HoodieCleanPartitionMetadata metadata =
+          new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(),
+              stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(),
+              stat.getFailedDeleteFiles());
+      partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
+      totalDeleted += stat.getSuccessDeleteFiles().size();
+      if (earliestCommitToRetain == null) {
+        // This will be the same for all partitions
+        earliestCommitToRetain = stat.getEarliestCommitToRetain();
+      }
+    }
+    return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L),
+        totalDeleted, earliestCommitToRetain, partitionMetadataBuilder.build());
+  }
+
+  public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime,
+      Optional<Long> durationInMs, List<String> commits, List<HoodieRollbackStat> stats) {
+    ImmutableMap.Builder<String, HoodieRollbackPartitionMetadata> partitionMetadataBuilder =
+        ImmutableMap.builder();
+    int totalDeleted = 0;
+    for (HoodieRollbackStat stat : stats) {
+      HoodieRollbackPartitionMetadata metadata =
+          new HoodieRollbackPartitionMetadata(stat.getPartitionPath(),
+              stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles());
+      partitionMetadataBuilder.put(stat.getPartitionPath(), metadata);
+      totalDeleted += stat.getSuccessDeleteFiles().size();
+    }
+    return new HoodieRollbackMetadata(startRollbackTime, durationInMs.orElseGet(() -> -1L),
+        totalDeleted, commits, partitionMetadataBuilder.build());
+  }
+
+  public static Optional<byte[]> serializeCleanMetadata(HoodieCleanMetadata metadata)
+      throws IOException {
+    return serializeAvroMetadata(metadata, HoodieCleanMetadata.class);
+  }
+
+  public static Optional<byte[]> serializeSavepointMetadata(HoodieSavepointMetadata metadata)
+      throws IOException {
+    return serializeAvroMetadata(metadata, HoodieSavepointMetadata.class);
+  }
+
+  public static Optional<byte[]> serializeRollbackMetadata(
+      HoodieRollbackMetadata rollbackMetadata) throws IOException {
+    return serializeAvroMetadata(rollbackMetadata, HoodieRollbackMetadata.class);
+  }
+
+  public static <T extends SpecificRecordBase> Optional<byte[]> serializeAvroMetadata(T metadata,
+      Class<T> clazz) throws IOException {
+    DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // try-with-resources closes the writer, which also flushes the final block
+    try (DataFileWriter<T> fileWriter = new DataFileWriter<>(datumWriter)) {
+      fileWriter.create(metadata.getSchema(), baos);
+      fileWriter.append(metadata);
+      fileWriter.flush();
+    }
+    return Optional.of(baos.toByteArray());
+  }
+
+  public static HoodieCleanMetadata deserializeHoodieCleanMetadata(byte[] bytes)
+      throws IOException {
+    return deserializeAvroMetadata(bytes, HoodieCleanMetadata.class);
+  }
+
+  public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes,
+      Class<T> clazz) throws IOException {
+    DatumReader<T> reader = new SpecificDatumReader<>(clazz);
+    try (FileReader<T> fileReader =
+        DataFileReader.openReader(new SeekableByteArrayInput(bytes), reader)) {
+      Preconditions
+          .checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
+      return fileReader.next();
+    }
+  }
+
 }
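// ---- editor's note: illustrative sketch, not part of the patch ----
// Round-tripping clean metadata through the helpers above (startTime and
// cleanStats are assumed locals):
HoodieCleanMetadata metadata =
    AvroUtils.convertCleanMetadata(startTime, Optional.of(120L), cleanStats);
byte[] bytes = AvroUtils.serializeCleanMetadata(metadata).get();
HoodieCleanMetadata readBack = AvroUtils.deserializeHoodieCleanMetadata(bytes);
// readBack.getTotalFilesDeleted() equals metadata.getTotalFilesDeleted(), etc.
// ---- end editor's note ----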
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index cc8cb0ee8..4385b746b 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -19,6 +19,7 @@ package com.uber.hoodie.common.util;
 import com.google.common.base.Preconditions;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.HoodieLogFile;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.InvalidHoodiePathException;
 import org.apache.hadoop.conf.Configuration;
@@ -49,6 +50,8 @@ public class FSUtils {
   // Log files are of this pattern - b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.avro.delta.1
   private static final Pattern LOG_FILE_PATTERN = Pattern.compile("(.*)_(.*)\\.(.*)\\.(.*)\\.([0-9]*)");
   private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
+  private static final long MIN_CLEAN_TO_KEEP = 10;
+  private static final long MIN_ROLLBACK_TO_KEEP = 10;

   public static FileSystem getFs() {
     Configuration conf = new Configuration();
@@ -305,4 +308,31 @@ public class FSUtils {
   }

+  public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath,
+      Stream<HoodieInstant> instants) {
+    //TODO - this should be archived when archival is made general for all meta-data
+    // skip MIN_CLEAN_TO_KEEP and delete the rest; forEach is a terminal
+    // operation, so the deletes actually execute (a bare map() would not)
+    instants.skip(MIN_CLEAN_TO_KEEP).forEach(s -> {
+      try {
+        fs.delete(new Path(metaPath, s.getFileName()), false);
+      } catch (IOException e) {
+        throw new HoodieIOException("Could not delete clean meta files " + s.getFileName(),
+            e);
+      }
+    });
+  }
+
+  public static void deleteOlderRollbackMetaFiles(FileSystem fs, String metaPath,
+      Stream<HoodieInstant> instants) {
+    //TODO - this should be archived when archival is made general for all meta-data
+    // skip MIN_ROLLBACK_TO_KEEP and delete the rest
+    instants.skip(MIN_ROLLBACK_TO_KEEP).forEach(s -> {
+      try {
+        fs.delete(new Path(metaPath, s.getFileName()), false);
+      } catch (IOException e) {
+        throw new HoodieIOException(
+            "Could not delete rollback meta files " + s.getFileName(), e);
+      }
+    });
+  }
 }
diff --git a/pom.xml b/pom.xml
index cfb388dd5..e45a9ccfc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -252,11 +252,13 @@
             <exclude>**/.*</exclude>
             <exclude>**/*.txt</exclude>
            <exclude>**/*.sh</exclude>
+            <exclude>**/*.log</exclude>
             <exclude>**/dependency-reduced-pom.xml</exclude>
             <exclude>**/test/resources/*.avro</exclude>
             <exclude>**/test/resources/*.data</exclude>
             <exclude>**/test/resources/*.schema</exclude>
             <exclude>**/test/resources/*.csv</exclude>
+            <exclude>**/main/avro/*.avsc</exclude>
@@ -268,6 +270,24 @@
+
+        <plugin>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-maven-plugin</artifactId>
+          <version>1.7.6</version>
+          <executions>
+            <execution>
+              <phase>generate-sources</phase>
+              <goals>
+                <goal>schema</goal>
+              </goals>
+              <configuration>
+                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+                <outputDirectory>${project.build.directory}/generated-sources/src/main/java/</outputDirectory>
+                <stringType>String</stringType>
+              </configuration>
+            </execution>
+          </executions>
+        </plugin>
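// ---- editor's note: illustrative sketch, not part of the patch ----
// Why the FSUtils delete helpers above end in forEach: java.util.stream
// pipelines are lazy, so an intermediate map() with no terminal operation
// never executes. Minimal self-contained demonstration:
import java.util.stream.Stream;

public class LazyStreamDemo {
  public static void main(String[] args) {
    Stream.of("a", "b").map(s -> {          // never runs: no terminal operation
      System.out.println("map: " + s);
      return s;
    });
    Stream.of("a", "b").forEach(s ->        // runs: forEach is terminal
        System.out.println("forEach: " + s));
  }
}
// ---- end editor's note ----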