diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java index 9025623e8..1fac279f8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java @@ -136,6 +136,9 @@ public abstract class BaseRestoreActionExecutor HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime)) .collect(Collectors.toList()); instantsToRollback.forEach(entry -> { + if (entry.isCompleted()) { + table.getActiveTimeline().deleteCompletedRollback(entry); + } table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp())); table.getActiveTimeline().deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, entry.getTimestamp())); }); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index d44ba5590..8e34f0fe5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -186,7 +186,12 @@ public abstract class BaseRollbackActionExecutor inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) + List inflights = inflightAndRequestedCommitTimeline.getInstants().filter(instant -> { + if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + return true; + } + return 
!ClusteringUtils.isPendingClusteringInstant(table.getMetaClient(), instant); + }).map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); if ((instantTimeToRollback != null) && !inflights.isEmpty() && (inflights.indexOf(instantTimeToRollback) != inflights.size() - 1)) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 3b7895400..a6a37030e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -1399,6 +1399,41 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } + @Test + public void testRollbackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception { + HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true) + .withPreserveHoodieCommitMetadata(true).build(); + // trigger clustering, but do not complete + testInsertAndClustering(clusteringConfig, true, false, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); + + // trigger another partial commit, followed by valid commit. rollback of partial commit should succeed. 
+ HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withAutoCommit(false); + SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build()); + String commitTime1 = HoodieActiveTimeline.createNewInstantTime(); + List records1 = dataGen.generateInserts(commitTime1, 200); + client.startCommitWithTime(commitTime1); + JavaRDD insertRecordsRDD1 = jsc.parallelize(records1, 2); + JavaRDD statuses = client.upsert(insertRecordsRDD1, commitTime1); + List statusList = statuses.collect(); + assertNoWriteErrors(statusList); + + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); + assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants()); + + // trigger another commit. this should rollback latest partial commit. + records1 = dataGen.generateInserts(commitTime1, 200); + client.startCommitWithTime(commitTime1); + insertRecordsRDD1 = jsc.parallelize(records1, 2); + statuses = client.upsert(insertRecordsRDD1, commitTime1); + statusList = statuses.collect(); + assertNoWriteErrors(statusList); + client.commit(commitTime1, statuses); + metaClient.reloadActiveTimeline(); + // rollback should have succeeded. Essentially, the pending clustering should not hinder the rollback of regular commits. 
+ assertEquals(1, metaClient.getActiveTimeline().getCommitsTimeline().filterInflightsAndRequested().countInstants()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 36dd5368d..f7dc2f63a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -201,6 +201,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { deleteInstantFile(instant); } + public void deleteCompletedRollback(HoodieInstant instant) { + ValidationUtils.checkArgument(instant.isCompleted()); + deleteInstantFile(instant); + } + public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) { try { fs.delete(new Path(metaPath, instant.getFileName()), false);