From 9e59da7fd96d8af0587efa2f34c89f56320d4c91 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Wed, 27 Feb 2019 23:43:06 -0800 Subject: [PATCH] Refactor HoodieTable Rollback to write one rollback instant for a batch of commits to rollback --- .../com/uber/hoodie/HoodieWriteClient.java | 267 +++++++++++------- .../hoodie/table/HoodieCopyOnWriteTable.java | 47 ++- .../hoodie/table/HoodieMergeOnReadTable.java | 41 ++- .../com/uber/hoodie/table/HoodieTable.java | 2 +- .../com/uber/hoodie/TestAsyncCompaction.java | 2 +- .../java/com/uber/hoodie/TestCleaner.java | 16 +- .../hoodie/table/TestMergeOnReadTable.java | 2 +- hoodie-common/pom.xml | 1 + .../src/main/avro/HoodieRestoreMetadata.avsc | 17 ++ .../hoodie/common/table/HoodieTimeline.java | 11 + .../table/timeline/HoodieActiveTimeline.java | 11 +- .../common/table/timeline/HoodieInstant.java | 3 + .../uber/hoodie/common/util/AvroUtils.java | 25 +- .../com/uber/hoodie/common/util/FSUtils.java | 14 + 14 files changed, 286 insertions(+), 173 deletions(-) create mode 100644 hoodie-common/src/main/avro/HoodieRestoreMetadata.avsc diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index d9f779861..1f584de20 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -19,10 +19,10 @@ package com.uber.hoodie; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableMap; import com.uber.hoodie.avro.model.HoodieCleanMetadata; import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.avro.model.HoodieRestoreMetadata; import com.uber.hoodie.avro.model.HoodieRollbackMetadata; import 
com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieCleanStat; @@ -65,13 +65,13 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.text.ParseException; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; @@ -758,7 +758,7 @@ public class HoodieWriteClient implements Seriali .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); logger.info("Rolling back commits " + commitsToRollback); - rollback(commitsToRollback); + restoreToInstant(savepointTime); // Make sure the rollback was successful Optional lastInstant = activeTimeline.reload().getCommitsAndCompactionTimeline() @@ -776,7 +776,7 @@ public class HoodieWriteClient implements Seriali * files. (4) Finally delete .commit or .inflight file, */ public boolean rollback(final String commitTime) throws HoodieRollbackException { - rollback(Lists.newArrayList(commitTime)); + rollbackInternal(commitTime); return true; } @@ -787,137 +787,190 @@ public class HoodieWriteClient implements Seriali * files and/or append rollback to existing log files. 
(4) Finally delete .commit, .inflight, .compaction.inflight * or .compaction.requested file */ - public void restoreToCommit(final String commitTime) throws HoodieRollbackException { + public void restoreToInstant(final String instantTime) throws HoodieRollbackException { // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); // Get all the commits on the timeline after the provided commit time - List commitsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants() - .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), commitTime)) + List instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants() + .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime)) .collect(Collectors.toList()); // reverse the commits to descending order of commit time - Collections.reverse(commitsToRollback); - commitsToRollback.stream().forEach(instant -> { - switch (instant.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.DELTA_COMMIT_ACTION: - rollback(instant.getTimestamp()); - break; - case HoodieTimeline.COMPACTION_ACTION: - if (instant.isRequested()) { - // TODO : Get file status and create a rollback stat and file - // TODO : Delete the .aux files along with the instant file, okay for now since the archival process will - // delete these files when it does not see a corresponding instant file under .hoodie - deleteRequestedCompaction(instant.getTimestamp()); - logger.info("Deleted pending scheduled compaction " + instant.getTimestamp()); - } else { - rollback(instant.getTimestamp()); - } - break; - default: - throw new IllegalArgumentException("invalid action name " + instant.getAction()); - } + Collections.reverse(instantsToRollback); + // Start a rollback instant for all 
commits to be rolled back + String startRollbackInstant = startInstant(); + // Start the timer + final Timer.Context context = startContext(); + ImmutableMap.Builder> instantsToStats = + ImmutableMap.builder(); + instantsToRollback.stream().forEach(instant -> { try { - // Ensure unique rollback instants for seconds granularity - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new HoodieRollbackException("unable to rollback instant " + instant, e); + switch (instant.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.DELTA_COMMIT_ACTION: + List statsForInstant = doRollbackAndGetStats(instant.getTimestamp()); + instantsToStats.put(instant.getTimestamp(), statsForInstant); + break; + case HoodieTimeline.COMPACTION_ACTION: + if (instant.isRequested()) { + // TODO : Get file status and create a rollback stat and file + // TODO : Delete the .aux files along with the instant file, okay for now since the archival process will + // delete these files when it does not see a corresponding instant file under .hoodie + deleteRequestedCompaction(instant.getTimestamp()); + logger.info("Deleted pending scheduled compaction " + instant.getTimestamp()); + } else { + List statsForCompaction = doRollbackAndGetStats(instant.getTimestamp()); + instantsToStats.put(instant.getTimestamp(), statsForCompaction); + } + break; + default: + throw new IllegalArgumentException("invalid action name " + instant.getAction()); + } + } catch (IOException io) { + throw new HoodieRollbackException("unable to rollback instant " + instant, io); } }); + try { + finishRestore(context, instantsToStats.build(), + instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()), + startRollbackInstant, instantTime); + } catch (IOException io) { + throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io); + } } - private void rollback(List commitsToRollback) { - if (commitsToRollback.isEmpty()) { - logger.info("List 
of commits to rollback is empty"); - return; - } + private String startInstant() { + return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + } - final Timer.Context context = metrics.getRollbackCtx(); - String startRollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); + private Timer.Context startContext() { + return metrics.getRollbackCtx(); + } - // Create a Hoodie table which encapsulated the commits and files visible + private List doRollbackAndGetStats(final String commitToRollback) throws + IOException { HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - Set pendingCompactions = - table.getActiveTimeline().filterPendingCompactionTimeline().getInstants() - .map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline(); HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); - // Check if any of the commits is a savepoint - do not allow rollback on those commits List savepoints = table.getCompletedSavepointTimeline().getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - commitsToRollback.forEach(s -> { - if (savepoints.contains(s)) { + savepoints.stream().forEach(s -> { + if (s.equals(commitToRollback)) { throw new HoodieRollbackException( "Could not rollback a savepointed commit. 
Delete savepoint first before rolling back" + s); } }); + if (commitTimeline.empty() && inflightCommitTimeline.empty()) { + // nothing to rollback + logger.info("No commits to rollback " + commitToRollback); + } + + // Make sure only the last n commits are being rolled back + // If there is a commit in-between or after that is not rolled back, then abort + String lastCommit = commitToRollback; + + if ((lastCommit != null) && !commitTimeline.empty() && !commitTimeline + .findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) { + throw new HoodieRollbackException( + "Found commits after time :" + lastCommit + ", please rollback greater commits first"); + } + + List inflights = inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) { + throw new HoodieRollbackException("Found in-flight commits after time :" + lastCommit + + ", please rollback greater commits first"); + } + + List stats = table.rollback(jsc, commitToRollback, true); + + logger.info("Deleted inflight commits " + commitToRollback); + + // cleanup index entries + if (!index.rollbackCommit(commitToRollback)) { + throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback); + } + logger.info("Index rolled back for commits " + commitToRollback); + return stats; + } + + private void finishRollback(final Timer.Context context, List rollbackStats, + List commitsToRollback, final String startRollbackTime) throws IOException { + HoodieTable table = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + Optional durationInMs = Optional.empty(); + Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); + if (context != null) { + durationInMs = Optional.of(metrics.getDurationInMs(context.stop())); 
+ metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); + } + HoodieRollbackMetadata rollbackMetadata = AvroUtils + .convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats); + table.getActiveTimeline().saveAsComplete( + new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), + AvroUtils.serializeRollbackMetadata(rollbackMetadata)); + logger.info("Commits " + commitsToRollback + " rollback is complete"); + + if (!table.getActiveTimeline().getCleanerTimeline().empty()) { + logger.info("Cleaning up older rollback meta files"); + // Cleanup of older cleaner meta files + // TODO - make the commit archival generic and archive rollback metadata + FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), + table.getActiveTimeline().getRollbackTimeline().getInstants()); + } + } + + private void finishRestore(final Timer.Context context, Map> commitToStats, + List commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { + HoodieTable table = HoodieTable.getHoodieTable( + new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); + Optional durationInMs = Optional.empty(); + Long numFilesDeleted = 0L; + for (Map.Entry> commitToStat : commitToStats.entrySet()) { + List stats = commitToStat.getValue(); + numFilesDeleted += stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()) + .sum(); + } + if (context != null) { + durationInMs = Optional.of(metrics.getDurationInMs(context.stop())); + metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); + } + HoodieRestoreMetadata restoreMetadata = AvroUtils + .convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats); + table.getActiveTimeline().saveAsComplete( + new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime), + AvroUtils.serializeRestoreMetadata(restoreMetadata)); + logger.info("Commits "
+ commitsToRollback + " rollback is complete. Restored dataset to " + restoreToInstant); + + if (!table.getActiveTimeline().getCleanerTimeline().empty()) { + logger.info("Cleaning up older restore meta files"); + // Cleanup of older cleaner meta files + // TODO - make the commit archival generic and archive rollback metadata + FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), + table.getActiveTimeline().getRestoreTimeline().getInstants()); + } + } + + private void rollbackInternal(String commitToRollback) { + if (commitToRollback.isEmpty()) { + logger.info("List of commits to rollback is empty"); + return; + } + final String startRollbackTime = startInstant(); + final Timer.Context context = startContext(); + // Create a Hoodie table which encapsulated the commits and files visible try { - if (commitTimeline.empty() && inflightCommitTimeline.empty()) { - // nothing to rollback - logger.info("No commits to rollback " + commitsToRollback); - } - - // Make sure only the last n commits are being rolled back - // If there is a commit in-between or after that is not rolled back, then abort - String lastCommit = null; - if (!commitsToRollback.isEmpty()) { - lastCommit = commitsToRollback.get(commitsToRollback.size() - 1); - } - - if ((lastCommit != null) && !commitTimeline.empty() && !commitTimeline - .findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) { - throw new HoodieRollbackException( - "Found commits after time :" + lastCommit + ", please rollback greater commits first"); - } - - List inflights = inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) { - throw new HoodieRollbackException("Found in-flight commits after time :" + lastCommit - + ", please rollback greater commits first"); - } - - List stats = table.rollback(jsc, commitsToRollback, true); - - 
logger.info("Deleted inflight commits " + commitsToRollback); - - // cleanup index entries - commitsToRollback.forEach(s -> { - if (!index.rollbackCommit(s)) { - throw new HoodieRollbackException("Rollback index changes failed, for time :" + s); - } - }); - logger.info("Index rolled back for commits " + commitsToRollback); - - Optional durationInMs = Optional.empty(); - if (context != null) { - durationInMs = Optional.of(metrics.getDurationInMs(context.stop())); - Long numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()) - .sum(); - metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); - } - HoodieRollbackMetadata rollbackMetadata = AvroUtils - .convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, stats); - table.getActiveTimeline().saveAsComplete( - new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), - AvroUtils.serializeRollbackMetadata(rollbackMetadata)); - logger.info("Commits " + commitsToRollback + " rollback is complete"); - - if (!table.getActiveTimeline().getCleanerTimeline().empty()) { - logger.info("Cleaning up older rollback meta files"); - // Cleanup of older cleaner meta files - // TODO - make the commit archival generic and archive rollback metadata - FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), - table.getActiveTimeline().getRollbackTimeline().getInstants()); - } + List stats = doRollbackAndGetStats(commitToRollback); + Map> statToCommit = new HashMap<>(); + finishRollback(context, stats, Arrays.asList(commitToRollback), startRollbackTime); } catch (IOException e) { throw new HoodieRollbackException( - "Failed to rollback " + config.getBasePath() + " commits " + commitsToRollback, e); + "Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, e); } } @@ -1251,7 +1304,7 @@ public class HoodieWriteClient implements Seriali */ @VisibleForTesting void rollbackInflightCompaction(HoodieInstant 
inflightInstant, HoodieTable table) throws IOException { - table.rollback(jsc, ImmutableList.copyOf(new String[]{inflightInstant.getTimestamp()}), false); + table.rollback(jsc, inflightInstant.getTimestamp(), false); // Revert instant state file table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index c7a387af8..ea7eceb98 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -330,7 +330,7 @@ public class HoodieCopyOnWriteTable extends Hoodi * Common method used for cleaning out parquet files under a partition path during rollback of a * set of commits */ - protected Map deleteCleanedFiles(Map results, List commits, String + protected Map deleteCleanedFiles(Map results, String commit, String partitionPath) throws IOException { logger.info("Cleaning path " + partitionPath); @@ -338,7 +338,7 @@ public class HoodieCopyOnWriteTable extends Hoodi PathFilter filter = (path) -> { if (path.toString().contains(".parquet")) { String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commits.contains(fileCommitTime); + return commit.equals(fileCommitTime); } return false; }; @@ -352,28 +352,27 @@ public class HoodieCopyOnWriteTable extends Hoodi } @Override - public List rollback(JavaSparkContext jsc, List commits, boolean deleteInstants) + public List rollback(JavaSparkContext jsc, String commit, boolean deleteInstants) throws IOException { String actionType = metaClient.getCommitActionType(); HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); List inflights = this.getInflightCommitTimeline().getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + // Atomically unpublish the commits + if 
(!inflights.contains(commit)) { + activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit)); + } + logger.info("Unpublished " + commit); - // Atomically unpublish all the commits - commits.stream().filter(s -> !inflights.contains(s)) - .map(s -> new HoodieInstant(false, actionType, s)) - .forEach(activeTimeline::revertToInflight); - logger.info("Unpublished " + commits); - - // delete all the data files for all these commits - logger.info("Clean out all parquet files generated for commits: " + commits); + // delete all the data files for this commit + logger.info("Clean out all parquet files generated for commit: " + commit); List stats = jsc.parallelize(FSUtils .getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) .map((Function) partitionPath -> { // Scan all partitions files with this commit time final Map filesToDeletedStatus = new HashMap<>(); - deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath); + deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath); return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) .withDeletedFileResults(filesToDeletedStatus).build(); }).collect(); @@ -381,26 +380,26 @@ public class HoodieCopyOnWriteTable extends Hoodi // clean temporary data files cleanTemporaryDataFiles(jsc); - // Delete Inflight instants if enabled - deleteInflightInstants(deleteInstants, activeTimeline, - commits.stream().map(s -> new HoodieInstant(true, actionType, s)).collect(Collectors.toList())); + // Delete Inflight instant if enabled + deleteInflightInstant(deleteInstants, activeTimeline, + new HoodieInstant(true, actionType, commit)); return stats; } /** - * Delete Inflight instants if enabled - * @param deleteInstants Enable Deletion of Inflight instants + * Delete Inflight instant if enabled + * @param deleteInstant Enable Deletion of Inflight instant * @param activeTimeline Hoodie active timeline - * @param instantsToBeDeleted Instants 
to be deleted + * @param instantToBeDeleted Instant to be deleted */ - protected static void deleteInflightInstants(boolean deleteInstants, HoodieActiveTimeline activeTimeline, - List instantsToBeDeleted) { + protected static void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline, + HoodieInstant instantToBeDeleted) { // Remove the rolled back inflight commits - if (deleteInstants) { - instantsToBeDeleted.forEach(activeTimeline::deleteInflight); - logger.info("Deleted inflight commits " + instantsToBeDeleted); + if (deleteInstant) { + activeTimeline.deleteInflight(instantToBeDeleted); + logger.info("Deleted inflight commit " + instantToBeDeleted); } else { - logger.warn("Rollback finished without deleting inflight instant files. Instants=" + instantsToBeDeleted); + logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 109b440cb..32daa05e7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -174,7 +174,7 @@ public class HoodieMergeOnReadTable extends } @Override - public List rollback(JavaSparkContext jsc, List commits, boolean deleteInstants) + public List rollback(JavaSparkContext jsc, String commit, boolean deleteInstants) throws IOException { // At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental @@ -182,26 +182,23 @@ public class HoodieMergeOnReadTable extends // (commitToRollback). // NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is // required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks. 
- if (commits.size() > 1) { - throw new UnsupportedOperationException("Bulk Nested Rollbacks are not supported"); - } // Atomically un-publish all non-inflight commits - Map commitsAndCompactions = this.getActiveTimeline() + Optional commitOrCompactionOption = this.getActiveTimeline() .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION)).getInstants() - .filter(i -> commits.contains(i.getTimestamp())) - .collect(Collectors.toMap(HoodieInstant::getTimestamp, i -> i)); - + .filter(i -> commit.equals(i.getTimestamp())) + .findFirst(); + HoodieInstant instantToRollback = commitOrCompactionOption.get(); // Atomically un-publish all non-inflight commits - commitsAndCompactions.entrySet().stream().map(Map.Entry::getValue) - .filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight); - logger.info("Unpublished " + commits); + if (!instantToRollback.isInflight()) { + this.getActiveTimeline().revertToInflight(instantToRollback); + } + logger.info("Unpublished " + commit); Long startTime = System.currentTimeMillis(); List allRollbackStats = jsc.parallelize(FSUtils .getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) - .map((Function>) partitionPath -> commits.stream().map(commit -> { - HoodieInstant instant = commitsAndCompactions.get(commit); + .map((Function) partitionPath -> { HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); HoodieRollbackStat hoodieRollbackStats = null; // Need to put the path filter here since Filter is not serializable @@ -209,18 +206,18 @@ public class HoodieMergeOnReadTable extends PathFilter filter = (path) -> { if (path.toString().contains(".parquet")) { String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commits.contains(fileCommitTime); + return commit.equals(fileCommitTime); } else if 
(path.toString().contains(".log")) { // Since the baseCommitTime is the only commit for new log files, it's okay here String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); - return commits.contains(fileCommitTime); + return commit.equals(fileCommitTime); } return false; }; final Map filesToDeletedStatus = new HashMap<>(); - switch (instant.getAction()) { + switch (instantToRollback.getAction()) { case HoodieTimeline.COMMIT_ACTION: try { // Rollback of a commit should delete the newly created parquet files along with any log @@ -244,7 +241,7 @@ public class HoodieMergeOnReadTable extends // and has not yet finished. In this scenario we should delete only the newly created parquet files // and not corresponding base commit log files created with this as baseCommit since updates would // have been written to the log files. - super.deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath); + super.deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath); hoodieRollbackStats = HoodieRollbackStat.newBuilder() .withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build(); } else { @@ -286,7 +283,7 @@ public class HoodieMergeOnReadTable extends try { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( metaClient.getCommitTimeline().getInstantDetails( - new HoodieInstant(true, instant.getAction(), instant.getTimestamp())) + new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) .get(), HoodieCommitMetadata.class); // read commit file and (either append delete blocks or delete file) @@ -315,13 +312,11 @@ public class HoodieMergeOnReadTable extends break; } return hoodieRollbackStats; - }).collect(Collectors.toList())).flatMap(List::iterator).filter(Objects::nonNull).collect(); + }).filter(Objects::nonNull).collect(); // Delete Inflight instants if enabled - deleteInflightInstants(deleteInstants, this.getActiveTimeline(), - commitsAndCompactions.entrySet().stream().map( - 
entry -> new HoodieInstant(true, entry.getValue().getAction(), entry.getValue().getTimestamp())) - .collect(Collectors.toList())); + deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback + .getAction(), instantToRollback.getTimestamp())); logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 749fc6cb2..b8d960904 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -253,7 +253,7 @@ public abstract class HoodieTable implements Seri * Atomically unpublish this commit (2) clean indexing data (3) clean new generated parquet files * / log blocks (4) Finally, delete ..commit or ..inflight file if deleteInstants = true */ - public abstract List rollback(JavaSparkContext jsc, List commits, boolean deleteInstants) + public abstract List rollback(JavaSparkContext jsc, String commit, boolean deleteInstants) throws IOException; /** diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java index 4684eef1e..b8d861484 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java @@ -114,7 +114,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { // Reload and rollback inflight compaction metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc); - hoodieTable.rollback(jsc, Arrays.asList(compactionInstantTime), false); + hoodieTable.rollback(jsc, compactionInstantTime, false); client.rollbackInflightCompaction( new 
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java index 76c01097f..98775be88 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java @@ -55,7 +55,6 @@ import com.uber.hoodie.table.HoodieTable; import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -628,19 +627,12 @@ public class TestCleaner extends TestHoodieClientBase { assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles()); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withUseTempFolderCopyOnWriteForCreate(false) + .withUseTempFolderCopyOnWriteForCreate(true) .withUseTempFolderCopyOnWriteForMerge(false).build(); - HoodieTable table = HoodieTable.getHoodieTable( - new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, - jsc); - table.rollback(jsc, Collections.emptyList(), true); - assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles()); - - config = HoodieWriteConfig.newBuilder().withPath(basePath).withUseTempFolderCopyOnWriteForCreate(true) - .withUseTempFolderCopyOnWriteForMerge(false).build(); - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), + HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config + .getBasePath(), true), config, jsc); - table.rollback(jsc, Collections.emptyList(), true); + table.rollback(jsc, "000", true); assertEquals("All temp files are deleted.", 0, getTotalTempFiles()); } diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index a6fdfedb4..368721440 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -721,7 +721,7 @@ public class TestMergeOnReadTable { copyOfRecords.clear(); // Rollback latest commit first - client.restoreToCommit("000"); + client.restoreToInstant("000"); metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml index b49a2ae78..88274265f 100644 --- a/hoodie-common/pom.xml +++ b/hoodie-common/pom.xml @@ -62,6 +62,7 @@ ${basedir}/src/main/avro/HoodieCompactionMetadata.avsc ${basedir}/src/main/avro/HoodieCleanMetadata.avsc ${basedir}/src/main/avro/HoodieRollbackMetadata.avsc + ${basedir}/src/main/avro/HoodieRestoreMetadata.avsc diff --git a/hoodie-common/src/main/avro/HoodieRestoreMetadata.avsc b/hoodie-common/src/main/avro/HoodieRestoreMetadata.avsc new file mode 100644 index 000000000..a96c12d92 --- /dev/null +++ b/hoodie-common/src/main/avro/HoodieRestoreMetadata.avsc @@ -0,0 +1,17 @@ +{"namespace": "com.uber.hoodie.avro.model", + "type": "record", + "name": "HoodieRestoreMetadata", + "fields": [ + {"name": "startRestoreTime", "type": "string"}, + {"name": "timeTakenInMillis", "type": "long"}, + {"name": "instantsToRollback", "type": {"type": "array", "items": "string"}}, + {"name": "hoodieRestoreMetadata", "type": { + "type" : "map", "values" : { + "type": "array", + "default": "null", + "items": "HoodieRollbackMetadata", + "name": "hoodieRollbackMetadata" + } + }} + ] +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index 98dea4479..6c8a20458 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -48,6 +48,7 @@ public interface HoodieTimeline extends Serializable { // (compaction-requested), (compaction-inflight), (completed) String COMPACTION_ACTION = "compaction"; String REQUESTED_EXTENSION = ".requested"; + String RESTORE_ACTION = "restore"; String COMMIT_EXTENSION = "." + COMMIT_ACTION; String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION; @@ -66,6 +67,8 @@ public interface HoodieTimeline extends Serializable { StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX); String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION); + String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION; + String RESTORE_EXTENSION = "." + RESTORE_ACTION; /** * Filter this timeline to just include the in-flights @@ -248,6 +251,14 @@ public interface HoodieTimeline extends Serializable { return StringUtils.join(commitTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION); } + static String makeRestoreFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.RESTORE_EXTENSION); + } + + static String makeInflightRestoreFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.INFLIGHT_RESTORE_EXTENSION); + } + static String makeDeltaFileName(String commitTime) { return commitTime + HoodieTimeline.DELTA_COMMIT_EXTENSION; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 2b86448ce..f81f7d119 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -54,7 +54,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList( new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, - CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION})); + CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, + INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION})); private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); private HoodieTableMetaClient metaClient; @@ -186,6 +187,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { (Function> & Serializable) this::getInstantDetails); } + /** + * Get only the restore action (inflight and completed) in the active timeline + */ + public HoodieTimeline getRestoreTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION), + (Function> & Serializable) this::getInstantDetails); + } + protected Stream filterInstantsByAction(String action) { return instants.stream().filter(s -> s.getAction().equals(action)); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java index 164f4486c..f24eb9dce 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java @@ -131,6 +131,9 @@ public class HoodieInstant implements Serializable { } else { return HoodieTimeline.makeCommitFileName(timestamp); } + } else if (HoodieTimeline.RESTORE_ACTION.equals(action)) { + return 
isInflight() ? HoodieTimeline.makeInflightRestoreFileName(timestamp) + : HoodieTimeline.makeRestoreFileName(timestamp); } throw new IllegalArgumentException("Cannot get file name for unknown action " + action); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java index 93fee8cb4..44482da7f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.uber.hoodie.avro.model.HoodieCleanMetadata; import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata; import com.uber.hoodie.avro.model.HoodieCompactionPlan; +import com.uber.hoodie.avro.model.HoodieRestoreMetadata; import com.uber.hoodie.avro.model.HoodieRollbackMetadata; import com.uber.hoodie.avro.model.HoodieRollbackPartitionMetadata; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; @@ -34,6 +35,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.exception.HoodieIOException; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -114,16 +116,28 @@ public class AvroUtils { totalDeleted, earliestCommitToRetain, partitionMetadataBuilder.build()); } + public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, + Optional durationInMs, List commits, Map> commitToStats) { + ImmutableMap.Builder> commitToStatBuilder = ImmutableMap.builder(); + for (Map.Entry> commitToStat : commitToStats.entrySet()) { + commitToStatBuilder.put(commitToStat.getKey(), Arrays.asList(convertRollbackMetadata(startRestoreTime, + durationInMs, commits, commitToStat.getValue()))); + } + return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits, + 
commitToStatBuilder.build()); + } + public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime, - Optional durationInMs, List commits, List stats) { + Optional durationInMs, List commits, List rollbackStats) { ImmutableMap.Builder partitionMetadataBuilder = ImmutableMap.builder(); int totalDeleted = 0; - for (HoodieRollbackStat stat : stats) { + for (HoodieRollbackStat stat : rollbackStats) { HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles()); - partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); + partitionMetadataBuilder + .put(stat.getPartitionPath(), metadata); totalDeleted += stat.getSuccessDeleteFiles().size(); } return new HoodieRollbackMetadata(startRollbackTime, durationInMs.orElseGet(() -> -1L), @@ -163,6 +177,11 @@ public class AvroUtils { return serializeAvroMetadata(rollbackMetadata, HoodieRollbackMetadata.class); } + public static Optional serializeRestoreMetadata( + HoodieRestoreMetadata restoreMetadata) throws IOException { + return serializeAvroMetadata(restoreMetadata, HoodieRestoreMetadata.class); + } + public static Optional serializeAvroMetadata(T metadata, Class clazz) throws IOException { DatumWriter datumWriter = new SpecificDatumWriter<>(clazz); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index a18c6539f..4b28adee1 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -398,6 +398,20 @@ public class FSUtils { }); } + public static void deleteOlderRestoreMetaFiles(FileSystem fs, String metaPath, + Stream instants) { + //TODO - this should be archived when archival is made general for all meta-data + // skip MIN_ROLLBACK_TO_KEEP and delete rest + 
instants.skip(MIN_ROLLBACK_TO_KEEP).forEach(s -> { + try { + fs.delete(new Path(metaPath, s.getFileName()), false); + } catch (IOException e) { + throw new HoodieIOException( + "Could not delete restore meta files " + s.getFileName(), e); + } + }); + } + public static void createPathIfNotExists(FileSystem fs, Path partitionPath) throws IOException { if (!fs.exists(partitionPath)) { fs.mkdirs(partitionPath);