[HUDI-3720] Fix the logic of reattempting pending rollback (#5148)
This commit is contained in:
@@ -672,14 +672,24 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
|
|||||||
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
|
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
|
||||||
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
|
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
|
||||||
.findFirst());
|
.findFirst());
|
||||||
if (commitInstantOpt.isPresent()) {
|
if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
|
||||||
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
|
LOG.info(String.format("Scheduling Rollback at instant time : %s "
|
||||||
|
+ "(exists in active timeline: %s), with rollback plan: %s",
|
||||||
|
rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent()));
|
||||||
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
|
Option<HoodieRollbackPlan> rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
|
||||||
.orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
|
.orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers()));
|
||||||
if (rollbackPlanOption.isPresent()) {
|
if (rollbackPlanOption.isPresent()) {
|
||||||
// execute rollback
|
// There can be a case where the inflight rollback failed after the instant files
|
||||||
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
|
// are deleted for commitInstantTime, so that commitInstantOpt is empty as it is
|
||||||
skipLocking);
|
// not present in the timeline. In such a case, the hoodie instant instance
|
||||||
|
// is reconstructed to allow the rollback to be reattempted, and the deleteInstants
|
||||||
|
// is set to false since they are already deleted.
|
||||||
|
// Execute rollback
|
||||||
|
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
|
||||||
|
? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
|
||||||
|
: table.rollback(context, rollbackInstantTime, new HoodieInstant(
|
||||||
|
true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
|
||||||
|
false, skipLocking);
|
||||||
if (timerContext != null) {
|
if (timerContext != null) {
|
||||||
long durationInMs = metrics.getDurationInMs(timerContext.stop());
|
long durationInMs = metrics.getDurationInMs(timerContext.stop());
|
||||||
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
|
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
|
|||||||
|
|
||||||
import org.apache.hudi.avro.model.HoodieInstantInfo;
|
import org.apache.hudi.avro.model.HoodieInstantInfo;
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
import org.apache.hudi.avro.model.HoodieRollbackPlan;
|
||||||
|
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||||
@@ -48,6 +49,9 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
|
|||||||
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.Arguments;
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -55,6 +59,7 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
@@ -269,11 +274,19 @@ public class TestClientRollback extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Stream<Arguments> testFailedRollbackCommitParams() {
|
||||||
|
return Arrays.stream(new Boolean[][] {
|
||||||
|
{true, true}, {true, false}, {false, true}, {false, false},
|
||||||
|
}).map(Arguments::of);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Cases for effects of rollbacking completed/inflight commits.
|
* Test Cases for effects of rollbacking completed/inflight commits.
|
||||||
*/
|
*/
|
||||||
@Test
|
@ParameterizedTest
|
||||||
public void testFailedRollbackCommit() throws Exception {
|
@MethodSource("testFailedRollbackCommitParams")
|
||||||
|
public void testFailedRollbackCommit(
|
||||||
|
boolean enableMetadataTable, boolean instantToRollbackExists) throws Exception {
|
||||||
// Let's create some commit files and base files
|
// Let's create some commit files and base files
|
||||||
final String p1 = "2016/05/01";
|
final String p1 = "2016/05/01";
|
||||||
final String p2 = "2016/05/02";
|
final String p2 = "2016/05/02";
|
||||||
@@ -302,8 +315,20 @@ public class TestClientRollback extends HoodieClientTestBase {
|
|||||||
put(p3, "id33");
|
put(p3, "id33");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient)
|
|
||||||
.withPartitionMetaFiles(p1, p2, p3)
|
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||||
|
.withRollbackUsingMarkers(false)
|
||||||
|
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
|
||||||
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||||
|
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
|
||||||
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||||
|
|
||||||
|
HoodieTestTable testTable = enableMetadataTable
|
||||||
|
? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
|
||||||
|
metaClient.getHadoopConf(), config, context))
|
||||||
|
: HoodieTestTable.of(metaClient);
|
||||||
|
|
||||||
|
testTable.withPartitionMetaFiles(p1, p2, p3)
|
||||||
.addCommit(commitTime1)
|
.addCommit(commitTime1)
|
||||||
.withBaseFilesInPartitions(partitionAndFileId1)
|
.withBaseFilesInPartitions(partitionAndFileId1)
|
||||||
.addCommit(commitTime2)
|
.addCommit(commitTime2)
|
||||||
@@ -311,12 +336,6 @@ public class TestClientRollback extends HoodieClientTestBase {
|
|||||||
.addInflightCommit(commitTime3)
|
.addInflightCommit(commitTime3)
|
||||||
.withBaseFilesInPartitions(partitionAndFileId3);
|
.withBaseFilesInPartitions(partitionAndFileId3);
|
||||||
|
|
||||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
|
|
||||||
.withRollbackUsingMarkers(false)
|
|
||||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
|
||||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
|
|
||||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
|
||||||
|
|
||||||
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
|
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
|
||||||
|
|
||||||
// Rollback commit3
|
// Rollback commit3
|
||||||
@@ -333,8 +352,10 @@ public class TestClientRollback extends HoodieClientTestBase {
|
|||||||
// delete rollback completed meta file and retry rollback.
|
// delete rollback completed meta file and retry rollback.
|
||||||
FileCreateUtils.deleteRollbackCommit(basePath, rollbackInstant.getTimestamp());
|
FileCreateUtils.deleteRollbackCommit(basePath, rollbackInstant.getTimestamp());
|
||||||
|
|
||||||
// recreate actual commit files so that we can retry the rollback
|
if (instantToRollbackExists) {
|
||||||
testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3);
|
// recreate actual commit files if needed
|
||||||
|
testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3);
|
||||||
|
}
|
||||||
|
|
||||||
// retry rolling back the commit again.
|
// retry rolling back the commit again.
|
||||||
client.rollback(commitTime3);
|
client.rollback(commitTime3);
|
||||||
|
|||||||
Reference in New Issue
Block a user