[HUDI-2892][BUG] Pending Clustering may stain the ActiveTimeLine and lead to incomplete query results (#4172)
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -158,8 +158,12 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
|
||||
// with logic above to avoid Stream.concats
|
||||
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
|
||||
Option<HoodieInstant> oldestPendingCompactionInstant =
|
||||
table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
|
||||
|
||||
Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant = table.getActiveTimeline()
|
||||
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
|
||||
.filter(s -> !s.isCompleted())
|
||||
.firstInstant();
|
||||
|
||||
Option<HoodieInstant> oldestInflightCommitInstant =
|
||||
table.getActiveTimeline()
|
||||
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
|
||||
@@ -176,7 +180,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
|
||||
}).filter(s -> {
|
||||
// Ensure commits >= oldest pending compaction commit is retained
|
||||
return oldestPendingCompactionInstant
|
||||
return oldestPendingCompactionAndReplaceInstant
|
||||
.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
|
||||
.orElse(true);
|
||||
});
|
||||
|
||||
@@ -243,6 +243,25 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
|
||||
"Archived commits should always be safe");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testPendingClusteringWillBlockArchival(boolean enableMetadata) throws Exception {
|
||||
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 5, 2);
|
||||
HoodieTestDataGenerator.createPendingReplaceFile(basePath, "00000000", wrapperFs.getConf());
|
||||
for (int i = 1; i < 8; i++) {
|
||||
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 2);
|
||||
// archival
|
||||
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
|
||||
List<HoodieInstant> originalCommits = commitsList.getKey();
|
||||
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
|
||||
assertEquals(originalCommits, commitsAfterArchival);
|
||||
}
|
||||
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
|
||||
assertEquals(7, timeline.countInstants(),
|
||||
"Since we have a pending clustering instant at 00000000, we should never archive any commit after 00000000");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testArchiveRollbacksTestTable(boolean enableMetadata) throws Exception {
|
||||
|
||||
@@ -388,6 +388,17 @@ public class HoodieTestDataGenerator {
|
||||
.forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata));
|
||||
}
|
||||
|
||||
public static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
|
||||
Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime),
|
||||
HoodieTimeline.makeRequestedReplaceFileName(instantTime))
|
||||
.forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata));
|
||||
}
|
||||
|
||||
public static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration) {
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
createPendingReplaceFile(basePath, instantTime, configuration, commitMetadata);
|
||||
}
|
||||
|
||||
public static void createEmptyCleanRequestedFile(String basePath, String instantTime, Configuration configuration)
|
||||
throws IOException {
|
||||
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
||||
|
||||
Reference in New Issue
Block a user