[HUDI-2861] Re-use same rollback instant time for failed rollbacks (#4123)
This commit is contained in:
committed by
GitHub
parent
a88691fed3
commit
f8e0176eb0
@@ -32,6 +32,7 @@ import org.apache.hudi.client.embedded.EmbeddedTimelineService;
|
||||
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
|
||||
import org.apache.hudi.client.transaction.TransactionManager;
|
||||
import org.apache.hudi.client.utils.TransactionUtils;
|
||||
import org.apache.hudi.common.HoodiePendingRollbackInfo;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||
@@ -67,6 +68,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.HoodieTimelineArchiveLog;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
import org.apache.hudi.table.action.rollback.RollbackUtils;
|
||||
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
|
||||
import org.apache.hudi.table.marker.WriteMarkersFactory;
|
||||
|
||||
@@ -80,6 +82,8 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -584,7 +588,23 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
|
||||
@Deprecated
|
||||
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
|
||||
return rollback(commitInstantTime, false);
|
||||
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
|
||||
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
|
||||
return rollback(commitInstantTime, pendingRollbackInfo, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @Deprecated
|
||||
* Rollback the inflight record changes with the given commit time. This
|
||||
* will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
|
||||
* Adding this api for backwards compatability.
|
||||
* @param commitInstantTime Instant time of the commit
|
||||
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
|
||||
* @throws HoodieRollbackException if rollback cannot be performed successfully
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
|
||||
return rollback(commitInstantTime, Option.empty(), skipLocking);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -593,13 +613,15 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
* will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
|
||||
*
|
||||
* @param commitInstantTime Instant time of the commit
|
||||
* @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
|
||||
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
|
||||
* @throws HoodieRollbackException if rollback cannot be performed successfully
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
|
||||
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
|
||||
LOG.info("Begin rollback of instant " + commitInstantTime);
|
||||
final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
boolean pendingRollback = pendingRollbackInfo.isPresent();
|
||||
final String rollbackInstantTime = pendingRollback ? pendingRollbackInfo.get().getRollbackInstant().getTimestamp() : HoodieActiveTimeline.createNewInstantTime();
|
||||
final Timer.Context timerContext = this.metrics.getRollbackCtx();
|
||||
try {
|
||||
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
|
||||
@@ -608,7 +630,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
.findFirst());
|
||||
if (commitInstantOpt.isPresent()) {
|
||||
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
|
||||
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
|
||||
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollback ? Option.of(pendingRollbackInfo.get().getRollbackPlan()) : table.scheduleRollback(context, rollbackInstantTime,
|
||||
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
|
||||
if (rollbackPlanOption.isPresent()) {
|
||||
// execute rollback
|
||||
@@ -838,6 +860,29 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
return inflightTimelineExcludeClusteringCommit;
|
||||
}
|
||||
|
||||
private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
|
||||
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfos(metaClient).get(commitToRollback);
|
||||
return pendingRollbackInfo != null ? pendingRollbackInfo : Option.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch map of pending commits to be rolledback to {@link HoodiePendingRollbackInfo}.
|
||||
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
|
||||
* @return map of pending commits to be rolledback instants to Rollback Instnat and Rollback plan Pair.
|
||||
*/
|
||||
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
|
||||
return metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().map(
|
||||
entry -> {
|
||||
try {
|
||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, entry);
|
||||
return Pair.of(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(entry, rollbackPlan)));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Fetching rollback plan failed for " + entry, e);
|
||||
}
|
||||
}
|
||||
).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
||||
}
|
||||
|
||||
/**
|
||||
* Rollback all failed writes.
|
||||
*/
|
||||
@@ -851,22 +896,28 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
*/
|
||||
public Boolean rollbackFailedWrites(boolean skipLocking) {
|
||||
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
|
||||
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(),
|
||||
Option.empty());
|
||||
rollbackFailedWrites(instantsToRollback, skipLocking);
|
||||
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
|
||||
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
|
||||
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
|
||||
|
||||
HashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = pendingRollbacks.entrySet()
|
||||
.stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
|
||||
rollbackFailedWrites(reverseSortedRollbackInstants, skipLocking);
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void rollbackFailedWrites(List<String> instantsToRollback, boolean skipLocking) {
|
||||
for (String instant : instantsToRollback) {
|
||||
if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
|
||||
protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
|
||||
for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : instantsToRollback.entrySet()) {
|
||||
if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
|
||||
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
|
||||
// do we need to handle failed rollback of a bootstrap
|
||||
rollbackFailedBootstrap();
|
||||
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
|
||||
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
|
||||
break;
|
||||
} else {
|
||||
rollback(instant, skipLocking);
|
||||
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instant, config);
|
||||
rollback(entry.getKey(), entry.getValue(), skipLocking);
|
||||
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ public class RollbackUtils {
|
||||
* @return Rollback plan corresponding to rollback instant
|
||||
* @throws IOException
|
||||
*/
|
||||
static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
|
||||
public static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
|
||||
throws IOException {
|
||||
// TODO: add upgrade step if required.
|
||||
return TimelineMetadataUtils.deserializeAvroMetadata(
|
||||
|
||||
Reference in New Issue
Block a user