[HUDI-3101] Excluding compaction instants from pending rollback info (#4443)
This commit is contained in:
@@ -902,10 +902,14 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient) {
|
||||
List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
|
||||
HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
|
||||
for (HoodieInstant instant : instants) {
|
||||
try {
|
||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
|
||||
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||
String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
|
||||
if (!pendingCompactionTimeline.containsInstant(instantToRollback)) {
|
||||
infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
|
||||
}
|
||||
|
||||
@@ -18,14 +18,18 @@
|
||||
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieInstantInfo;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
|
||||
@@ -340,6 +344,27 @@ public class TestClientRollback extends HoodieClientTestBase {
|
||||
rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||
assertEquals(rollbackInstants.size(), 1);
|
||||
assertEquals(rollbackInstants.get(0), rollbackInstant);
|
||||
|
||||
final String commitTime4 = "20160507040601";
|
||||
final String commitTime5 = "20160507050611";
|
||||
|
||||
// add inflight compaction then rolls it back
|
||||
testTable.addInflightCompaction(commitTime4, new HoodieCommitMetadata());
|
||||
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
|
||||
rollbackPlan.setRollbackRequests(Collections.emptyList());
|
||||
rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime4, HoodieTimeline.COMPACTION_ACTION));
|
||||
testTable.addRequestedRollback(commitTime5, rollbackPlan);
|
||||
|
||||
// the compaction instants should be excluded
|
||||
metaClient.reloadActiveTimeline();
|
||||
assertEquals(0, client.getPendingRollbackInfos(metaClient).size());
|
||||
|
||||
// verify there is no extra rollback instants
|
||||
client.rollback(commitTime4);
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
rollbackInstants = metaClient.reloadActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||
assertEquals(2, rollbackInstants.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
@@ -63,6 +64,7 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serial
|
||||
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRequestedReplaceMetadata;
|
||||
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRestoreMetadata;
|
||||
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRollbackMetadata;
|
||||
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRollbackPlan;
|
||||
|
||||
public class FileCreateUtils {
|
||||
|
||||
@@ -225,6 +227,10 @@ public class FileCreateUtils {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
|
||||
}
|
||||
|
||||
public static void createRequestedRollbackFile(String basePath, String instantTime, HoodieRollbackPlan plan) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
|
||||
}
|
||||
|
||||
public static void createInflightRollbackFile(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
|
||||
import org.apache.hudi.common.HoodieCleanStat;
|
||||
@@ -104,6 +105,7 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCo
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCompaction;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedReplaceCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRollbackFile;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRestoreFile;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
|
||||
@@ -309,6 +311,12 @@ public class HoodieTestTable {
|
||||
return Pair.of(cleanerPlan, convertCleanMetadata(commitTime, Option.of(0L), cleanStats));
|
||||
}
|
||||
|
||||
public HoodieTestTable addRequestedRollback(String instantTime, HoodieRollbackPlan plan) throws IOException {
|
||||
createRequestedRollbackFile(basePath, instantTime, plan);
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable addInflightRollback(String instantTime) throws IOException {
|
||||
createInflightRollbackFile(basePath, instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
|
||||
Reference in New Issue
Block a user