From 386767693d46e7419c4fb0fa292ccb7ab7f7098d Mon Sep 17 00:00:00 2001 From: satishkotha Date: Tue, 27 Apr 2021 14:21:42 -0700 Subject: [PATCH] [HUDI-1833] rollback pending clustering even if there is greater commit (#2863) * [HUDI-1833] rollback pending clustering even if there are greater commits --- .../rollback/BaseRollbackActionExecutor.java | 6 ++- .../TestHoodieClientOnCopyOnWriteStorage.java | 45 ++++++++++++++++--- 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 4792cd717..3f997c21b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -174,8 +175,11 @@ public abstract class BaseRollbackActionExecutor allRecords = testClustering(clusteringConfig, false); + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); + List> pendingClusteringPlans = + ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList()); + assertEquals(1, pendingClusteringPlans.size()); + HoodieInstant pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft(); + + // complete another commit after pending clustering + HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build(); + SparkRDDWriteClient client = getHoodieWriteClient(config); + dataGen = new HoodieTestDataGenerator(); + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + allRecords.addAll(dataGen.generateInserts(commitTime, 200)); + writeAndVerifyBatch(client, allRecords, commitTime); + + // verify pending clustering can be rolled back (even though there is a completed commit greater than pending clustering) + client.rollback(pendingClusteringInstant.getTimestamp()); + metaClient.reloadActiveTimeline(); + // verify there are no pending clustering instants + assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count()); + } + + private List testClustering(HoodieClusteringConfig clusteringConfig) throws Exception { + return testClustering(clusteringConfig, false); + } + + private List testClustering(HoodieClusteringConfig clusteringConfig, boolean completeClustering) throws Exception { // create config to not update small files. HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10); SparkRDDWriteClient client = getHoodieWriteClient(config); dataGen = new HoodieTestDataGenerator(); - String commitTime = "100"; + String commitTime = HoodieActiveTimeline.createNewInstantTime(); List records1 = dataGen.generateInserts(commitTime, 200); List statuses1 = writeAndVerifyBatch(client, records1, commitTime); Set fileIds1 = getFileGroupIdsFromWriteStatus(statuses1); - commitTime = "200"; + commitTime = HoodieActiveTimeline.createNewInstantTime(); List records2 = dataGen.generateInserts(commitTime, 200); List statuses2 = writeAndVerifyBatch(client, records2, commitTime); Set fileIds2 = getFileGroupIdsFromWriteStatus(statuses2); @@ -1134,12 +1167,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { fileIdIntersection.retainAll(fileIds2); assertEquals(0, fileIdIntersection.size()); - config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withClusteringConfig(clusteringConfig).build(); + config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(completeClustering) + .withClusteringConfig(clusteringConfig).build(); // create client with new config. client = getHoodieWriteClient(config); String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); - HoodieWriteMetadata> clusterMetadata = client.cluster(clusteringCommitTime, true); + HoodieWriteMetadata> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering); List allRecords = Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()); verifyRecordsWritten(clusteringCommitTime, allRecords, clusterMetadata.getWriteStatuses().collect()); Set insertedFileIds = new HashSet<>(); @@ -1151,6 +1185,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { partitionFiles.getValue().stream().forEach(file -> replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file)))); assertEquals(insertedFileIds, replacedFileIds); + return allRecords; } private Set getFileGroupIdsFromWriteStatus(List statuses) {