1
0

[HUDI-1833] rollback pending clustering even if there is greater commit (#2863)

* [HUDI-1833] rollback pending clustering even if there are greater commits
This commit is contained in:
satishkotha
2021-04-27 14:21:42 -07:00
committed by GitHub
parent e4fd195d9f
commit 386767693d
2 changed files with 45 additions and 6 deletions

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -174,8 +175,11 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
final String instantTimeToRollback = instantToRollback.getTimestamp();
final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
&& !instantToRollback.isCompleted();
final boolean isPendingClustering = Objects.equals(HoodieTimeline.REPLACE_COMMIT_ACTION, instantToRollback.getAction())
&& !instantToRollback.isCompleted() && ClusteringUtils.getClusteringPlan(table.getMetaClient(), instantToRollback).isPresent();
validateSavepointRollbacks();
if (!isPendingCompaction) {
if (!isPendingCompaction && !isPendingClustering) {
validateRollbackCommitSequence();
}

View File

@@ -1115,17 +1115,50 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
testClustering(clusteringConfig);
}
private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
@Test
public void testPendingClusteringRollback() throws Exception {
// setup clustering config
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
// start clustering, but dont commit
List<HoodieRecord> allRecords = testClustering(clusteringConfig, false);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
assertEquals(1, pendingClusteringPlans.size());
HoodieInstant pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
// complete another commit after pending clustering
HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER).build();
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator();
String commitTime = HoodieActiveTimeline.createNewInstantTime();
allRecords.addAll(dataGen.generateInserts(commitTime, 200));
writeAndVerifyBatch(client, allRecords, commitTime);
// verify pending clustering can be rolled back (even though there is a completed commit greater than pending clustering)
client.rollback(pendingClusteringInstant.getTimestamp());
metaClient.reloadActiveTimeline();
// verify there are no pending clustering instants
assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
}
private List<HoodieRecord> testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
return testClustering(clusteringConfig, false);
}
private List<HoodieRecord> testClustering(HoodieClusteringConfig clusteringConfig, boolean completeClustering) throws Exception {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10);
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator();
String commitTime = "100";
String commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);
commitTime = "200";
commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
@@ -1134,12 +1167,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
fileIdIntersection.retainAll(fileIds2);
assertEquals(0, fileIdIntersection.size());
config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withClusteringConfig(clusteringConfig).build();
config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(completeClustering)
.withClusteringConfig(clusteringConfig).build();
// create client with new config.
client = getHoodieWriteClient(config);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, true);
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
List<HoodieRecord> allRecords = Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList());
verifyRecordsWritten(clusteringCommitTime, allRecords, clusterMetadata.getWriteStatuses().collect());
Set<HoodieFileGroupId> insertedFileIds = new HashSet<>();
@@ -1151,6 +1185,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
partitionFiles.getValue().stream().forEach(file ->
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));
assertEquals(insertedFileIds, replacedFileIds);
return allRecords;
}
private Set<HoodieFileGroupId> getFileGroupIdsFromWriteStatus(List<WriteStatus> statuses) {