diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index a6a7e18b1..5eb8e270a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -672,14 +672,24 @@ public abstract class BaseHoodieWriteClient commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) .findFirst()); - if (commitInstantOpt.isPresent()) { - LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime); + if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) { + LOG.info(String.format("Scheduling Rollback at instant time : %s " + + "(exists in active timeline: %s), with rollback plan: %s", + rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent())); Option rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())) .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); if (rollbackPlanOption.isPresent()) { - // execute rollback - HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, - skipLocking); + // There can be a case where the inflight rollback failed after the instant files + // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is + // not present in the timeline. In such a case, the hoodie instant instance + // is reconstructed to allow the rollback to be reattempted, and the deleteInstants + // is set to false since they are already deleted. + // Execute rollback + HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent() + ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking) + : table.rollback(context, rollbackInstantTime, new HoodieInstant( + true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime), + false, skipLocking); if (timerContext != null) { long durationInMs = metrics.getDurationInMs(timerContext.stop()); metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 3b5393527..c06b0a062 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -20,6 +20,7 @@ package org.apache.hudi.client; import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCleaningPolicy; @@ -48,6 +49,9 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.Collections; @@ -55,6 +59,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -269,11 +274,19 @@ public class TestClientRollback extends HoodieClientTestBase { } } + private static Stream testFailedRollbackCommitParams() { + return Arrays.stream(new Boolean[][] { + {true, true}, {true, false}, {false, true}, {false, false}, + }).map(Arguments::of); + } + /** * Test Cases for effects of rollbacking completed/inflight commits. */ - @Test - public void testFailedRollbackCommit() throws Exception { + @ParameterizedTest + @MethodSource("testFailedRollbackCommitParams") + public void testFailedRollbackCommit( + boolean enableMetadataTable, boolean instantToRollbackExists) throws Exception { // Let's create some commit files and base files final String p1 = "2016/05/01"; final String p2 = "2016/05/02"; @@ -302,8 +315,20 @@ public class TestClientRollback extends HoodieClientTestBase { put(p3, "id33"); } }; - HoodieTestTable testTable = HoodieTestTable.of(metaClient) - .withPartitionMetaFiles(p1, p2, p3) + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withRollbackUsingMarkers(false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + + HoodieTestTable testTable = enableMetadataTable + ? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create( + metaClient.getHadoopConf(), config, context)) + : HoodieTestTable.of(metaClient); + + testTable.withPartitionMetaFiles(p1, p2, p3) .addCommit(commitTime1) .withBaseFilesInPartitions(partitionAndFileId1) .addCommit(commitTime2) @@ -311,12 +336,6 @@ public class TestClientRollback extends HoodieClientTestBase { .addInflightCommit(commitTime3) .withBaseFilesInPartitions(partitionAndFileId3); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withRollbackUsingMarkers(false) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); - try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { // Rollback commit3 @@ -333,8 +352,10 @@ public class TestClientRollback extends HoodieClientTestBase { // delete rollback completed meta file and retry rollback. FileCreateUtils.deleteRollbackCommit(basePath, rollbackInstant.getTimestamp()); - // recreate actual commit files so that we can retry the rollback - testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + if (instantToRollbackExists) { + // recreate actual commit files if needed + testTable.addInflightCommit(commitTime3).withBaseFilesInPartitions(partitionAndFileId3); + } // retry rolling back the commit again. client.rollback(commitTime3);