Refactor HoodieTable Rollback to write one rollback instant for a batch of commits to roll back
This commit is contained in:
@@ -19,10 +19,10 @@ package com.uber.hoodie;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.uber.hoodie.avro.model.HoodieCleanMetadata;
|
||||
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
|
||||
import com.uber.hoodie.avro.model.HoodieRestoreMetadata;
|
||||
import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
|
||||
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
|
||||
import com.uber.hoodie.common.HoodieCleanStat;
|
||||
@@ -65,13 +65,13 @@ import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -758,7 +758,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||
logger.info("Rolling back commits " + commitsToRollback);
|
||||
|
||||
rollback(commitsToRollback);
|
||||
restoreToInstant(savepointTime);
|
||||
|
||||
// Make sure the rollback was successful
|
||||
Optional<HoodieInstant> lastInstant = activeTimeline.reload().getCommitsAndCompactionTimeline()
|
||||
@@ -776,7 +776,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
* files. (4) Finally delete .commit or .inflight file,
|
||||
*/
|
||||
public boolean rollback(final String commitTime) throws HoodieRollbackException {
|
||||
rollback(Lists.newArrayList(commitTime));
|
||||
rollbackInternal(commitTime);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -787,137 +787,190 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
* files and/or append rollback to existing log files. (4) Finally delete .commit, .inflight, .compaction.inflight
|
||||
* or .compaction.requested file
|
||||
*/
|
||||
public void restoreToCommit(final String commitTime) throws HoodieRollbackException {
|
||||
public void restoreToInstant(final String instantTime) throws HoodieRollbackException {
|
||||
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
// Get all the commits on the timeline after the provided commit time
|
||||
List<HoodieInstant> commitsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), commitTime))
|
||||
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
|
||||
.collect(Collectors.toList());
|
||||
// reverse the commits to descending order of commit time
|
||||
Collections.reverse(commitsToRollback);
|
||||
commitsToRollback.stream().forEach(instant -> {
|
||||
switch (instant.getAction()) {
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
case HoodieTimeline.DELTA_COMMIT_ACTION:
|
||||
rollback(instant.getTimestamp());
|
||||
break;
|
||||
case HoodieTimeline.COMPACTION_ACTION:
|
||||
if (instant.isRequested()) {
|
||||
// TODO : Get file status and create a rollback stat and file
|
||||
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
|
||||
// delete these files when it does not see a corresponding instant file under .hoodie
|
||||
deleteRequestedCompaction(instant.getTimestamp());
|
||||
logger.info("Deleted pending scheduled compaction " + instant.getTimestamp());
|
||||
} else {
|
||||
rollback(instant.getTimestamp());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action name " + instant.getAction());
|
||||
}
|
||||
Collections.reverse(instantsToRollback);
|
||||
// Start a rollback instant for all commits to be rolled back
|
||||
String startRollbackInstant = startInstant();
|
||||
// Start the timer
|
||||
final Timer.Context context = startContext();
|
||||
ImmutableMap.Builder<String, List<HoodieRollbackStat>> instantsToStats =
|
||||
ImmutableMap.builder();
|
||||
instantsToRollback.stream().forEach(instant -> {
|
||||
try {
|
||||
// Ensure unique rollback instants for seconds granularity
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new HoodieRollbackException("unable to rollback instant " + instant, e);
|
||||
switch (instant.getAction()) {
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
case HoodieTimeline.DELTA_COMMIT_ACTION:
|
||||
List<HoodieRollbackStat> statsForInstant = doRollbackAndGetStats(instant.getTimestamp());
|
||||
instantsToStats.put(instant.getTimestamp(), statsForInstant);
|
||||
break;
|
||||
case HoodieTimeline.COMPACTION_ACTION:
|
||||
if (instant.isRequested()) {
|
||||
// TODO : Get file status and create a rollback stat and file
|
||||
// TODO : Delete the .aux files along with the instant file, okay for now since the archival process will
|
||||
// delete these files when it does not see a corresponding instant file under .hoodie
|
||||
deleteRequestedCompaction(instant.getTimestamp());
|
||||
logger.info("Deleted pending scheduled compaction " + instant.getTimestamp());
|
||||
} else {
|
||||
List<HoodieRollbackStat> statsForCompaction = doRollbackAndGetStats(instant.getTimestamp());
|
||||
instantsToStats.put(instant.getTimestamp(), statsForCompaction);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid action name " + instant.getAction());
|
||||
}
|
||||
} catch (IOException io) {
|
||||
throw new HoodieRollbackException("unable to rollback instant " + instant, io);
|
||||
}
|
||||
});
|
||||
try {
|
||||
finishRestore(context, instantsToStats.build(),
|
||||
instantsToRollback.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()),
|
||||
startRollbackInstant, instantTime);
|
||||
} catch (IOException io) {
|
||||
throw new HoodieRollbackException("unable to rollback instants " + instantsToRollback, io);
|
||||
}
|
||||
}
|
||||
|
||||
private void rollback(List<String> commitsToRollback) {
|
||||
if (commitsToRollback.isEmpty()) {
|
||||
logger.info("List of commits to rollback is empty");
|
||||
return;
|
||||
}
|
||||
private String startInstant() {
|
||||
return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
|
||||
}
|
||||
|
||||
final Timer.Context context = metrics.getRollbackCtx();
|
||||
String startRollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
|
||||
private Timer.Context startContext() {
|
||||
return metrics.getRollbackCtx();
|
||||
}
|
||||
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
private List<HoodieRollbackStat> doRollbackAndGetStats(final String commitToRollback) throws
|
||||
IOException {
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
Set<String> pendingCompactions =
|
||||
table.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
|
||||
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
|
||||
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
|
||||
|
||||
// Check if any of the commits is a savepoint - do not allow rollback on those commits
|
||||
List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||
commitsToRollback.forEach(s -> {
|
||||
if (savepoints.contains(s)) {
|
||||
savepoints.stream().forEach(s -> {
|
||||
if (s.contains(commitToRollback)) {
|
||||
throw new HoodieRollbackException(
|
||||
"Could not rollback a savepointed commit. Delete savepoint first before rolling back"
|
||||
+ s);
|
||||
}
|
||||
});
|
||||
|
||||
if (commitTimeline.empty() && inflightCommitTimeline.empty()) {
|
||||
// nothing to rollback
|
||||
logger.info("No commits to rollback " + commitToRollback);
|
||||
}
|
||||
|
||||
// Make sure only the last n commits are being rolled back
|
||||
// If there is a commit in-between or after that is not rolled back, then abort
|
||||
String lastCommit = commitToRollback;
|
||||
|
||||
if ((lastCommit != null) && !commitTimeline.empty() && !commitTimeline
|
||||
.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
|
||||
throw new HoodieRollbackException(
|
||||
"Found commits after time :" + lastCommit + ", please rollback greater commits first");
|
||||
}
|
||||
|
||||
List<String> inflights = inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
|
||||
.collect(Collectors.toList());
|
||||
if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
|
||||
throw new HoodieRollbackException("Found in-flight commits after time :" + lastCommit
|
||||
+ ", please rollback greater commits first");
|
||||
}
|
||||
|
||||
List<HoodieRollbackStat> stats = table.rollback(jsc, commitToRollback, true);
|
||||
|
||||
logger.info("Deleted inflight commits " + commitToRollback);
|
||||
|
||||
// cleanup index entries
|
||||
if (!index.rollbackCommit(commitToRollback)) {
|
||||
throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback);
|
||||
}
|
||||
logger.info("Index rolled back for commits " + commitToRollback);
|
||||
return stats;
|
||||
}
|
||||
|
||||
private void finishRollback(final Timer.Context context, List<HoodieRollbackStat> rollbackStats,
|
||||
List<String> commitsToRollback, final String startRollbackTime) throws IOException {
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
Optional<Long> durationInMs = Optional.empty();
|
||||
Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
|
||||
if (context != null) {
|
||||
durationInMs = Optional.of(metrics.getDurationInMs(context.stop()));
|
||||
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
|
||||
}
|
||||
HoodieRollbackMetadata rollbackMetadata = AvroUtils
|
||||
.convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats);
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
|
||||
AvroUtils.serializeRollbackMetadata(rollbackMetadata));
|
||||
logger.info("Commits " + commitsToRollback + " rollback is complete");
|
||||
|
||||
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
|
||||
logger.info("Cleaning up older rollback meta files");
|
||||
// Cleanup of older cleaner meta files
|
||||
// TODO - make the commit archival generic and archive rollback metadata
|
||||
FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
|
||||
table.getActiveTimeline().getRollbackTimeline().getInstants());
|
||||
}
|
||||
}
|
||||
|
||||
private void finishRestore(final Timer.Context context, Map<String, List<HoodieRollbackStat>> commitToStats,
|
||||
List<String> commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException {
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
|
||||
Optional<Long> durationInMs = Optional.empty();
|
||||
Long numFilesDeleted = 0L;
|
||||
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : commitToStats.entrySet()) {
|
||||
List<HoodieRollbackStat> stats = commitToStat.getValue();
|
||||
numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size())
|
||||
.sum();
|
||||
}
|
||||
if (context != null) {
|
||||
durationInMs = Optional.of(metrics.getDurationInMs(context.stop()));
|
||||
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
|
||||
}
|
||||
HoodieRestoreMetadata restoreMetadata = AvroUtils
|
||||
.convertRestoreMetadata(startRestoreTime, durationInMs, commitsToRollback, commitToStats);
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, startRestoreTime),
|
||||
AvroUtils.serializeRestoreMetadata(restoreMetadata));
|
||||
logger.info("Commits " + commitsToRollback + " rollback is complete. Restored dataset to " + restoreToInstant);
|
||||
|
||||
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
|
||||
logger.info("Cleaning up older restore meta files");
|
||||
// Cleanup of older cleaner meta files
|
||||
// TODO - make the commit archival generic and archive rollback metadata
|
||||
FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
|
||||
table.getActiveTimeline().getRestoreTimeline().getInstants());
|
||||
}
|
||||
}
|
||||
|
||||
private void rollbackInternal(String commitToRollback) {
|
||||
if (commitToRollback.isEmpty()) {
|
||||
logger.info("List of commits to rollback is empty");
|
||||
return;
|
||||
}
|
||||
final String startRollbackTime = startInstant();
|
||||
final Timer.Context context = startContext();
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
try {
|
||||
if (commitTimeline.empty() && inflightCommitTimeline.empty()) {
|
||||
// nothing to rollback
|
||||
logger.info("No commits to rollback " + commitsToRollback);
|
||||
}
|
||||
|
||||
// Make sure only the last n commits are being rolled back
|
||||
// If there is a commit in-between or after that is not rolled back, then abort
|
||||
String lastCommit = null;
|
||||
if (!commitsToRollback.isEmpty()) {
|
||||
lastCommit = commitsToRollback.get(commitsToRollback.size() - 1);
|
||||
}
|
||||
|
||||
if ((lastCommit != null) && !commitTimeline.empty() && !commitTimeline
|
||||
.findInstantsAfter(lastCommit, Integer.MAX_VALUE).empty()) {
|
||||
throw new HoodieRollbackException(
|
||||
"Found commits after time :" + lastCommit + ", please rollback greater commits first");
|
||||
}
|
||||
|
||||
List<String> inflights = inflightCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
|
||||
.collect(Collectors.toList());
|
||||
if ((lastCommit != null) && !inflights.isEmpty() && (inflights.indexOf(lastCommit) != inflights.size() - 1)) {
|
||||
throw new HoodieRollbackException("Found in-flight commits after time :" + lastCommit
|
||||
+ ", please rollback greater commits first");
|
||||
}
|
||||
|
||||
List<HoodieRollbackStat> stats = table.rollback(jsc, commitsToRollback, true);
|
||||
|
||||
logger.info("Deleted inflight commits " + commitsToRollback);
|
||||
|
||||
// cleanup index entries
|
||||
commitsToRollback.forEach(s -> {
|
||||
if (!index.rollbackCommit(s)) {
|
||||
throw new HoodieRollbackException("Rollback index changes failed, for time :" + s);
|
||||
}
|
||||
});
|
||||
logger.info("Index rolled back for commits " + commitsToRollback);
|
||||
|
||||
Optional<Long> durationInMs = Optional.empty();
|
||||
if (context != null) {
|
||||
durationInMs = Optional.of(metrics.getDurationInMs(context.stop()));
|
||||
Long numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size())
|
||||
.sum();
|
||||
metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
|
||||
}
|
||||
HoodieRollbackMetadata rollbackMetadata = AvroUtils
|
||||
.convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, stats);
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
|
||||
AvroUtils.serializeRollbackMetadata(rollbackMetadata));
|
||||
logger.info("Commits " + commitsToRollback + " rollback is complete");
|
||||
|
||||
if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
|
||||
logger.info("Cleaning up older rollback meta files");
|
||||
// Cleanup of older cleaner meta files
|
||||
// TODO - make the commit archival generic and archive rollback metadata
|
||||
FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
|
||||
table.getActiveTimeline().getRollbackTimeline().getInstants());
|
||||
}
|
||||
List<HoodieRollbackStat> stats = doRollbackAndGetStats(commitToRollback);
|
||||
Map<String, List<HoodieRollbackStat>> statToCommit = new HashMap<>();
|
||||
finishRollback(context, stats, Arrays.asList(commitToRollback), startRollbackTime);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieRollbackException(
|
||||
"Failed to rollback " + config.getBasePath() + " commits " + commitsToRollback, e);
|
||||
"Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1251,7 +1304,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable table) throws IOException {
|
||||
table.rollback(jsc, ImmutableList.copyOf(new String[]{inflightInstant.getTimestamp()}), false);
|
||||
table.rollback(jsc, inflightInstant.getTimestamp(), false);
|
||||
// Revert instant state file
|
||||
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
|
||||
}
|
||||
|
||||
@@ -330,7 +330,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
* Common method used for cleaning out parquet files under a partition path during rollback of a
|
||||
* set of commits
|
||||
*/
|
||||
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, List<String> commits, String
|
||||
protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String commit, String
|
||||
partitionPath)
|
||||
throws IOException {
|
||||
logger.info("Cleaning path " + partitionPath);
|
||||
@@ -338,7 +338,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
PathFilter filter = (path) -> {
|
||||
if (path.toString().contains(".parquet")) {
|
||||
String fileCommitTime = FSUtils.getCommitTime(path.getName());
|
||||
return commits.contains(fileCommitTime);
|
||||
return commit.equals(fileCommitTime);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
@@ -352,28 +352,27 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits, boolean deleteInstants)
|
||||
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
|
||||
throws IOException {
|
||||
String actionType = metaClient.getCommitActionType();
|
||||
HoodieActiveTimeline activeTimeline = this.getActiveTimeline();
|
||||
List<String> inflights = this.getInflightCommitTimeline().getInstants()
|
||||
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||
// Atomically unpublish the commits
|
||||
if (!inflights.contains(commit)) {
|
||||
activeTimeline.revertToInflight(new HoodieInstant(false, actionType, commit));
|
||||
}
|
||||
logger.info("Unpublished " + commit);
|
||||
|
||||
// Atomically unpublish all the commits
|
||||
commits.stream().filter(s -> !inflights.contains(s))
|
||||
.map(s -> new HoodieInstant(false, actionType, s))
|
||||
.forEach(activeTimeline::revertToInflight);
|
||||
logger.info("Unpublished " + commits);
|
||||
|
||||
// delete all the data files for all these commits
|
||||
logger.info("Clean out all parquet files generated for commits: " + commits);
|
||||
// delete all the data files for this commit
|
||||
logger.info("Clean out all parquet files generated for commit: " + commit);
|
||||
List<HoodieRollbackStat> stats = jsc.parallelize(FSUtils
|
||||
.getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(),
|
||||
config.shouldAssumeDatePartitioning()))
|
||||
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
|
||||
// Scan all partitions files with this commit time
|
||||
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
|
||||
deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath);
|
||||
deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
|
||||
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
|
||||
.withDeletedFileResults(filesToDeletedStatus).build();
|
||||
}).collect();
|
||||
@@ -381,26 +380,26 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
// clean temporary data files
|
||||
cleanTemporaryDataFiles(jsc);
|
||||
|
||||
// Delete Inflight instants if enabled
|
||||
deleteInflightInstants(deleteInstants, activeTimeline,
|
||||
commits.stream().map(s -> new HoodieInstant(true, actionType, s)).collect(Collectors.toList()));
|
||||
// Delete Inflight instant if enabled
|
||||
deleteInflightInstant(deleteInstants, activeTimeline,
|
||||
new HoodieInstant(true, actionType, commit));
|
||||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete Inflight instants if enabled
|
||||
* @param deleteInstants Enable Deletion of Inflight instants
|
||||
* Delete Inflight instant if enabled
|
||||
* @param deleteInstant Enable Deletion of Inflight instant
|
||||
* @param activeTimeline Hoodie active timeline
|
||||
* @param instantsToBeDeleted Instants to be deleted
|
||||
* @param instantToBeDeleted Instant to be deleted
|
||||
*/
|
||||
protected static void deleteInflightInstants(boolean deleteInstants, HoodieActiveTimeline activeTimeline,
|
||||
List<HoodieInstant> instantsToBeDeleted) {
|
||||
protected static void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
|
||||
HoodieInstant instantToBeDeleted) {
|
||||
// Remove the rolled back inflight commits
|
||||
if (deleteInstants) {
|
||||
instantsToBeDeleted.forEach(activeTimeline::deleteInflight);
|
||||
logger.info("Deleted inflight commits " + instantsToBeDeleted);
|
||||
if (deleteInstant) {
|
||||
activeTimeline.deleteInflight(instantToBeDeleted);
|
||||
logger.info("Deleted inflight commit " + instantToBeDeleted);
|
||||
} else {
|
||||
logger.warn("Rollback finished without deleting inflight instant files. Instants=" + instantsToBeDeleted);
|
||||
logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -174,7 +174,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits, boolean deleteInstants)
|
||||
public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
|
||||
throws IOException {
|
||||
|
||||
// At the moment, MOR table type does not support bulk nested rollbacks. Nested rollbacks is an experimental
|
||||
@@ -182,26 +182,23 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
// (commitToRollback).
|
||||
// NOTE {@link HoodieCompactionConfig#withCompactionLazyBlockReadEnabled} needs to be set to TRUE. This is
|
||||
// required to avoid OOM when merging multiple LogBlocks performed during nested rollbacks.
|
||||
if (commits.size() > 1) {
|
||||
throw new UnsupportedOperationException("Bulk Nested Rollbacks are not supported");
|
||||
}
|
||||
// Atomically un-publish all non-inflight commits
|
||||
Map<String, HoodieInstant> commitsAndCompactions = this.getActiveTimeline()
|
||||
Optional<HoodieInstant> commitOrCompactionOption = this.getActiveTimeline()
|
||||
.getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION,
|
||||
HoodieActiveTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION)).getInstants()
|
||||
.filter(i -> commits.contains(i.getTimestamp()))
|
||||
.collect(Collectors.toMap(HoodieInstant::getTimestamp, i -> i));
|
||||
|
||||
.filter(i -> commit.equals(i.getTimestamp()))
|
||||
.findFirst();
|
||||
HoodieInstant instantToRollback = commitOrCompactionOption.get();
|
||||
// Atomically un-publish all non-inflight commits
|
||||
commitsAndCompactions.entrySet().stream().map(Map.Entry::getValue)
|
||||
.filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight);
|
||||
logger.info("Unpublished " + commits);
|
||||
if (!instantToRollback.isInflight()) {
|
||||
this.getActiveTimeline().revertToInflight(instantToRollback);
|
||||
}
|
||||
logger.info("Unpublished " + commit);
|
||||
Long startTime = System.currentTimeMillis();
|
||||
List<HoodieRollbackStat> allRollbackStats = jsc.parallelize(FSUtils
|
||||
.getAllPartitionPaths(this.metaClient.getFs(), this.getMetaClient().getBasePath(),
|
||||
config.shouldAssumeDatePartitioning()))
|
||||
.map((Function<String, List<HoodieRollbackStat>>) partitionPath -> commits.stream().map(commit -> {
|
||||
HoodieInstant instant = commitsAndCompactions.get(commit);
|
||||
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
|
||||
HoodieActiveTimeline activeTimeline = this.getActiveTimeline().reload();
|
||||
HoodieRollbackStat hoodieRollbackStats = null;
|
||||
// Need to put the path filter here since Filter is not serializable
|
||||
@@ -209,18 +206,18 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
PathFilter filter = (path) -> {
|
||||
if (path.toString().contains(".parquet")) {
|
||||
String fileCommitTime = FSUtils.getCommitTime(path.getName());
|
||||
return commits.contains(fileCommitTime);
|
||||
return commit.equals(fileCommitTime);
|
||||
} else if (path.toString().contains(".log")) {
|
||||
// Since the baseCommitTime is the only commit for new log files, it's okay here
|
||||
String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
|
||||
return commits.contains(fileCommitTime);
|
||||
return commit.equals(fileCommitTime);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
|
||||
|
||||
switch (instant.getAction()) {
|
||||
switch (instantToRollback.getAction()) {
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
try {
|
||||
// Rollback of a commit should delete the newly created parquet files along with any log
|
||||
@@ -244,7 +241,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
// and has not yet finished. In this scenario we should delete only the newly created parquet files
|
||||
// and not corresponding base commit log files created with this as baseCommit since updates would
|
||||
// have been written to the log files.
|
||||
super.deleteCleanedFiles(filesToDeletedStatus, commits, partitionPath);
|
||||
super.deleteCleanedFiles(filesToDeletedStatus, commit, partitionPath);
|
||||
hoodieRollbackStats = HoodieRollbackStat.newBuilder()
|
||||
.withPartitionPath(partitionPath).withDeletedFileResults(filesToDeletedStatus).build();
|
||||
} else {
|
||||
@@ -286,7 +283,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
try {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
|
||||
metaClient.getCommitTimeline().getInstantDetails(
|
||||
new HoodieInstant(true, instant.getAction(), instant.getTimestamp()))
|
||||
new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp()))
|
||||
.get(), HoodieCommitMetadata.class);
|
||||
|
||||
// read commit file and (either append delete blocks or delete file)
|
||||
@@ -315,13 +312,11 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
||||
break;
|
||||
}
|
||||
return hoodieRollbackStats;
|
||||
}).collect(Collectors.toList())).flatMap(List::iterator).filter(Objects::nonNull).collect();
|
||||
}).filter(Objects::nonNull).collect();
|
||||
|
||||
// Delete Inflight instants if enabled
|
||||
deleteInflightInstants(deleteInstants, this.getActiveTimeline(),
|
||||
commitsAndCompactions.entrySet().stream().map(
|
||||
entry -> new HoodieInstant(true, entry.getValue().getAction(), entry.getValue().getTimestamp()))
|
||||
.collect(Collectors.toList()));
|
||||
deleteInflightInstant(deleteInstants, this.getActiveTimeline(), new HoodieInstant(true, instantToRollback
|
||||
.getAction(), instantToRollback.getTimestamp()));
|
||||
|
||||
logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
|
||||
|
||||
|
||||
@@ -253,7 +253,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
||||
* Atomically unpublish this commit (2) clean indexing data (3) clean new generated parquet files
|
||||
* / log blocks (4) Finally, delete .<action>.commit or .<action>.inflight file if deleteInstants = true
|
||||
*/
|
||||
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits, boolean deleteInstants)
|
||||
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, String commit, boolean deleteInstants)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user