[MINOR] Follow ups from HUDI-2861 (re-use same rollback instant for failed rollback) (#4133)
This commit is contained in:
committed by
GitHub
parent
257a6a7456
commit
9c059ef8e5
@@ -82,7 +82,6 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -620,8 +619,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
@Deprecated
|
||||
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
|
||||
LOG.info("Begin rollback of instant " + commitInstantTime);
|
||||
boolean pendingRollback = pendingRollbackInfo.isPresent();
|
||||
final String rollbackInstantTime = pendingRollback ? pendingRollbackInfo.get().getRollbackInstant().getTimestamp() : HoodieActiveTimeline.createNewInstantTime();
|
||||
final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
|
||||
final Timer.Context timerContext = this.metrics.getRollbackCtx();
|
||||
try {
|
||||
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
|
||||
@@ -630,8 +628,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
.findFirst());
|
||||
if (commitInstantOpt.isPresent()) {
|
||||
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
|
||||
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollback ? Option.of(pendingRollbackInfo.get().getRollbackPlan()) : table.scheduleRollback(context, rollbackInstantTime,
|
||||
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers());
|
||||
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())).orElse(table.scheduleRollback(context, rollbackInstantTime,
|
||||
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
|
||||
if (rollbackPlanOption.isPresent()) {
|
||||
// execute rollback
|
||||
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
|
||||
@@ -861,8 +859,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
}
|
||||
|
||||
private Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) {
|
||||
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfos(metaClient).get(commitToRollback);
|
||||
return pendingRollbackInfo != null ? pendingRollbackInfo : Option.empty();
|
||||
return getPendingRollbackInfos(metaClient).getOrDefault(commitToRollback, Option.empty());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -899,16 +896,16 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
|
||||
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
|
||||
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
|
||||
|
||||
HashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = pendingRollbacks.entrySet()
|
||||
.stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
|
||||
rollbackFailedWrites(reverseSortedRollbackInstants, skipLocking);
|
||||
rollbackFailedWrites(pendingRollbacks, skipLocking);
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void rollbackFailedWrites(Map<String, Option<HoodiePendingRollbackInfo>> instantsToRollback, boolean skipLocking) {
|
||||
for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : instantsToRollback.entrySet()) {
|
||||
// sort in reverse order of commit times
|
||||
LinkedHashMap<String, Option<HoodiePendingRollbackInfo>> reverseSortedRollbackInstants = instantsToRollback.entrySet()
|
||||
.stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
|
||||
for (Map.Entry<String, Option<HoodiePendingRollbackInfo>> entry : reverseSortedRollbackInstants.entrySet()) {
|
||||
if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
|
||||
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
|
||||
// do we need to handle failed rollback of a bootstrap
|
||||
|
||||
Reference in New Issue
Block a user