1
0

[HUDI-761] Refactoring rollback and restore actions using the ActionExecutor abstraction (#1492)

- rollback() and restore() table level APIs introduced
- Restore is implemented by wrapping calls to rollback executor
- Existing tests transparently cover this, since its just a refactor
This commit is contained in:
vinoth chandar
2020-04-13 08:29:19 -07:00
committed by GitHub
parent 17bf930342
commit 661b0b3bab
20 changed files with 907 additions and 520 deletions

View File

@@ -283,7 +283,7 @@ public class SparkMain {
private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTime, String basePath) throws Exception {
HoodieWriteClient client = createHoodieClient(jsc, basePath);
if (client.rollbackToSavepoint(savepointTime)) {
if (client.restoreToSavepoint(savepointTime)) {
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
return 0;
} else {

View File

@@ -18,11 +18,9 @@
package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import com.codahale.metrics.Timer;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStat;
@@ -32,19 +30,14 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -53,10 +46,8 @@ import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Abstract Write Client providing functionality for performing commit, index updates and rollback
@@ -82,12 +73,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
return this.operationType;
}
protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig) {
super(jsc, clientConfig);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.index = index;
}
protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineServer) {
super(jsc, clientConfig, timelineServer);
@@ -320,105 +305,4 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
// before this point
this.index.close();
}
protected void rollbackInternal(String commitToRollback) {
final String startRollbackTime = HoodieActiveTimeline.createNewInstantTime();
final Timer.Context context = metrics.getRollbackCtx();
// Create a Hoodie table which encapsulated the commits and files visible
try {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<HoodieInstant> rollbackInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
.findFirst());
if (rollbackInstantOpt.isPresent()) {
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);
}
} catch (IOException e) {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback,
e);
}
}
protected List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
IOException {
final String commitToRollback = instantToRollback.getTimestamp();
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// Check if any of the commits is a savepoint - do not allow rollback on those commits
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
savepoints.forEach(s -> {
if (s.contains(commitToRollback)) {
throw new HoodieRollbackException(
"Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
}
});
if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) {
// nothing to rollback
LOG.info("No commits to rollback " + commitToRollback);
}
// Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort
if ((commitToRollback != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) {
throw new HoodieRollbackException(
"Found commits after time :" + commitToRollback + ", please rollback greater commits first");
}
List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if ((commitToRollback != null) && !inflights.isEmpty()
&& (inflights.indexOf(commitToRollback) != inflights.size() - 1)) {
throw new HoodieRollbackException(
"Found in-flight commits after time :" + commitToRollback + ", please rollback greater commits first");
}
List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
LOG.info("Deleted inflight commits " + commitToRollback);
// cleanup index entries
if (!getIndex().rollbackCommit(commitToRollback)) {
throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
}
LOG.info("Index rolled back for commits " + commitToRollback);
return stats;
}
private void finishRollback(final Timer.Context context, List<HoodieRollbackStat> rollbackStats,
List<String> commitsToRollback, final String startRollbackTime) throws IOException {
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<Long> durationInMs = Option.empty();
long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
if (context != null) {
durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
}
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils
.convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats);
//TODO: varadarb - This will be fixed when Rollback transition mimics that of commit
table.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION,
startRollbackTime));
table.getActiveTimeline().saveAsComplete(
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " + commitsToRollback + " is complete");
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
LOG.info("Cleaning up older rollback meta files");
// Cleanup of older cleaner meta files
// TODO - make the commit archival generic and archive rollback metadata
FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
table.getActiveTimeline().getRollbackTimeline().getInstants());
}
}
}

View File

@@ -18,13 +18,14 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.SparkConfigUtils;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -49,6 +50,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
@@ -56,8 +58,6 @@ import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCommitArchiveLog;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import com.codahale.metrics.Timer;
import org.apache.hudi.table.action.commit.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -65,18 +65,17 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.Tuple2;
/**
* Hoodie Write Client helps you build tables on HDFS [insert()] and then perform efficient mutations on an HDFS
* table [upsert()]
@@ -505,13 +504,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
/**
* Rollback the state to the savepoint. WARNING: This rollsback recent commits and deleted data files. Queries
* Restore the state to the savepoint. WARNING: This rollsback recent commits and deleted data files. Queries
* accessing the files will mostly fail. This should be done during a downtime.
*
* @param savepointTime - savepoint time to rollback to
* @return true if the savepoint was rollecback to successfully
*/
public boolean rollbackToSavepoint(String savepointTime) {
public boolean restoreToSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
@@ -544,103 +543,60 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
/**
* Rollback the (inflight/committed) record changes with the given commit time. Three steps: (1) Atomically unpublish
* this commit (2) clean indexing data, (3) clean new generated parquet files. (4) Finally delete .commit or .inflight
* file.
* Rollback the inflight record changes with the given commit time.
*
* @param instantTime Instant time of the commit
* @return {@code true} If rollback the record changes successfully. {@code false} otherwise
* @param commitInstantTime Instant time of the commit
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
public boolean rollback(final String instantTime) throws HoodieRollbackException {
rollbackInternal(instantTime);
return true;
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
final Timer.Context context = this.metrics.getRollbackCtx();
try {
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
.filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
HoodieRollbackMetadata rollbackMetadata = table.rollback(jsc, rollbackInstantTime, commitInstantOpt.get(), true);
if (context != null) {
long durationInMs = metrics.getDurationInMs(context.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
}
return true;
} else {
LOG.info("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
return false;
}
} catch (Exception e) {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e);
}
}
/**
* NOTE : This action requires all writers (ingest and compact) to a table to be stopped before proceeding. Revert
* the (inflight/committed) record changes for all commits after the provided @param. Four steps: (1) Atomically
* unpublish this commit (2) clean indexing data, (3) clean new generated parquet/log files and/or append rollback to
* existing log files. (4) Finally delete .commit, .inflight, .compaction.inflight or .compaction.requested file
* the (inflight/committed) record changes for all commits after the provided instant time.
*
* @param instantTime Instant time to which restoration is requested
*/
public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.create(config, jsc);
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
.getReverseOrderedInstants()
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
.collect(Collectors.toList());
// Start a rollback instant for all commits to be rolled back
String startRollbackInstant = HoodieActiveTimeline.createNewInstantTime();
// Start the timer
final Timer.Context context = startContext();
Map<String, List<HoodieRollbackStat>> instantsToStats = new HashMap<>();
table.getActiveTimeline().createNewInstant(
new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRollbackInstant));
instantsToRollback.forEach(instant -> {
try {
switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION:
List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant);
instantsToStats.put(instant.getTimestamp(), statsForInstant);
break;
case HoodieTimeline.COMPACTION_ACTION:
// TODO : Get file status and create a rollback stat and file
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
// delete these files when it does not see a corresponding instant file under .hoodie
List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant);
instantsToStats.put(instant.getTimestamp(), statsForCompaction);
LOG.info("Deleted compaction instant " + instant);
break;
default:
throw new IllegalArgumentException("invalid action name " + instant.getAction());
}
} catch (IOException io) {
throw new HoodieRollbackException("unable to rollback instant " + instant, io);
}
});
public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException {
LOG.info("Begin restore to instant " + instantTime);
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context context = metrics.getRollbackCtx();
try {
finishRestore(context, Collections.unmodifiableMap(instantsToStats),
instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
startRollbackInstant, instantTime);
} catch (IOException io) {
throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
}
}
private Timer.Context startContext() {
return metrics.getRollbackCtx();
}
private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
HoodieTable<T> table = HoodieTable.create(config, jsc);
Option<Long> durationInMs = Option.empty();
long numFilesDeleted = 0L;
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
List<HoodieRollbackStat> stats = commitToStat.getValue();
numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
}
if (context != null) {
durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
}
HoodieRestoreMetadata restoreMetadata =
TimelineMetadataUtils.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
LOG.info("Commits " + commitsToRollback + " rollback is complete. Restored table to " + restoreToInstant);
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
LOG.info("Cleaning up older restore meta files");
// Cleanup of older cleaner meta files
// TODO - make the commit archival generic and archive rollback metadata
FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
table.getActiveTimeline().getRestoreTimeline().getInstants());
HoodieTable<T> table = HoodieTable.create(config, jsc);
HoodieRestoreMetadata restoreMetadata = table.restore(jsc, restoreInstantTime, instantTime);
if (context != null) {
final long durationInMs = metrics.getDurationInMs(context.stop());
final long totalFilesDeleted = restoreMetadata.getHoodieRestoreMetadata().values().stream()
.flatMap(Collection::stream)
.mapToLong(HoodieRollbackMetadata::getTotalFilesDeleted)
.sum();
metrics.updateRollbackMetrics(durationInMs, totalFilesDeleted);
}
return restoreMetadata;
} catch (Exception e) {
throw new HoodieRestoreException("Failed to restore to " + instantTime, e);
}
}
@@ -835,11 +791,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
// inflight compaction - Needs to rollback first deleting new parquet files before we run compaction.
rollbackInflightCompaction(inflightInstant, table);
// refresh table
metaClient = createMetaClient(true);
table = HoodieTable.create(metaClient, config, jsc);
metaClient.reloadActiveTimeline();
pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
}
@@ -914,9 +867,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
* @param inflightInstant Inflight Compaction Instant
* @param table Hoodie Table
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
table.rollback(jsc, inflightInstant, false);
// Revert instant state file
public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) {
table.rollback(jsc, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.exception;
public class HoodieRestoreException extends HoodieException {
public HoodieRestoreException(String msg, Throwable e) {
super(msg, e);
}
}

View File

@@ -22,10 +22,10 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.ParquetReaderIterator;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -33,9 +33,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
@@ -56,8 +54,8 @@ import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.UpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroParquetReader;
@@ -68,12 +66,10 @@ import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -202,69 +198,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)
throws IOException {
long startTime = System.currentTimeMillis();
List<HoodieRollbackStat> stats = new ArrayList<>();
HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
if (instant.isCompleted()) {
LOG.info("Unpublishing instant " + instant);
instant = activeTimeline.revertToInflight(instant);
}
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
// deleting the timeline file
if (!instant.isRequested()) {
String commit = instant.getTimestamp();
// delete all the data files for this commit
LOG.info("Clean out all parquet files generated for commit: " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(instant);
//TODO: We need to persist this as rollback workload and use it in case of partial failures
stats = new RollbackHelper(metaClient, config).performRollback(jsc, instant, rollbackRequests);
}
// Delete Inflight instant if enabled
deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant);
LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return stats;
public HoodieRollbackMetadata rollback(JavaSparkContext jsc, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new CopyOnWriteRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
throws IOException {
return FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()).stream().map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
.collect(Collectors.toList());
}
/**
* Delete Inflight instant if enabled.
*
* @param deleteInstant Enable Deletion of Inflight instant
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
*/
protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
HoodieInstant instantToBeDeleted) {
// Remove marker files always on rollback
deleteMarkerDir(instantToBeDeleted.getTimestamp());
// Remove the rolled back inflight commits
if (deleteInstant) {
LOG.info("Deleting instant=" + instantToBeDeleted);
activeTimeline.deletePending(instantToBeDeleted);
if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) {
// Delete corresponding requested instant
instantToBeDeleted = new HoodieInstant(State.REQUESTED, instantToBeDeleted.getAction(),
instantToBeDeleted.getTimestamp());
activeTimeline.deletePending(instantToBeDeleted);
}
LOG.info("Deleted pending commit " + instantToBeDeleted);
} else {
LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
}
public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) {
return new CopyOnWriteRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute();
}
enum BucketType {
@@ -296,8 +235,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
/**
* Helper class for a small file's location and its actual size on disk.
*/

View File

@@ -19,22 +19,17 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieIOException;
@@ -46,21 +41,16 @@ import org.apache.hudi.table.action.deltacommit.InsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.InsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.UpsertPreppedDeltaCommitActionExecutor;
import org.apache.hudi.table.action.restore.MergeOnReadRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.compact.HoodieMergeOnReadTableCompactor;
import org.apache.hudi.table.rollback.RollbackHelper;
import org.apache.hudi.table.rollback.RollbackRequest;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -172,146 +162,15 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
}
@Override
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant,
boolean deleteInstants) throws IOException {
long startTime = System.currentTimeMillis();
String commit = instant.getTimestamp();
LOG.error("Rolling back instant " + instant);
// Atomically un-publish all non-inflight commits
if (instant.isCompleted()) {
LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants);
instant = this.getActiveTimeline().revertToInflight(instant);
}
List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
// At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
// feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
// (commitToRollback).
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
// Atomically un-publish all non-inflight commits
// Atomically un-publish all non-inflight commits
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
// deleting the timeline file
if (!instant.isRequested()) {
LOG.info("Unpublished " + commit);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, instant);
// TODO: We need to persist this as rollback workload and use it in case of partial failures
allRollbackStats = new RollbackHelper(metaClient, config).performRollback(jsc, instant, rollbackRequests);
}
// Delete Inflight instants if enabled
deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant);
LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return allRollbackStats;
public HoodieRollbackMetadata rollback(JavaSparkContext jsc,
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
return new MergeOnReadRollbackActionExecutor(jsc, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
}
/**
* Generate all rollback requests that we need to perform for rolling back this action without actually performing
* rolling back.
*
* @param jsc JavaSparkContext
* @param instantToRollback Instant to Rollback
* @return list of rollback requests
* @throws IOException
*/
private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
throws IOException {
String commit = instantToRollback.getTimestamp();
List<String> partitions = FSUtils.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
LOG.info(
"Rolling back commit action. There are higher delta commits. So only rolling back this instant");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
break;
case HoodieTimeline.COMPACTION_ACTION:
// If there is no delta commit present after the current commit (if compaction), no action, else we
// need to make sure that a compaction commit rollback also deletes any log files written as part of the
// succeeding deltacommit.
boolean higherDeltaCommits =
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
if (higherDeltaCommits) {
// Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
// and has not yet finished. In this scenario we should delete only the newly created parquet files
// and not corresponding base commit log files created with this as baseCommit since updates would
// have been written to the log files.
LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback));
} else {
// No deltacommits present after this compaction commit (inflight or requested). In this case, we
// can also delete any log files that were created with this compaction commit as base
// commit.
LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
+ " log files");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
// this scenario we would want to delete these log files.
// (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
// HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
// (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
// being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
// and hence will end up deleting these log files. This is done so there are no orphan log files
// lying around.
// (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
// taken in this scenario is a combination of (A.2) and (A.3)
// ---------------------------------------------------------------------------------------------------
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
// ---------------------------------------------------------------------------------------------------
// (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
// In this scenario, we delete all the parquet files written for the failed commit.
// (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
// (B.3) Rollback triggered for first commit - Same as (B.1)
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
// as well if the base parquet file gets deleted.
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
metaClient.getCommitTimeline()
.getInstantDetails(
new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
.get(),
HoodieCommitMetadata.class);
// In case all data was inserts and the commit failed, delete the file belonging to that commit
// We do not know fileIds for inserts (first inserts are either log files or parquet files),
// delete all files for the corresponding failed commit, if present (same as COW)
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
// append rollback blocks for updates
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
partitionRollbackRequests
.addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
}
break;
} catch (IOException io) {
throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
}
default:
break;
}
return partitionRollbackRequests.iterator();
}).filter(Objects::nonNull).collect();
public HoodieRestoreMetadata restore(JavaSparkContext jsc, String restoreInstantTime, String instantToRestore) {
return new MergeOnReadRestoreActionExecutor(jsc, config, this, restoreInstantTime, instantToRestore).execute();
}
@Override
@@ -320,39 +179,4 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
// delegate to base class for MOR tables
super.finalizeWrite(jsc, instantTs, stats);
}
private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata) {
ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
// wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
// But the index (global) might store the baseCommit of the parquet and not the requested, hence get the
// baseCommit always by listing the file slice
Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getSliceView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
// Filter out stats without prevCommit since they are all inserts
boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
&& (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
if (validForRollback) {
// For sanity, log instant time can never be less than base-commit on which we are rolling back
ValidationUtils
.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
}
return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
// Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
// to delete and we should not step on it
wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
}).map(wStat -> {
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
baseCommitTime, rollbackInstant);
}).collect(Collectors.toList());
}
}

View File

@@ -23,10 +23,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuard.FileVisibility;
@@ -350,12 +351,27 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
public abstract HoodieCleanMetadata clean(JavaSparkContext jsc, String cleanInstantTime);
/**
* Rollback the (inflight/committed) record changes with the given commit time. Four steps: (1) Atomically unpublish
* this commit (2) clean indexing data (3) clean new generated parquet files / log blocks (4) Finally, delete
* .<action>.commit or .<action>.inflight file if deleteInstants = true
* Rollback the (inflight/committed) record changes with the given commit time.
* <pre>
* Three steps:
* (1) Atomically unpublish this commit
* (2) clean indexing data
* (3) clean new generated parquet files.
* (4) Finally delete .commit or .inflight file, if deleteInstants = true
* </pre>
*/
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, HoodieInstant instant, boolean deleteInstants)
throws IOException;
public abstract HoodieRollbackMetadata rollback(JavaSparkContext jsc,
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants);
/**
* Restore the table to the given instant. Note that this is a admin table recovery operation
* that would cause any running queries that are accessing file slices written after the instant to fail.
*/
public abstract HoodieRestoreMetadata restore(JavaSparkContext jsc,
String restoreInstantTime,
String instantToRestore);
/**
* Finalize the written data onto storage. Perform any final cleanups.
@@ -364,8 +380,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* @param stats List of HoodieWriteStats
* @throws HoodieIOException if some paths can't be finalized on storage
*/
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
cleanFailedWrites(jsc, instantTs, stats, config.getConsistencyGuardConfig().isConsistencyCheckEnabled());
}
@@ -374,7 +389,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*
* @param instantTs Instant Time
*/
protected void deleteMarkerDir(String instantTs) {
public void deleteMarkerDir(String instantTs) {
try {
FileSystem fs = getMetaClient().getFs();
Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
@@ -473,8 +488,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* @param groupByPartition Files grouped by partition
* @param visibility Appear/Disappear
*/
private void waitForAllFiles(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> groupByPartition,
FileVisibility visibility) {
private void waitForAllFiles(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> groupByPartition, FileVisibility visibility) {
// This will either ensure all files to be deleted are present.
boolean checkPassed =
jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism())
@@ -486,8 +500,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
}
}
private boolean waitForCondition(String partitionPath, Stream<Pair<String, String>> partitionFilePaths,
FileVisibility visibility) {
private boolean waitForCondition(String partitionPath, Stream<Pair<String, String>> partitionFilePaths, FileVisibility visibility) {
final FileSystem fileSystem = metaClient.getRawFs();
List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
try {

View File

@@ -57,6 +57,10 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
public CleanActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
super(jsc, config, table, instantTime);
}
/**
* Generates List of files to be cleaned.
*
@@ -172,10 +176,6 @@ public class CleanActionExecutor extends BaseActionExecutor<HoodieCleanMetadata>
}).collect(Collectors.toList());
}
public CleanActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, String instantTime) {
super(jsc, config, table, instantTime);
}
/**
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
*

View File

@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseRestoreActionExecutor extends BaseActionExecutor<HoodieRestoreMetadata> {
private static final Logger LOG = LogManager.getLogger(BaseRestoreActionExecutor.class);
private final String restoreInstantTime;
public BaseRestoreActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
String restoreInstantTime) {
super(jsc, config, table, instantTime);
this.restoreInstantTime = restoreInstantTime;
}
@Override
public HoodieRestoreMetadata execute() {
HoodieTimer restoreTimer = new HoodieTimer();
restoreTimer.startTimer();
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
.getReverseOrderedInstants()
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), restoreInstantTime))
.collect(Collectors.toList());
Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
table.getActiveTimeline().createNewInstant(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime));
instantsToRollback.forEach(instant -> {
instantToMetadata.put(instant.getTimestamp(), Collections.singletonList(rollbackInstant(instant)));
LOG.info("Deleted instant " + instant);
});
try {
return finishRestore(instantToMetadata,
instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
restoreTimer.endTimer()
);
} catch (IOException io) {
throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
}
}
protected abstract HoodieRollbackMetadata rollbackInstant(HoodieInstant rollbackInstant);
private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetadata>> instantToMetadata,
List<String> instantsRolledBack,
long durationInMs) throws IOException {
HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.convertRestoreMetadata(
instantTime, durationInMs, instantsRolledBack, instantToMetadata);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
LOG.info("Cleaning up older restore meta files");
// Cleanup of older cleaner meta files
// TODO - make the commit archival generic and archive rollback metadata
FSUtils.deleteOlderRollbackMetaFiles(
table.getMetaClient().getFs(),
table.getMetaClient().getMetaPath(),
table.getActiveTimeline().getRestoreTimeline().getInstants()
);
}
return restoreMetadata;
}
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.spark.api.java.JavaSparkContext;
public class CopyOnWriteRestoreActionExecutor extends BaseRestoreActionExecutor {
public CopyOnWriteRestoreActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
String restoreInstantTime) {
super(jsc, config, table, instantTime, restoreInstantTime);
}
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
CopyOnWriteRollbackActionExecutor rollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(
jsc,
config,
table,
HoodieActiveTimeline.createNewInstantTime(),
instantToRollback,
true,
true);
if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback);
}
return rollbackActionExecutor.execute();
}
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.restore;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.spark.api.java.JavaSparkContext;
public class MergeOnReadRestoreActionExecutor extends BaseRestoreActionExecutor {
public MergeOnReadRestoreActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
String restoreInstantTime) {
super(jsc, config, table, instantTime, restoreInstantTime);
}
@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
table.getMetaClient().reloadActiveTimeline();
MergeOnReadRollbackActionExecutor rollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
jsc,
config,
table,
HoodieActiveTimeline.createNewInstantTime(),
instantToRollback,
true,
true);
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION:
case HoodieTimeline.COMPACTION_ACTION:
// TODO : Get file status and create a rollback stat and file
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
// delete these files when it does not see a corresponding instant file under .hoodie
return rollbackActionExecutor.execute();
default:
throw new IllegalArgumentException("invalid action name " + instantToRollback.getAction());
}
}
}

View File

@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<HoodieRollbackMetadata> {
private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class);
protected final HoodieInstant instantToRollback;
protected final boolean deleteInstants;
protected final boolean skipTimelinePublish;
public BaseRollbackActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
HoodieInstant instantToRollback,
boolean deleteInstants) {
this(jsc, config, table, instantTime, instantToRollback, deleteInstants, false);
}
public BaseRollbackActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipTimelinePublish) {
super(jsc, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.deleteInstants = deleteInstants;
this.skipTimelinePublish = skipTimelinePublish;
}
protected abstract List<HoodieRollbackStat> executeRollback() throws IOException;
@Override
public HoodieRollbackMetadata execute() {
HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
instantTime,
Option.of(rollbackTimer.endTimer()),
Collections.singletonList(instantToRollback.getTimestamp()),
doRollbackAndGetStats());
if (!skipTimelinePublish) {
finishRollback(rollbackMetadata);
}
return rollbackMetadata;
}
public List<HoodieRollbackStat> doRollbackAndGetStats() {
final String instantTimeToRollback = instantToRollback.getTimestamp();
final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
&& !instantToRollback.isCompleted();
HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// Check if any of the commits is a savepoint - do not allow rollback on those commits
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
savepoints.forEach(s -> {
if (s.contains(instantTimeToRollback)) {
throw new HoodieRollbackException(
"Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s);
}
});
if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) {
LOG.info("No commits to rollback " + instantTimeToRollback);
}
// Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort
if (!isPendingCompaction) {
if ((instantTimeToRollback != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) {
throw new HoodieRollbackException(
"Found commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
}
List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
if ((instantTimeToRollback != null) && !inflights.isEmpty()
&& (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) {
throw new HoodieRollbackException(
"Found in-flight commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
}
}
try {
List<HoodieRollbackStat> stats = executeRollback();
LOG.info("Rolled back inflight instant " + instantTimeToRollback);
if (!isPendingCompaction) {
if (!table.getIndex().rollbackCommit(instantTimeToRollback)) {
throw new HoodieRollbackException("Rollback index changes failed, for time :" + instantTimeToRollback);
}
LOG.info("Index rolled back for commits " + instantTimeToRollback);
}
return stats;
} catch (IOException e) {
throw new HoodieIOException("Unable to execute rollback ", e);
}
}
protected void finishRollback(HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
try {
table.getActiveTimeline().createNewInstant(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, instantTime));
table.getActiveTimeline().saveAsComplete(
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, instantTime),
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
LOG.info("Cleaning up older rollback meta files");
FSUtils.deleteOlderRollbackMetaFiles(table.getMetaClient().getFs(),
table.getMetaClient().getMetaPath(),
table.getActiveTimeline().getRollbackTimeline().getInstants());
}
} catch (IOException e) {
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
}
}
/**
* Delete Inflight instant if enabled.
*
* @param deleteInstant Enable Deletion of Inflight instant
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
*/
protected void deleteInflightAndRequestedInstant(boolean deleteInstant,
HoodieActiveTimeline activeTimeline,
HoodieInstant instantToBeDeleted) {
// Remove marker files always on rollback
table.deleteMarkerDir(instantToBeDeleted.getTimestamp());
// Remove the rolled back inflight commits
if (deleteInstant) {
LOG.info("Deleting instant=" + instantToBeDeleted);
activeTimeline.deletePending(instantToBeDeleted);
if (instantToBeDeleted.isInflight() && !table.getMetaClient().getTimelineLayoutVersion().isNullVersion()) {
// Delete corresponding requested instant
instantToBeDeleted = new HoodieInstant(HoodieInstant.State.REQUESTED, instantToBeDeleted.getAction(),
instantToBeDeleted.getTimestamp());
activeTimeline.deletePending(instantToBeDeleted);
}
LOG.info("Deleted pending commit " + instantToBeDeleted);
} else {
LOG.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
}
}
}

View File

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecutor {
private static final Logger LOG = LogManager.getLogger(CopyOnWriteRollbackActionExecutor.class);
public CopyOnWriteRollbackActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(jsc, config, table, instantTime, commitInstant, deleteInstants);
}
public CopyOnWriteRollbackActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish) {
super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish);
}
@Override
protected List<HoodieRollbackStat> executeRollback() throws IOException {
long startTime = System.currentTimeMillis();
List<HoodieRollbackStat> stats = new ArrayList<>();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant resolvedInstant = instantToRollback;
if (instantToRollback.isCompleted()) {
LOG.info("Unpublishing instant " + instantToRollback);
resolvedInstant = activeTimeline.revertToInflight(instantToRollback);
}
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
// deleting the timeline file
if (!resolvedInstant.isRequested()) {
// delete all the data files for this commit
LOG.info("Clean out all parquet files generated for commit: " + resolvedInstant);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(resolvedInstant);
//TODO: We need to persist this as rollback workload and use it in case of partial failures
stats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests);
}
// Delete Inflight instant if enabled
deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return stats;
}
private List<RollbackRequest> generateRollbackRequests(HoodieInstant instantToRollback)
throws IOException {
return FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning()).stream()
.map(partitionPath -> RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback))
.collect(Collectors.toList());
}
}

View File

@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecutor {
private static final Logger LOG = LogManager.getLogger(MergeOnReadRollbackActionExecutor.class);
public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(jsc, config, table, instantTime, commitInstant, deleteInstants);
}
public MergeOnReadRollbackActionExecutor(JavaSparkContext jsc,
HoodieWriteConfig config,
HoodieTable<?> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish) {
super(jsc, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish);
}
@Override
protected List<HoodieRollbackStat> executeRollback() throws IOException {
long startTime = System.currentTimeMillis();
LOG.error("Rolling back instant " + instantToRollback.getTimestamp());
HoodieInstant resolvedInstant = instantToRollback;
// Atomically un-publish all non-inflight commits
if (instantToRollback.isCompleted()) {
LOG.error("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants);
resolvedInstant = table.getActiveTimeline().revertToInflight(instantToRollback);
}
List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
// At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
// feature that is expensive. To perform nested rollbacks, initiate multiple requests of client.rollback
// (commitToRollback).
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
// Atomically un-publish all non-inflight commits
// Atomically un-publish all non-inflight commits
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
// deleting the timeline file
if (!resolvedInstant.isRequested()) {
LOG.info("Unpublished " + resolvedInstant);
List<RollbackRequest> rollbackRequests = generateRollbackRequests(jsc, resolvedInstant);
// TODO: We need to persist this as rollback workload and use it in case of partial failures
allRollbackStats = new RollbackHelper(table.getMetaClient(), config).performRollback(jsc, resolvedInstant, rollbackRequests);
}
// Delete Inflight instants if enabled
deleteInflightAndRequestedInstant(deleteInstants, table.getActiveTimeline(), resolvedInstant);
LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return allRollbackStats;
}
/**
* Generate all rollback requests that we need to perform for rolling back this action without actually performing
* rolling back.
*
* @param jsc JavaSparkContext
* @param instantToRollback Instant to Rollback
* @return list of rollback requests
* @throws IOException
*/
private List<RollbackRequest> generateRollbackRequests(JavaSparkContext jsc, HoodieInstant instantToRollback)
throws IOException {
String commit = instantToRollback.getTimestamp();
List<String> partitions = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), table.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1);
return jsc.parallelize(partitions, Math.min(partitions.size(), sparkPartitions)).flatMap(partitionPath -> {
HoodieActiveTimeline activeTimeline = table.getActiveTimeline().reload();
List<RollbackRequest> partitionRollbackRequests = new ArrayList<>();
switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION:
LOG.info("Rolling back commit action. There are higher delta commits. So only rolling back this instant");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
break;
case HoodieTimeline.COMPACTION_ACTION:
// If there is no delta commit present after the current commit (if compaction), no action, else we
// need to make sure that a compaction commit rollback also deletes any log files written as part of the
// succeeding deltacommit.
boolean higherDeltaCommits =
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty();
if (higherDeltaCommits) {
// Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
// and has not yet finished. In this scenario we should delete only the newly created parquet files
// and not corresponding base commit log files created with this as baseCommit since updates would
// have been written to the log files.
LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath, instantToRollback));
} else {
// No deltacommits present after this compaction commit (inflight or requested). In this case, we
// can also delete any log files that were created with this compaction commit as base
// commit.
LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and"
+ " log files");
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
// --------------------------------------------------------------------------------------------------
// (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
// --------------------------------------------------------------------------------------------------
// (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
// this scenario we would want to delete these log files.
// (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
// HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
// (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
// being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and
// and hence will end up deleting these log files. This is done so there are no orphan log files
// lying around.
// (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions
// taken in this scenario is a combination of (A.2) and (A.3)
// ---------------------------------------------------------------------------------------------------
// (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
// ---------------------------------------------------------------------------------------------------
// (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries.
// In this scenario, we delete all the parquet files written for the failed commit.
// (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In
// this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
// (B.3) Rollback triggered for first commit - Same as (B.1)
// (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
// as well if the base parquet file gets deleted.
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
table.getMetaClient().getCommitTimeline()
.getInstantDetails(
new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
.get(),
HoodieCommitMetadata.class);
// In case all data was inserts and the commit failed, delete the file belonging to that commit
// We do not know fileIds for inserts (first inserts are either log files or parquet files),
// delete all files for the corresponding failed commit, if present (same as COW)
partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
// append rollback blocks for updates
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
partitionRollbackRequests
.addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata));
}
break;
} catch (IOException io) {
throw new UncheckedIOException("Failed to collect rollback actions for commit " + commit, io);
}
default:
break;
}
return partitionRollbackRequests.iterator();
}).filter(Objects::nonNull).collect();
}
private List<RollbackRequest> generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant,
HoodieCommitMetadata commitMetadata) {
ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
// wStat.getPrevCommit() might not give the right commit time in the following
// scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
// But the index (global) might store the baseCommit of the parquet and not the requested, hence get the
// baseCommit always by listing the file slice
Map<String, String> fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
// Filter out stats without prevCommit since they are all inserts
boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
&& (wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId());
if (validForRollback) {
// For sanity, log instant time can never be less than base-commit on which we are rolling back
ValidationUtils
.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
}
return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
// Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
// to delete and we should not step on it
wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
}).map(wStat -> {
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
baseCommitTime, rollbackInstant);
}).collect(Collectors.toList());
}
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.rollback;
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.table.rollback;
package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

View File

@@ -132,7 +132,7 @@ public class TestClientRollback extends TestHoodieClientBase {
// rolling back to a non existent savepoint must not succeed
try {
client.rollbackToSavepoint("001");
client.restoreToSavepoint("001");
fail("Rolling back to non-existent savepoint should not be allowed");
} catch (HoodieRollbackException e) {
// this is good
@@ -140,24 +140,18 @@ public class TestClientRollback extends TestHoodieClientBase {
// rollback to savepoint 002
HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.rollbackToSavepoint(savepoint.getTimestamp());
client.restoreToSavepoint(savepoint.getTimestamp());
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.create(metaClient, getConfig(), jsc);
final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
dataFiles = partitionPaths.stream().flatMap(s -> {
return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList());
assertEquals("The data files for commit 002 be available", 3, dataFiles.size());
dataFiles = partitionPaths.stream().flatMap(s -> {
return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"));
}).collect(Collectors.toList());
dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"))).collect(Collectors.toList());
assertEquals("The data files for commit 003 should be rolled back", 0, dataFiles.size());
dataFiles = partitionPaths.stream().flatMap(s -> {
return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"));
}).collect(Collectors.toList());
dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
assertEquals("The data files for commit 004 should be rolled back", 0, dataFiles.size());
}
}

View File

@@ -900,7 +900,7 @@ public class TestCleaner extends TestHoodieClientBase {
table.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
metaClient.reloadActiveTimeline();
table.rollback(jsc, new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
}

View File

@@ -51,15 +51,12 @@ public class TimelineMetadataUtils {
private static final Integer DEFAULT_VERSION = 1;
public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime, Option<Long> durationInMs,
List<String> commits, Map<String, List<HoodieRollbackStat>> commitToStats) {
Map<String, List<HoodieRollbackMetadata>> commitToStatsMap = new HashMap<>();
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
commitToStatsMap.put(commitToStat.getKey(),
Collections.singletonList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue())));
}
return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits,
Collections.unmodifiableMap(commitToStatsMap), DEFAULT_VERSION);
public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTime,
long durationInMs,
List<String> commits,
Map<String, List<HoodieRollbackMetadata>> instantToRollbackMetadata) {
return new HoodieRestoreMetadata(startRestoreTime, durationInMs, commits,
Collections.unmodifiableMap(instantToRollbackMetadata), DEFAULT_VERSION);
}
public static HoodieRollbackMetadata convertRollbackMetadata(String startRollbackTime, Option<Long> durationInMs,

View File

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
@@ -115,8 +116,8 @@ public class TestInLineFileSystem {
}
}
// @Test
// Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786
@Test
@Ignore // Disabling flaky test for now https://issues.apache.org/jira/browse/HUDI-786
public void testFileSystemApis() throws IOException {
OuterPathInfo outerPathInfo = generateOuterFileAndGetInfo(1000);
Path inlinePath = FileSystemTestUtils.getPhantomFile(outerPathInfo.outerPath, outerPathInfo.startOffset, outerPathInfo.length);