From 661b0b3bab85c8fd958a9e256dab99790ea92713 Mon Sep 17 00:00:00 2001 From: vinoth chandar Date: Mon, 13 Apr 2020 08:29:19 -0700 Subject: [PATCH] [HUDI-761] Refactoring rollback and restore actions using the ActionExecutor abstraction (#1492) - rollback() and restore() table level APIs introduced - Restore is implemented by wrapping calls to rollback executor - Existing tests transparently cover this, since its just a refactor --- .../apache/hudi/cli/commands/SparkMain.java | 2 +- .../client/AbstractHoodieWriteClient.java | 118 +-------- .../apache/hudi/client/HoodieWriteClient.java | 156 ++++-------- .../exception/HoodieRestoreException.java | 26 ++ .../hudi/table/HoodieCopyOnWriteTable.java | 79 +----- .../hudi/table/HoodieMergeOnReadTable.java | 198 +------------- .../org/apache/hudi/table/HoodieTable.java | 39 ++- .../action/clean/CleanActionExecutor.java | 8 +- .../restore/BaseRestoreActionExecutor.java | 111 ++++++++ .../CopyOnWriteRestoreActionExecutor.java | 57 +++++ .../MergeOnReadRestoreActionExecutor.java | 64 +++++ .../rollback/BaseRollbackActionExecutor.java | 192 ++++++++++++++ .../CopyOnWriteRollbackActionExecutor.java | 94 +++++++ .../MergeOnReadRollbackActionExecutor.java | 241 ++++++++++++++++++ .../{ => action}/rollback/RollbackHelper.java | 2 +- .../rollback/RollbackRequest.java | 2 +- .../hudi/client/TestClientRollback.java | 16 +- .../org/apache/hudi/table/TestCleaner.java | 2 +- .../table/timeline/TimelineMetadataUtils.java | 15 +- .../fs/inline/TestInLineFileSystem.java | 5 +- 20 files changed, 907 insertions(+), 520 deletions(-) create mode 100644 hudi-client/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java create mode 100644 
hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java create mode 100644 hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/rollback/RollbackHelper.java (99%) rename hudi-client/src/main/java/org/apache/hudi/table/{ => action}/rollback/RollbackRequest.java (98%) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index e8f4c6aff..5e14fc596 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -283,7 +283,7 @@ public class SparkMain { private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception { HoodieWriteClient client = createHoodieClient(jsc, basePath); - if (client.rollbackToSavepoint(savepointTime)) { + if (client.restoreToSavepoint(savepointTime)) { LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime)); return 0; } else { diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 862a60fca..d712d5753 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -18,11 +18,9 @@ package org.apache.hudi.client; -import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import com.codahale.metrics.Timer; import 
org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.SparkConfigUtils; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRollingStat; @@ -32,19 +30,14 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; - -import com.codahale.metrics.Timer; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -53,10 +46,8 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.ParseException; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** * Abstract Write Client providing functionality for performing commit, index updates and rollback @@ -82,12 +73,6 @@ public abstract class AbstractHoodieWriteClient e return this.operationType; } - protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig) { - super(jsc, clientConfig); - 
this.metrics = new HoodieMetrics(config, config.getTableName()); - this.index = index; - } - protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig, Option timelineServer) { super(jsc, clientConfig, timelineServer); @@ -320,105 +305,4 @@ public abstract class AbstractHoodieWriteClient e // before this point this.index.close(); } - - protected void rollbackInternal(String commitToRollback) { - final String startRollbackTime = HoodieActiveTimeline.createNewInstantTime(); - final Timer.Context context = metrics.getRollbackCtx(); - // Create a Hoodie table which encapsulated the commits and files visible - try { - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.create(config, jsc); - Option rollbackInstantOpt = - Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() - .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback)) - .findFirst()); - - if (rollbackInstantOpt.isPresent()) { - List stats = doRollbackAndGetStats(rollbackInstantOpt.get()); - finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime); - } - } catch (IOException e) { - throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, - e); - } - } - - protected List doRollbackAndGetStats(final HoodieInstant instantToRollback) throws - IOException { - final String commitToRollback = instantToRollback.getTimestamp(); - HoodieTable table = HoodieTable.create(config, jsc); - HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline(); - HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); - // Check if any of the commits is a savepoint - do not allow rollback on those commits - List savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp) - 
.collect(Collectors.toList()); - savepoints.forEach(s -> { - if (s.contains(commitToRollback)) { - throw new HoodieRollbackException( - "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s); - } - }); - - if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) { - // nothing to rollback - LOG.info("No commits to rollback " + commitToRollback); - } - - // Make sure only the last n commits are being rolled back - // If there is a commit in-between or after that is not rolled back, then abort - - if ((commitToRollback != null) && !commitTimeline.empty() - && !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) { - throw new HoodieRollbackException( - "Found commits after time :" + commitToRollback + ", please rollback greater commits first"); - } - - List inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - if ((commitToRollback != null) && !inflights.isEmpty() - && (inflights.indexOf(commitToRollback) != inflights.size() - 1)) { - throw new HoodieRollbackException( - "Found in-flight commits after time :" + commitToRollback + ", please rollback greater commits first"); - } - - List stats = table.rollback(jsc, instantToRollback, true); - - LOG.info("Deleted inflight commits " + commitToRollback); - - // cleanup index entries - if (!getIndex().rollbackCommit(commitToRollback)) { - throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback); - } - LOG.info("Index rolled back for commits " + commitToRollback); - return stats; - } - - private void finishRollback(final Timer.Context context, List rollbackStats, - List commitsToRollback, final String startRollbackTime) throws IOException { - HoodieTable table = HoodieTable.create(config, jsc); - Option durationInMs = Option.empty(); - long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> 
stat.getSuccessDeleteFiles().size()).sum(); - if (context != null) { - durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); - } - HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils - .convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats); - //TODO: varadarb - This will be fixed when Rollback transition mimics that of commit - table.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, - startRollbackTime)); - table.getActiveTimeline().saveAsComplete( - new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), - TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata)); - LOG.info("Rollback of Commits " + commitsToRollback + " is complete"); - - if (!table.getActiveTimeline().getCleanerTimeline().empty()) { - LOG.info("Cleaning up older rollback meta files"); - // Cleanup of older cleaner meta files - // TODO - make the commit archival generic and archive rollback metadata - FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), - table.getActiveTimeline().getRollbackTimeline().getInstants()); - } - } - } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 6e866f933..1fd34a317 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -18,13 +18,14 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import 
org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.SparkConfigUtils; -import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -49,6 +50,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieRestoreException; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieSavepointException; import org.apache.hudi.index.HoodieIndex; @@ -56,8 +58,6 @@ import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieCommitArchiveLog; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; - -import com.codahale.metrics.Timer; import org.apache.hudi.table.action.commit.HoodieWriteMetadata; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -65,18 +65,17 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.ParseException; -import java.util.Collections; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import scala.Tuple2; - /** * Hoodie Write Client helps you build tables on HDFS [insert()] and then perform efficient mutations on an HDFS * table [upsert()] @@ -505,13 +504,13 @@ public class HoodieWriteClient extends AbstractHo } /** - * Rollback the state to the savepoint. WARNING: This rollsback recent commits and deleted data files. 
Queries + * Restore the state to the savepoint. WARNING: This rolls back recent commits and deletes data files. Queries + * accessing the files will mostly fail. This should be done during a downtime. * * @param savepointTime - savepoint time to rollback to * @return true if the savepoint was rollecback to successfully */ - public boolean rollbackToSavepoint(String savepointTime) { + public boolean restoreToSavepoint(String savepointTime) { HoodieTable table = HoodieTable.create(config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); @@ -544,103 +543,60 @@ public class HoodieWriteClient extends AbstractHo } /** - * Rollback the (inflight/committed) record changes with the given commit time. Three steps: (1) Atomically unpublish - * this commit (2) clean indexing data, (3) clean new generated parquet files. (4) Finally delete .commit or .inflight - * file. + * Rollback the inflight record changes with the given commit time. * - * @param instantTime Instant time of the commit - * @return {@code true} If rollback the record changes successfully. 
{@code false} otherwise + * @param commitInstantTime Instant time of the commit + * @throws HoodieRollbackException if rollback cannot be performed successfully */ - public boolean rollback(final String instantTime) throws HoodieRollbackException { - rollbackInternal(instantTime); - return true; + public boolean rollback(final String commitInstantTime) throws HoodieRollbackException { + LOG.info("Begin rollback of instant " + commitInstantTime); + final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime(); + final Timer.Context context = this.metrics.getRollbackCtx(); + try { + HoodieTable table = HoodieTable.create(config, jsc); + Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() + .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitInstantTime)) + .findFirst()); + if (commitInstantOpt.isPresent()) { + HoodieRollbackMetadata rollbackMetadata = table.rollback(jsc, rollbackInstantTime, commitInstantOpt.get(), true); + if (context != null) { + long durationInMs = metrics.getDurationInMs(context.stop()); + metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); + } + return true; + } else { + LOG.info("Cannot find instant " + commitInstantTime + " in the timeline, for rollback"); + return false; + } + } catch (Exception e) { + throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e); + } } /** * NOTE : This action requires all writers (ingest and compact) to a table to be stopped before proceeding. Revert - * the (inflight/committed) record changes for all commits after the provided @param. Four steps: (1) Atomically - * unpublish this commit (2) clean indexing data, (3) clean new generated parquet/log files and/or append rollback to - * existing log files. 
(4) Finally delete .commit, .inflight, .compaction.inflight or .compaction.requested file + * the (inflight/committed) record changes for all commits after the provided instant time. * * @param instantTime Instant time to which restoration is requested */ - public void restoreToInstant(final String instantTime) throws HoodieRollbackException { - - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.create(config, jsc); - // Get all the commits on the timeline after the provided commit time - List instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline() - .getReverseOrderedInstants() - .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime)) - .collect(Collectors.toList()); - // Start a rollback instant for all commits to be rolled back - String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime(); - // Start the timer - final Timer.Context context = startContext(); - Map> instantsToStats = new HashMap<>(); - table.getActiveTimeline().createNewInstant( - new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant)); - instantsToRollback.forEach(instant -> { - try { - switch (instant.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.DELTA_COMMIT_ACTION: - List statsForInstant = doRollbackAndGetStats(instant); - instantsToStats.put(instant.getTimestamp(), statsForInstant); - break; - case HoodieTimeline.COMPACTION_ACTION: - // TODO : Get file status and create a rollback stat and file - // TODO : Delete the .aux files along with the instant file, okay for now since the archival process will - // delete these files when it does not see a corresponding instant file under .hoodie - List statsForCompaction = doRollbackAndGetStats(instant); - instantsToStats.put(instant.getTimestamp(), statsForCompaction); - LOG.info("Deleted compaction instant " + instant); - break; - default: - throw new 
IllegalArgumentException("invalid action name " + instant.getAction()); - } - } catch (IOException io) { - throw new HoodieRollbackException("unable to rollback instant " + instant, io); - } - }); + public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException { + LOG.info("Begin restore to instant " + instantTime); + final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime(); + Timer.Context context = metrics.getRollbackCtx(); try { - finishRestore(context, Collections.unmodifiableMap(instantsToStats), - instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()), - startRollbackInstant, instantTime); - } catch (IOException io) { - throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io); - } - } - - private Timer.Context startContext() { - return metrics.getRollbackCtx(); - } - - private void finishRestore(final Timer.Context context, Map> commitToStats, - List commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { - HoodieTable table = HoodieTable.create(config, jsc); - Option durationInMs = Option.empty(); - long numFilesDeleted = 0L; - for (Map.Entry> commitToStat : commitToStats.entrySet()) { - List stats = commitToStat.getValue(); - numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); - } - if (context != null) { - durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); - } - HoodieRestoreMetadata restoreMetadata = - TimelineMetadataUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats); - table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime), - TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata)); - LOG.info("Commits " + commitsToRollback + " rollback is 
complete. Restored table to " + restoreToInstant); - - if (!table.getActiveTimeline().getCleanerTimeline().empty()) { - LOG.info("Cleaning up older restore meta files"); - // Cleanup of older cleaner meta files - // TODO - make the commit archival generic and archive rollback metadata - FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), - table.getActiveTimeline().getRestoreTimeline().getInstants()); + HoodieTable table = HoodieTable.create(config, jsc); + HoodieRestoreMetadata restoreMetadata = table.restore(jsc, restoreInstantTime, instantTime); + if (context != null) { + final long durationInMs = metrics.getDurationInMs(context.stop()); + final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream() + .flatMap(Collection::stream) + .mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted) + .sum(); + metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted); + } + return restoreMetadata; + } catch (Exception e) { + throw new HoodieRestoreException("Failed to restore to " + instantTime, e); } } @@ -835,11 +791,8 @@ public class HoodieWriteClient extends AbstractHo HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); if (pendingCompactionTimeline.containsInstant(inflightInstant)) { - // inflight compaction - Needs to rollback first deleting new parquet files before we run compaction. 
rollbackInflightCompaction(inflightInstant, table); - // refresh table - metaClient = createMetaClient(true); - table = HoodieTable.create(metaClient, config, jsc); + metaClient.reloadActiveTimeline(); pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline(); } @@ -914,9 +867,8 @@ public class HoodieWriteClient extends AbstractHo * @param inflightInstant Inflight Compaction Instant * @param table Hoodie Table */ - public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException { - table.rollback(jsc, inflightInstant, false); - // Revert instant state file + public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) { + table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false); table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant); } diff --git a/hudi-client/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java new file mode 100644 index 000000000..c6c9076f5 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieRestoreException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +public class HoodieRestoreException extends HoodieException { + + public HoodieRestoreException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 03c014f58..087a2b0f1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -22,10 +22,10 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.ParquetReaderIterator; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; @@ -33,9 +33,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; @@ -56,8 +54,8 @@ import 
org.apache.hudi.table.action.commit.InsertCommitActionExecutor; import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor; -import org.apache.hudi.table.rollback.RollbackHelper; -import org.apache.hudi.table.rollback.RollbackRequest; +import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; +import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; @@ -68,12 +66,10 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with @@ -202,69 +198,12 @@ public class HoodieCopyOnWriteTable extends Hoodi } @Override - public List rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants) - throws IOException { - long startTime = System.currentTimeMillis(); - List stats = new ArrayList<>(); - HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); - - if (instant.isCompleted()) { - LOG.info("Unpublishing instant " + instant); - instant = activeTimeline.revertToInflight(instant); - } - - // For Requested State (like failure during index lookup), there is nothing to do rollback other than - // deleting the timeline file - if (!instant.isRequested()) { - String commit = instant.getTimestamp(); - - // delete all the data files for this commit - LOG.info("Clean out all parquet files generated for commit: " + commit); - List rollbackRequests = generateRollbackRequests(instant); - - //TODO: We need to 
persist this as rollback workload and use it in case of partial failures - stats = new RollbackHelper(metaClient, config).performRollback(jsc, instant, rollbackRequests); - } - // Delete Inflight instant if enabled - deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant); - LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); - return stats; + public HoodieRollbackMetadata rollback(JavaSparkContext jsc, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) { + return new CopyOnWriteRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } - private List generateRollbackRequests(HoodieInstant instantToRollback) - throws IOException { - return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), - config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)) - .collect(Collectors.toList()); - } - - - /** - * Delete Inflight instant if enabled. 
- * - * @param deleteInstant Enable Deletion of Inflight instant - * @param activeTimeline Hoodie active timeline - * @param instantToBeDeleted Instant to be deleted - */ - protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline, - HoodieInstant instantToBeDeleted) { - // Remove marker files always on rollback - deleteMarkerDir(instantToBeDeleted.getTimestamp()); - - // Remove the rolled back inflight commits - if (deleteInstant) { - LOG.info("Deleting instant=" + instantToBeDeleted); - activeTimeline.deletePending(instantToBeDeleted); - if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) { - // Delete corresponding requested instant - instantToBeDeleted = new HoodieInstant(State.REQUESTED, instantToBeDeleted.getAction(), - instantToBeDeleted.getTimestamp()); - activeTimeline.deletePending(instantToBeDeleted); - } - LOG.info("Deleted pending commit " + instantToBeDeleted); - } else { - LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted); - } + public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) { + return new CopyOnWriteRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute(); } enum BucketType { @@ -296,8 +235,6 @@ public class HoodieCopyOnWriteTable extends Hoodi } } - - /** * Helper class for a small file's location and its actual size on disk. 
*/ diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index c496df2c3..844b9ad2e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -19,22 +19,17 @@ package org.apache.hudi.table; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieRollbackStat; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.exception.HoodieIOException; @@ -46,21 +41,16 @@ import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor; import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor; +import 
org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor; +import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor; -import org.apache.hudi.table.rollback.RollbackHelper; -import org.apache.hudi.table.rollback.RollbackRequest; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; /** @@ -172,146 +162,15 @@ public class HoodieMergeOnReadTable extends Hoodi } @Override - public List rollback(JavaSparkContext jsc, HoodieInstant instant, - boolean deleteInstants) throws IOException { - long startTime = System.currentTimeMillis(); - - String commit = instant.getTimestamp(); - LOG.error("Rolling back instant " + instant); - - // Atomically un-publish all non-inflight commits - if (instant.isCompleted()) { - LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants); - instant = this.getActiveTimeline().revertToInflight(instant); - } - - List allRollbackStats = new ArrayList<>(); - - // At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental - // feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback - // (commitToRollback). - // NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is - // required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks. 
- // Atomically un-publish all non-inflight commits - // Atomically un-publish all non-inflight commits - // For Requested State (like failure during index lookup), there is nothing to do rollback other than - // deleting the timeline file - if (!instant.isRequested()) { - LOG.info("Unpublished " + commit); - List rollbackRequests = generateRollbackRequests(jsc, instant); - // TODO: We need to persist this as rollback workload and use it in case of partial failures - allRollbackStats = new RollbackHelper(metaClient, config).performRollback(jsc, instant, rollbackRequests); - } - - // Delete Inflight instants if enabled - deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant); - - LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); - - return allRollbackStats; + public HoodieRollbackMetadata rollback(JavaSparkContext jsc, + String rollbackInstantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { + return new MergeOnReadRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } - /** - * Generate all rollback requests that we need to perform for rolling back this action without actually performing - * rolling back. 
- * - * @param jsc JavaSparkContext - * @param instantToRollback Instant to Rollback - * @return list of rollback requests - * @throws IOException - */ - private List generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback) - throws IOException { - String commit = instantToRollback.getTimestamp(); - List partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(), - config.shouldAssumeDatePartitioning()); - int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); - return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { - HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload(); - List partitionRollbackRequests = new ArrayList<>(); - switch (instantToRollback.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - LOG.info( - "Rolling back commit action. There are higher delta commits. So only rolling back this instant"); - partitionRollbackRequests.add( - RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); - break; - case HoodieTimeline.COMPACTION_ACTION: - // If there is no delta commit present after the current commit (if compaction), no action, else we - // need to make sure that a compaction commit rollback also deletes any log files written as part of the - // succeeding deltacommit. - boolean higherDeltaCommits = - !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); - if (higherDeltaCommits) { - // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created parquet files - // and not corresponding base commit log files created with this as baseCommit since updates would - // have been written to the log files. - LOG.info("Rolling back compaction. 
There are higher delta commits. So only deleting data files"); - partitionRollbackRequests.add( - RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback)); - } else { - // No deltacommits present after this compaction commit (inflight or requested). In this case, we - // can also delete any log files that were created with this compaction commit as base - // commit. - LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" - + " log files"); - partitionRollbackRequests.add( - RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); - } - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. This is done so there are no orphan log files - // lying around. 
- // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. - // In this scenario, we delete all the parquet files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. - // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base parquet file gets deleted. 
- try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - metaClient.getCommitTimeline() - .getInstantDetails( - new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) - .get(), - HoodieCommitMetadata.class); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or parquet files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add( - RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); - - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); - } - break; - } catch (IOException io) { - throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io); - } - default: - break; - } - return partitionRollbackRequests.iterator(); - }).filter(Objects::nonNull).collect(); + public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) { + return new MergeOnReadRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute(); } @Override @@ -320,39 +179,4 @@ public class HoodieMergeOnReadTable extends Hoodi // delegate to base class for MOR tables super.finalizeWrite(jsc, instantTs, stats); } - - private List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, - HoodieCommitMetadata commitMetadata) { - ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); - - // wStat.getPrevCommit() might not give the right commit time in the following - // scenario : If a compaction was scheduled, the new commitTime associated 
with the requested compaction will be - // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. - // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the - // baseCommit always by listing the file slice - Map fileIdToBaseCommitTimeForLogMap = this.getSliceView().getLatestFileSlices(partitionPath) - .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); - return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { - - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) - && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); - - if (validForRollback) { - // For sanity, log instant time can never be less than base-commit on which we are rolling back - ValidationUtils - .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), - rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); - } - - return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - // Base Ts should be strictly less. 
If equal (for inserts-to-logs), the caller employs another option - // to delete and we should not step on it - wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER); - }).map(wStat -> { - String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), - baseCommitTime, rollbackInstant); - }).collect(Collectors.toList()); - } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 2bf66d3f1..0275b72d6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -23,10 +23,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility; @@ -350,12 +351,27 @@ public abstract class HoodieTable implements Seri public abstract HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime); /** - * Rollback the (inflight/committed) record changes with the given commit time. 
Four steps: (1) Atomically unpublish - * this commit (2) clean indexing data (3) clean new generated parquet files / log blocks (4) Finally, delete - * ..commit or ..inflight file if deleteInstants = true + * Rollback the (inflight/committed) record changes with the given commit time. + *
+   *   Three steps:
+   *   (1) Atomically unpublish this commit
+   *   (2) clean indexing data
+   *   (3) clean new generated parquet files.
+   *   (4) Finally delete .commit or .inflight file, if deleteInstants = true
+   * 
*/ - public abstract List rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants) - throws IOException; + public abstract HoodieRollbackMetadata rollback(JavaSparkContext jsc, + String rollbackInstantTime, + HoodieInstant commitInstant, + boolean deleteInstants); + + /** + * Restore the table to the given instant. Note that this is a admin table recovery operation + * that would cause any running queries that are accessing file slices written after the instant to fail. + */ + public abstract HoodieRestoreMetadata restore(JavaSparkContext jsc, + String restoreInstantTime, + String instantToRestore); /** * Finalize the written data onto storage. Perform any final cleanups. @@ -364,8 +380,7 @@ public abstract class HoodieTable implements Seri * @param stats List of HoodieWriteStats * @throws HoodieIOException if some paths can't be finalized on storage */ - public void finalizeWrite(JavaSparkContext jsc, String instantTs, List stats) - throws HoodieIOException { + public void finalizeWrite(JavaSparkContext jsc, String instantTs, List stats) throws HoodieIOException { cleanFailedWrites(jsc, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled()); } @@ -374,7 +389,7 @@ public abstract class HoodieTable implements Seri * * @param instantTs Instant Time */ - protected void deleteMarkerDir(String instantTs) { + public void deleteMarkerDir(String instantTs) { try { FileSystem fs = getMetaClient().getFs(); Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs)); @@ -473,8 +488,7 @@ public abstract class HoodieTable implements Seri * @param groupByPartition Files grouped by partition * @param visibility Appear/Disappear */ - private void waitForAllFiles(JavaSparkContext jsc, Map>> groupByPartition, - FileVisibility visibility) { + private void waitForAllFiles(JavaSparkContext jsc, Map>> groupByPartition, FileVisibility visibility) { // This will either ensure all files to be deleted are present. 
boolean checkPassed = jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism()) @@ -486,8 +500,7 @@ public abstract class HoodieTable implements Seri } } - private boolean waitForCondition(String partitionPath, Stream> partitionFilePaths, - FileVisibility visibility) { + private boolean waitForCondition(String partitionPath, Stream> partitionFilePaths, FileVisibility visibility) { final FileSystem fileSystem = metaClient.getRawFs(); List fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList()); try { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 48fe71da4..b3caa440a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -57,6 +57,10 @@ public class CleanActionExecutor extends BaseActionExecutor private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class); + public CleanActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime) { + super(jsc, config, table, instantTime); + } + /** * Generates List of files to be cleaned. * @@ -172,10 +176,6 @@ public class CleanActionExecutor extends BaseActionExecutor }).collect(Collectors.toList()); } - public CleanActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime) { - super(jsc, config, table, instantTime); - } - /** * Creates a Cleaner plan if there are files to be cleaned and stores them in instant file. 
* diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java new file mode 100644 index 000000000..d1a307e48 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.restore; + +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class BaseRestoreActionExecutor extends BaseActionExecutor { + + private static final Logger LOG = LogManager.getLogger(BaseRestoreActionExecutor.class); + + private final String restoreInstantTime; + + public BaseRestoreActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + String restoreInstantTime) { + super(jsc, config, table, instantTime); + this.restoreInstantTime = restoreInstantTime; + } + + @Override + public HoodieRestoreMetadata execute() { + HoodieTimer restoreTimer = new HoodieTimer(); + restoreTimer.startTimer(); + + // Get all the commits on the timeline after the provided commit time + List instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline() + .getReverseOrderedInstants() + .filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), restoreInstantTime)) + .collect(Collectors.toList()); + + Map> instantToMetadata = new 
HashMap<>(); + table.getActiveTimeline().createNewInstant(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime)); + instantsToRollback.forEach(instant -> { + instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant))); + LOG.info("Deleted instant " + instant); + }); + + try { + return finishRestore(instantToMetadata, + instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()), + restoreTimer.endTimer() + ); + } catch (IOException io) { + throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io); + } + } + + protected abstract HoodieRollbackMetadata rollbackInstant(HoodieInstant rollbackInstant); + + private HoodieRestoreMetadata finishRestore(Map> instantToMetadata, + List instantsRolledBack, + long durationInMs) throws IOException { + + HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.convertRestoreMetadata( + instantTime, durationInMs, instantsRolledBack, instantToMetadata); + table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime), + TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata)); + LOG.info("Commits " + instantsRolledBack + " rollback is complete. 
Restored table to " + restoreInstantTime); + + if (!table.getActiveTimeline().getCleanerTimeline().empty()) { + LOG.info("Cleaning up older restore meta files"); + // Cleanup of older cleaner meta files + // TODO - make the commit archival generic and archive rollback metadata + FSUtils.deleteOlderRollbackMetaFiles( + table.getMetaClient().getFs(), + table.getMetaClient().getMetaPath(), + table.getActiveTimeline().getRestoreTimeline().getInstants() + ); + } + return restoreMetadata; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java new file mode 100644 index 000000000..53e94c648 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/CopyOnWriteRestoreActionExecutor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.restore; + +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; +import org.apache.spark.api.java.JavaSparkContext; + +public class CopyOnWriteRestoreActionExecutor extends BaseRestoreActionExecutor { + + public CopyOnWriteRestoreActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + String restoreInstantTime) { + super(jsc, config, table, instantTime, restoreInstantTime); + } + + @Override + protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { + table.getMetaClient().reloadActiveTimeline(); + CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor( + jsc, + config, + table, + HoodieActiveTimeline.createNewInstantTime(), + instantToRollback, + true, + true); + if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { + throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback); + } + return rollbackActionExecutor.execute(); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java new file mode 100644 index 000000000..804086117 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/MergeOnReadRestoreActionExecutor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.restore; + +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor; +import org.apache.spark.api.java.JavaSparkContext; + +public class MergeOnReadRestoreActionExecutor extends BaseRestoreActionExecutor { + + public MergeOnReadRestoreActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + String restoreInstantTime) { + super(jsc, config, table, instantTime, restoreInstantTime); + } + + @Override + protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { + table.getMetaClient().reloadActiveTimeline(); + MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor( + jsc, + config, + table, + HoodieActiveTimeline.createNewInstantTime(), + instantToRollback, + true, + true); + + switch (instantToRollback.getAction()) { + case 
HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.DELTA_COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: + // TODO : Get file status and create a rollback stat and file + // TODO : Delete the .aux files along with the instant file, okay for now since the archival process will + // delete these files when it does not see a corresponding instant file under .hoodie + return rollbackActionExecutor.execute(); + default: + throw new IllegalArgumentException("invalid action name " + instantToRollback.getAction()); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java new file mode 100644 index 000000000..bb12bdd3c --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public abstract class BaseRollbackActionExecutor extends BaseActionExecutor { + + private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class); + + protected final HoodieInstant instantToRollback; + protected final boolean deleteInstants; + protected final boolean skipTimelinePublish; + + public BaseRollbackActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant instantToRollback, + boolean deleteInstants) { + this(jsc, config, table, instantTime, instantToRollback, deleteInstants, false); + } + + public BaseRollbackActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant instantToRollback, + boolean deleteInstants, + boolean skipTimelinePublish) { + super(jsc, config, table, instantTime); + this.instantToRollback = 
instantToRollback; + this.deleteInstants = deleteInstants; + this.skipTimelinePublish = skipTimelinePublish; + } + + protected abstract List executeRollback() throws IOException; + + @Override + public HoodieRollbackMetadata execute() { + HoodieTimer rollbackTimer = new HoodieTimer().startTimer(); + HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata( + instantTime, + Option.of(rollbackTimer.endTimer()), + Collections.singletonList(instantToRollback.getTimestamp()), + doRollbackAndGetStats()); + if (!skipTimelinePublish) { + finishRollback(rollbackMetadata); + } + return rollbackMetadata; + } + + public List doRollbackAndGetStats() { + final String instantTimeToRollback = instantToRollback.getTimestamp(); + final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction()) + && !instantToRollback.isCompleted(); + HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline(); + HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); + // Check if any of the commits is a savepoint - do not allow rollback on those commits + List savepoints = table.getCompletedSavepointTimeline().getInstants() + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + savepoints.forEach(s -> { + if (s.contains(instantTimeToRollback)) { + throw new HoodieRollbackException( + "Could not rollback a savepointed commit. 
Delete savepoint first before rolling back" + s); + } + }); + + if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) { + LOG.info("No commits to rollback " + instantTimeToRollback); + } + + // Make sure only the last n commits are being rolled back + // If there is a commit in-between or after that is not rolled back, then abort + if (!isPendingCompaction) { + if ((instantTimeToRollback != null) && !commitTimeline.empty() + && !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) { + throw new HoodieRollbackException( + "Found commits after time :" + instantTimeToRollback + ", please rollback greater commits first"); + } + + List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + if ((instantTimeToRollback != null) && !inflights.isEmpty() + && (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) { + throw new HoodieRollbackException( + "Found in-flight commits after time :" + instantTimeToRollback + ", please rollback greater commits first"); + } + } + + try { + List<HoodieRollbackStat> stats = executeRollback(); + LOG.info("Rolled back inflight instant " + instantTimeToRollback); + if (!isPendingCompaction) { + if (!table.getIndex().rollbackCommit(instantTimeToRollback)) { + throw new HoodieRollbackException("Rollback index changes failed, for time :" + instantTimeToRollback); + } + LOG.info("Index rolled back for commits " + instantTimeToRollback); + } + return stats; + } catch (IOException e) { + throw new HoodieIOException("Unable to execute rollback ", e); + } + } + + protected void finishRollback(HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException { + try { + table.getActiveTimeline().createNewInstant( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, instantTime)); + table.getActiveTimeline().saveAsComplete( + new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, instantTime), + 
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata)); + LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete"); + if (!table.getActiveTimeline().getCleanerTimeline().empty()) { + LOG.info("Cleaning up older rollback meta files"); + FSUtils.deleteOlderRollbackMetaFiles(table.getMetaClient().getFs(), + table.getMetaClient().getMetaPath(), + table.getActiveTimeline().getRollbackTimeline().getInstants()); + } + } catch (IOException e) { + throw new HoodieIOException("Error executing rollback at instant " + instantTime, e); + } + } + + /** + * Delete Inflight instant if enabled. + * + * @param deleteInstant Enable Deletion of Inflight instant + * @param activeTimeline Hoodie active timeline + * @param instantToBeDeleted Instant to be deleted + */ + protected void deleteInflightAndRequestedInstant(boolean deleteInstant, + HoodieActiveTimeline activeTimeline, + HoodieInstant instantToBeDeleted) { + // Remove marker files always on rollback + table.deleteMarkerDir(instantToBeDeleted.getTimestamp()); + + // Remove the rolled back inflight commits + if (deleteInstant) { + LOG.info("Deleting instant=" + instantToBeDeleted); + activeTimeline.deletePending(instantToBeDeleted); + if (instantToBeDeleted.isInflight() && !table.getMetaClient().getTimelineLayoutVersion().isNullVersion()) { + // Delete corresponding requested instant + instantToBeDeleted = new HoodieInstant(HoodieInstant.State.REQUESTED, instantToBeDeleted.getAction(), + instantToBeDeleted.getTimestamp()); + activeTimeline.deletePending(instantToBeDeleted); + } + LOG.info("Deleted pending commit " + instantToBeDeleted); + } else { + LOG.warn("Rollback finished without deleting inflight instant file. 
Instant=" + instantToBeDeleted); + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java new file mode 100644 index 000000000..112337455 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecutor { + + private static final Logger LOG = LogManager.getLogger(CopyOnWriteRollbackActionExecutor.class); + + public CopyOnWriteRollbackActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { + super(jsc, config, table, instantTime, commitInstant, deleteInstants); + } + + public CopyOnWriteRollbackActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish) { + super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish); + } + + @Override + protected List<HoodieRollbackStat> executeRollback() throws IOException { + long startTime = System.currentTimeMillis(); + List<HoodieRollbackStat> stats = new ArrayList<>(); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieInstant resolvedInstant = instantToRollback; + + if (instantToRollback.isCompleted()) { + LOG.info("Unpublishing instant " + instantToRollback); + resolvedInstant = activeTimeline.revertToInflight(instantToRollback); + } + + // For Requested State (like failure during index lookup), there is nothing to do rollback other than + // deleting the timeline file + if 
(!resolvedInstant.isRequested()) { + // delete all the data files for this commit + LOG.info("Clean out all parquet files generated for commit: " + resolvedInstant); + List<RollbackRequest> rollbackRequests = generateRollbackRequests(resolvedInstant); + + //TODO: We need to persist this as rollback workload and use it in case of partial failures + stats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests); + } + // Delete Inflight instant if enabled + deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, resolvedInstant); + LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); + return stats; + } + + private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback) + throws IOException { + return FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + config.shouldAssumeDatePartitioning()).stream() + .map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)) + .collect(Collectors.toList()); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java new file mode 100644 index 000000000..8b95198aa --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecutor { + + private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class); + + public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { + super(jsc, config, table, instantTime, commitInstant, deleteInstants); + } + + public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc, + HoodieWriteConfig config, + HoodieTable table, + String 
instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish) { + super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish); + } + + @Override + protected List<HoodieRollbackStat> executeRollback() throws IOException { + long startTime = System.currentTimeMillis(); + LOG.error("Rolling back instant " + instantToRollback.getTimestamp()); + + HoodieInstant resolvedInstant = instantToRollback; + // Atomically un-publish all non-inflight commits + if (instantToRollback.isCompleted()) { + LOG.error("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants); + resolvedInstant = table.getActiveTimeline().revertToInflight(instantToRollback); + } + + List<HoodieRollbackStat> allRollbackStats = new ArrayList<>(); + + // At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental + // feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback + // (commitToRollback). + // NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is + // required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks. 
+ // For Requested State (like failure during index lookup), there is nothing to do rollback other than + // deleting the timeline file + if (!resolvedInstant.isRequested()) { + LOG.info("Unpublished " + resolvedInstant); + List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, resolvedInstant); + // TODO: We need to persist this as rollback workload and use it in case of partial failures + allRollbackStats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests); + } + + // Delete Inflight instants if enabled + deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant); + + LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); + + return allRollbackStats; + } + + /** + * Generate all rollback requests that we need to perform for rolling back this action without actually performing + * rolling back. + * + * @param jsc JavaSparkContext + * @param instantToRollback Instant to Rollback + * @return list of rollback requests + * @throws IOException + */ + private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback) + throws IOException { + String commit = instantToRollback.getTimestamp(); + List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(), + config.shouldAssumeDatePartitioning()); + int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); + return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> { + HoodieActiveTimeline activeTimeline = table.getActiveTimeline().reload(); + List<RollbackRequest> partitionRollbackRequests = new ArrayList<>(); + switch (instantToRollback.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + LOG.info("Rolling back commit action. 
There are higher delta commits. So only rolling back this instant"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); + break; + case HoodieTimeline.COMPACTION_ACTION: + // If there is no delta commit present after the current commit (if compaction), no action, else we + // need to make sure that a compaction commit rollback also deletes any log files written as part of the + // succeeding deltacommit. + boolean higherDeltaCommits = + !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); + if (higherDeltaCommits) { + // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled + // and has not yet finished. In this scenario we should delete only the newly created parquet files + // and not corresponding base commit log files created with this as baseCommit since updates would + // have been written to the log files. + LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback)); + } else { + // No deltacommits present after this compaction commit (inflight or requested). In this case, we + // can also delete any log files that were created with this compaction commit as base + // commit. + LOG.info("Rolling back compaction plan. There are NO higher delta commits. 
So deleting both data and" + + " log files"); + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); + } + break; + case HoodieTimeline.DELTA_COMMIT_ACTION: + // -------------------------------------------------------------------------------------------------- + // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal + // -------------------------------------------------------------------------------------------------- + // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In + // this scenario we would want to delete these log files. + // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, + // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. + // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is + // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and + // and hence will end up deleting these log files. This is done so there are no orphan log files + // lying around. + // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions + // taken in this scenario is a combination of (A.2) and (A.3) + // --------------------------------------------------------------------------------------------------- + // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal + // --------------------------------------------------------------------------------------------------- + // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. + // In this scenario, we delete all the parquet files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. 
In + // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. + // (B.3) Rollback triggered for first commit - Same as (B.1) + // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files + // as well if the base parquet file gets deleted. + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline() + .getInstantDetails( + new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) + .get(), + HoodieCommitMetadata.class); + + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); + + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata)); + } + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io); + } + default: + break; + } + return partitionRollbackRequests.iterator(); + }).filter(Objects::nonNull).collect(); + } + + private List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, + HoodieCommitMetadata commitMetadata) { + ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + + // wStat.getPrevCommit() might not give the right commit time in the following + // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be + // used to write the new log 
files. In this case, the commit time for the log file is the compaction requested time. + // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the + // baseCommit always by listing the file slice + Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath) + .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); + return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { + + // Filter out stats without prevCommit since they are all inserts + boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) + && (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); + + if (validForRollback) { + // For sanity, log instant time can never be less than base-commit on which we are rolling back + ValidationUtils + .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), + rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)); + } + + return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( + // Base Ts should be strictly less. 
If equal (for inserts-to-logs), the caller employs another option + // to delete and we should not step on it + wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER); + }).map(wStat -> { + String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); + return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), + baseCommitTime, rollbackInstant); + }).collect(Collectors.toList()); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/rollback/RollbackHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java similarity index 99% rename from hudi-client/src/main/java/org/apache/hudi/table/rollback/RollbackHelper.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java index 0aa8b6ed7..7c42e6723 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/rollback/RollbackHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hudi.table.rollback; +package org.apache.hudi.table.action.rollback; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.fs.FSUtils; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/rollback/RollbackRequest.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackRequest.java similarity index 98% rename from hudi-client/src/main/java/org/apache/hudi/table/rollback/RollbackRequest.java rename to hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackRequest.java index 45b95f100..71cb57d26 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/rollback/RollbackRequest.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/RollbackRequest.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.table.rollback; +package org.apache.hudi.table.action.rollback; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 4b3a9edf1..16ccb0574 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -132,7 +132,7 @@ public class TestClientRollback extends TestHoodieClientBase { // rolling back to a non existent savepoint must not succeed try { - client.rollbackToSavepoint("001"); + client.restoreToSavepoint("001"); fail("Rolling back to non-existent savepoint should not be allowed"); } catch (HoodieRollbackException e) { // this is good @@ -140,24 +140,18 @@ public class TestClientRollback extends TestHoodieClientBase { // rollback to savepoint 002 HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get(); - client.rollbackToSavepoint(savepoint.getTimestamp()); + client.restoreToSavepoint(savepoint.getTimestamp()); metaClient = HoodieTableMetaClient.reload(metaClient); table = HoodieTable.create(metaClient, getConfig(), jsc); final BaseFileOnlyView view3 = table.getBaseFileOnlyView(); - dataFiles = partitionPaths.stream().flatMap(s -> { - return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002")); - }).collect(Collectors.toList()); + dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList()); assertEquals("The data files for commit 002 be available", 3, dataFiles.size()); - dataFiles = partitionPaths.stream().flatMap(s -> { - return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003")); - }).collect(Collectors.toList()); + dataFiles = 
partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"))).collect(Collectors.toList()); assertEquals("The data files for commit 003 should be rolled back", 0, dataFiles.size()); - dataFiles = partitionPaths.stream().flatMap(s -> { - return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004")); - }).collect(Collectors.toList()); + dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList()); assertEquals("The data files for commit 004 should be rolled back", 0, dataFiles.size()); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index d8b315d47..573e44830 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -900,7 +900,7 @@ public class TestCleaner extends TestHoodieClientBase { table.getActiveTimeline().transitionRequestedToInflight( new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); metaClient.reloadActiveTimeline(); - table.rollback(jsc, new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true); + table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true); assertEquals("All temp files are deleted.", 0, getTotalTempFiles()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index d986524ec..48fc561bc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -51,15 +51,12 @@ public class TimelineMetadataUtils { private static 
final Integer DEFAULT_VERSION = 1; - public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, Option durationInMs, - List commits, Map> commitToStats) { - Map> commitToStatsMap = new HashMap<>(); - for (Map.Entry> commitToStat : commitToStats.entrySet()) { - commitToStatsMap.put(commitToStat.getKey(), - Collections.singletonList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue()))); - } - return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits, - Collections.unmodifiableMap(commitToStatsMap), DEFAULT_VERSION); + public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, + long durationInMs, + List commits, + Map> instantToRollbackMetadata) { + return new HoodieRestoreMetadata(startRestoreTime, durationInMs, commits, + Collections.unmodifiableMap(instantToRollbackMetadata), DEFAULT_VERSION); } public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime, Option durationInMs, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java index 1ef073727..a3ba38444 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystem.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.junit.After; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.io.File; @@ -115,8 +116,8 @@ public class TestInLineFileSystem { } } - // @Test - // Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786 + @Test + @Ignore // Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786 public void testFileSystemApis() throws IOException { OuterPathInfo outerPathInfo = 
generateOuterFileAndGetInfo(1000); Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);