[HUDI-2997] Skip the corrupt meta file for pending rollback action (#4296)
This commit is contained in:
@@ -82,6 +82,7 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -898,21 +899,22 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch map of pending commits to be rolledback to {@link HoodiePendingRollbackInfo}.
|
||||
* Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}.
|
||||
* @param metaClient instance of {@link HoodieTableMetaClient} to use.
|
||||
* @return map of pending commits to be rolledback instants to Rollback Instant and Rollback plan Pair.
|
||||
* @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair.
|
||||
*/
|
||||
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
|
||||
return metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().map(
|
||||
entry -> {
|
||||
try {
|
||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, entry);
|
||||
return Pair.of(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(entry, rollbackPlan)));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Fetching rollback plan failed for " + entry, e);
|
||||
}
|
||||
}
|
||||
).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
||||
List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
|
||||
for (HoodieInstant instant : instants) {
|
||||
try {
|
||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
|
||||
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
|
||||
}
|
||||
}
|
||||
return infoMap;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -65,8 +65,9 @@ public class RollbackUtils {
|
||||
public static HoodieRollbackPlan getRollbackPlan(HoodieTableMetaClient metaClient, HoodieInstant rollbackInstant)
|
||||
throws IOException {
|
||||
// TODO: add upgrade step if required.
|
||||
final HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(rollbackInstant);
|
||||
return TimelineMetadataUtils.deserializeAvroMetadata(
|
||||
metaClient.getActiveTimeline().readRollbackInfoAsBytes(rollbackInstant).get(), HoodieRollbackPlan.class);
|
||||
metaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class);
|
||||
}
|
||||
|
||||
static Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String instantToRollback, String rollbackInstantTime) {
|
||||
|
||||
@@ -334,6 +334,10 @@ public interface HoodieTimeline extends Serializable {
|
||||
return new HoodieInstant(State.INFLIGHT, REPLACE_COMMIT_ACTION, timestamp);
|
||||
}
|
||||
|
||||
static HoodieInstant getRollbackRequestedInstant(HoodieInstant instant) {
|
||||
return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names
|
||||
* between inflight and completed instants (compaction <=> commit).
|
||||
|
||||
Reference in New Issue
Block a user