[HUDI-3805] Delete existing corrupted requested rollback plan during rollback (#5245)
This commit is contained in:
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.async.AsyncArchiveService;
|
||||
import org.apache.hudi.async.AsyncCleanerService;
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
@@ -72,7 +71,6 @@ import org.apache.hudi.exception.HoodieRestoreException;
|
||||
import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.exception.HoodieSavepointException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||
import org.apache.hudi.internal.schema.InternalSchema;
|
||||
import org.apache.hudi.internal.schema.Type;
|
||||
import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
|
||||
@@ -82,6 +80,7 @@ import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
|
||||
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
|
||||
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
|
||||
import org.apache.hudi.internal.schema.utils.SerDeHelper;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadata;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
||||
import org.apache.hudi.metadata.MetadataPartitionType;
|
||||
import org.apache.hudi.metrics.HoodieMetrics;
|
||||
@@ -95,8 +94,9 @@ import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
|
||||
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@@ -105,11 +105,11 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.text.ParseException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -1113,9 +1113,28 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
||||
protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) {
|
||||
List<HoodieInstant> instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
|
||||
for (HoodieInstant instant : instants) {
|
||||
for (HoodieInstant rollbackInstant : instants) {
|
||||
HoodieRollbackPlan rollbackPlan;
|
||||
try {
|
||||
rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant);
|
||||
} catch (IOException e) {
|
||||
if (rollbackInstant.isRequested()) {
|
||||
LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e);
|
||||
try {
|
||||
metaClient.getActiveTimeline().deletePending(rollbackInstant);
|
||||
} catch (HoodieIOException he) {
|
||||
LOG.warn("Cannot delete " + rollbackInstant, he);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// Here we assume that if the rollback is inflight, the rollback plan is intact
|
||||
// in instant.rollback.requested. The exception here can be due to other reasons.
|
||||
LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, instant);
|
||||
String action = rollbackPlan.getInstantToRollback().getAction();
|
||||
if (ignoreCompactionAndClusteringInstants) {
|
||||
if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
|
||||
@@ -1124,14 +1143,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
||||
rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
|
||||
if (!isClustering) {
|
||||
String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime();
|
||||
infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||
infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
|
||||
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the plan", e);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e);
|
||||
}
|
||||
}
|
||||
return infoMap;
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieInstantInfo;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackRequest;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
@@ -61,9 +62,11 @@ import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
|
||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@@ -481,4 +484,107 @@ public class TestClientRollback extends HoodieClientTestBase {
|
||||
assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
|
||||
}
|
||||
}
|
||||
|
||||
private static Stream<Arguments> testRollbackWithRequestedRollbackPlanParams() {
|
||||
return Arrays.stream(new Boolean[][] {
|
||||
{true, true}, {true, false}, {false, true}, {false, false},
|
||||
}).map(Arguments::of);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("testRollbackWithRequestedRollbackPlanParams")
|
||||
public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, boolean isRollbackPlanCorrupted) throws Exception {
|
||||
// Let's create some commit files and base files
|
||||
final String p1 = "2022/04/05";
|
||||
final String p2 = "2022/04/06";
|
||||
final String commitTime1 = "20220406010101002";
|
||||
final String commitTime2 = "20220406020601002";
|
||||
final String commitTime3 = "20220406030611002";
|
||||
final String rollbackInstantTime = "20220406040611002";
|
||||
Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
|
||||
{
|
||||
put(p1, "id11");
|
||||
put(p2, "id12");
|
||||
}
|
||||
};
|
||||
Map<String, String> partitionAndFileId2 = new HashMap<String, String>() {
|
||||
{
|
||||
put(p1, "id21");
|
||||
put(p2, "id22");
|
||||
}
|
||||
};
|
||||
Map<String, String> partitionAndFileId3 = new HashMap<String, String>() {
|
||||
{
|
||||
put(p1, "id31");
|
||||
put(p2, "id32");
|
||||
}
|
||||
};
|
||||
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withRollbackUsingMarkers(false)
|
||||
.withMetadataConfig(
|
||||
HoodieMetadataConfig.newBuilder()
|
||||
// Column Stats Index is disabled, since these tests construct tables which are
|
||||
// not valid (empty commit metadata, invalid parquet files)
|
||||
.withMetadataIndexColumnStats(false)
|
||||
.enable(enableMetadataTable)
|
||||
.build()
|
||||
)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||
|
||||
HoodieTestTable testTable = enableMetadataTable
|
||||
? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
|
||||
metaClient.getHadoopConf(), config, context))
|
||||
: HoodieTestTable.of(metaClient);
|
||||
|
||||
testTable.withPartitionMetaFiles(p1, p2)
|
||||
.addCommit(commitTime1)
|
||||
.withBaseFilesInPartitions(partitionAndFileId1)
|
||||
.addCommit(commitTime2)
|
||||
.withBaseFilesInPartitions(partitionAndFileId2)
|
||||
.addInflightCommit(commitTime3)
|
||||
.withBaseFilesInPartitions(partitionAndFileId3);
|
||||
|
||||
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
|
||||
if (isRollbackPlanCorrupted) {
|
||||
// Add a corrupted requested rollback plan
|
||||
FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2});
|
||||
} else {
|
||||
// Add a valid requested rollback plan to roll back commitTime3
|
||||
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
|
||||
List<HoodieRollbackRequest> rollbackRequestList = partitionAndFileId3.keySet().stream()
|
||||
.map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING, EMPTY_STRING,
|
||||
Collections.singletonList(metaClient.getBasePath() + "/" + partition + "/"
|
||||
+ FileCreateUtils.baseFileName(commitTime3, partitionAndFileId3.get(p1))),
|
||||
Collections.emptyMap()))
|
||||
.collect(Collectors.toList());
|
||||
rollbackPlan.setRollbackRequests(rollbackRequestList);
|
||||
rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION));
|
||||
FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, rollbackPlan);
|
||||
}
|
||||
|
||||
// Rollback commit3
|
||||
client.rollback(commitTime3);
|
||||
assertFalse(testTable.inflightCommitExists(commitTime3));
|
||||
assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
|
||||
assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
List<HoodieInstant> rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
|
||||
// Corrupted requested rollback plan should be deleted before scheduling a new one
|
||||
assertEquals(rollbackInstants.size(), 1);
|
||||
HoodieInstant rollbackInstant = rollbackInstants.get(0);
|
||||
assertTrue(rollbackInstant.isCompleted());
|
||||
|
||||
if (isRollbackPlanCorrupted) {
|
||||
// Should create a new rollback instant
|
||||
assertNotEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
|
||||
} else {
|
||||
// Should reuse the rollback instant
|
||||
assertEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -244,6 +244,10 @@ public class FileCreateUtils {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
|
||||
}
|
||||
|
||||
public static void createRequestedRollbackFile(String basePath, String instantTime, byte[] content) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, content);
|
||||
}
|
||||
|
||||
public static void createInflightRollbackFile(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user