[HUDI-3102] Do not store rollback plan in inflight instant (#4445)
This commit is contained in:
@@ -105,62 +105,45 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
|
|||||||
private HoodieRollbackMetadata runRollback(HoodieTable<T, I, K, O> table, HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
|
private HoodieRollbackMetadata runRollback(HoodieTable<T, I, K, O> table, HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
|
||||||
ValidationUtils.checkArgument(rollbackInstant.getState().equals(HoodieInstant.State.REQUESTED)
|
ValidationUtils.checkArgument(rollbackInstant.getState().equals(HoodieInstant.State.REQUESTED)
|
||||||
|| rollbackInstant.getState().equals(HoodieInstant.State.INFLIGHT));
|
|| rollbackInstant.getState().equals(HoodieInstant.State.INFLIGHT));
|
||||||
try {
|
final HoodieTimer timer = new HoodieTimer();
|
||||||
final HoodieInstant inflightInstant;
|
timer.startTimer();
|
||||||
final HoodieTimer timer = new HoodieTimer();
|
final HoodieInstant inflightInstant = rollbackInstant.isRequested()
|
||||||
timer.startTimer();
|
? table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant)
|
||||||
if (rollbackInstant.isRequested()) {
|
: rollbackInstant;
|
||||||
inflightInstant = table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant,
|
|
||||||
TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
|
|
||||||
} else {
|
|
||||||
inflightInstant = rollbackInstant;
|
|
||||||
}
|
|
||||||
|
|
||||||
HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
|
HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
|
||||||
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackPlan);
|
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackPlan);
|
||||||
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
|
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
|
||||||
instantTime,
|
instantTime,
|
||||||
Option.of(rollbackTimer.endTimer()),
|
Option.of(rollbackTimer.endTimer()),
|
||||||
Collections.singletonList(instantToRollback),
|
Collections.singletonList(instantToRollback),
|
||||||
stats);
|
stats);
|
||||||
if (!skipTimelinePublish) {
|
if (!skipTimelinePublish) {
|
||||||
finishRollback(inflightInstant, rollbackMetadata);
|
finishRollback(inflightInstant, rollbackMetadata);
|
||||||
}
|
|
||||||
|
|
||||||
// Finally, remove the markers post rollback.
|
|
||||||
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
|
|
||||||
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
|
||||||
|
|
||||||
return rollbackMetadata;
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Failed to rollback commit ", e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally, remove the markers post rollback.
|
||||||
|
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
|
||||||
|
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
|
||||||
|
|
||||||
|
return rollbackMetadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HoodieRollbackMetadata execute() {
|
public HoodieRollbackMetadata execute() {
|
||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
List<HoodieInstant> rollBackInstants = table.getRollbackTimeline()
|
Option<HoodieInstant> rollbackInstant = table.getRollbackTimeline()
|
||||||
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
|
.filterInflightsAndRequested()
|
||||||
if (rollBackInstants.isEmpty()) {
|
.filter(instant -> instant.getTimestamp().equals(instantTime))
|
||||||
throw new HoodieRollbackException("No Requested Rollback Instants found to execute rollback ");
|
.firstInstant();
|
||||||
|
if (!rollbackInstant.isPresent()) {
|
||||||
|
throw new HoodieRollbackException("No pending rollback instants found to execute rollback");
|
||||||
}
|
}
|
||||||
HoodieInstant rollbackInstant = null;
|
try {
|
||||||
for (HoodieInstant instant : rollBackInstants) {
|
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant.get());
|
||||||
if (instantTime.equals(instant.getTimestamp())) {
|
return runRollback(table, rollbackInstant.get(), rollbackPlan);
|
||||||
rollbackInstant = instant;
|
} catch (IOException e) {
|
||||||
break;
|
throw new HoodieIOException("Failed to fetch rollback plan for commit " + instantTime, e);
|
||||||
}
|
|
||||||
}
|
|
||||||
if (rollbackInstant != null) {
|
|
||||||
try {
|
|
||||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant);
|
|
||||||
return runRollback(table, rollBackInstants.get(0), rollbackPlan);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Failed to fetch rollback plan to rollback commit " + rollbackInstant.getTimestamp(), e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new HoodieIOException("No inflight rollback instants found for commit time " + instantTime);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -357,14 +357,13 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
* Transition Rollback State from requested to inflight.
|
* Transition Rollback State from requested to inflight.
|
||||||
*
|
*
|
||||||
* @param requestedInstant requested instant
|
* @param requestedInstant requested instant
|
||||||
* @param data Optional data to be stored
|
|
||||||
* @return commit instant
|
* @return commit instant
|
||||||
*/
|
*/
|
||||||
public HoodieInstant transitionRollbackRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
|
public HoodieInstant transitionRollbackRequestedToInflight(HoodieInstant requestedInstant) {
|
||||||
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
|
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION));
|
||||||
ValidationUtils.checkArgument(requestedInstant.isRequested());
|
ValidationUtils.checkArgument(requestedInstant.isRequested());
|
||||||
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, ROLLBACK_ACTION, requestedInstant.getTimestamp());
|
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, ROLLBACK_ACTION, requestedInstant.getTimestamp());
|
||||||
transitionState(requestedInstant, inflight, data);
|
transitionState(requestedInstant, inflight, Option.empty());
|
||||||
return inflight;
|
return inflight;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user