[HUDI-3257] Excluding clustering instants from pending rollback info (#4616)
This commit is contained in:
@@ -906,14 +906,18 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
|
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
|
||||||
List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
|
List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||||
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
|
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
|
||||||
HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
|
|
||||||
for (HoodieInstant instant : instants) {
|
for (HoodieInstant instant : instants) {
|
||||||
try {
|
try {
|
||||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
|
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
|
||||||
|
String action = rollbackPlan.getInstantToRollback().getAction();
|
||||||
|
if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
|
||||||
|
boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
|
||||||
|
&& ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent();
|
||||||
|
if (!isClustering) {
|
||||||
String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
|
String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
|
||||||
if (!pendingCompactionTimeline.containsInstant(instantToRollback)) {
|
|
||||||
infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
|
LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user