[HUDI-3884] Support archival beyond savepoint commits (#5837)
Co-authored-by: sivabalan <n.siva.b@gmail.com>
This commit is contained in:
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.io;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
||||
import org.apache.hudi.client.HoodieTimelineArchiver;
|
||||
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
|
||||
import org.apache.hudi.client.utils.MetadataConversionUtils;
|
||||
@@ -44,9 +45,9 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieArchivalConfig;
|
||||
import org.apache.hudi.config.HoodieCleanConfig;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieLockConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
@@ -65,7 +66,9 @@ import org.apache.log4j.Logger;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.CsvSource;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -88,6 +91,7 @@ import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
|
||||
import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
@@ -180,6 +184,33 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
long size,
|
||||
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
|
||||
WriteConcurrencyMode writeConcurrencyMode) throws Exception {
|
||||
return initTestTableAndGetWriteConfig(
|
||||
enableMetadata,
|
||||
minArchivalCommits,
|
||||
maxArchivalCommits,
|
||||
maxDeltaCommits,
|
||||
maxDeltaCommitsMetadataTable,
|
||||
tableType,
|
||||
enableArchiveMerge,
|
||||
archiveFilesBatch,
|
||||
size,
|
||||
failedWritesCleaningPolicy,
|
||||
writeConcurrencyMode,
|
||||
ARCHIVE_BEYOND_SAVEPOINT.defaultValue());
|
||||
}
|
||||
|
||||
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
|
||||
int minArchivalCommits,
|
||||
int maxArchivalCommits,
|
||||
int maxDeltaCommits,
|
||||
int maxDeltaCommitsMetadataTable,
|
||||
HoodieTableType tableType,
|
||||
boolean enableArchiveMerge,
|
||||
int archiveFilesBatch,
|
||||
long size,
|
||||
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
|
||||
WriteConcurrencyMode writeConcurrencyMode,
|
||||
boolean archiveProceedBeyondSavepoints) throws Exception {
|
||||
init(tableType);
|
||||
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
|
||||
@@ -188,7 +219,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
.withArchiveMergeEnable(enableArchiveMerge)
|
||||
.withArchiveMergeFilesBatchSize(archiveFilesBatch)
|
||||
.withArchiveMergeSmallFileLimit(size)
|
||||
.archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
|
||||
.archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
|
||||
.withArchiveBeyondSavepoint(archiveProceedBeyondSavepoints).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
|
||||
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
||||
@@ -249,6 +281,59 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
|
||||
boolean enableMetadata = false;
|
||||
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, HoodieTableType.COPY_ON_WRITE,
|
||||
false, 10, 209715200, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint);
|
||||
|
||||
// min archival commits is 2 and max archival commits is 4. and so, after 5th commit, 3 commits will be archived.
|
||||
for (int i = 1; i < 5; i++) {
|
||||
testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
|
||||
}
|
||||
|
||||
// savepoint 3rd commit
|
||||
String commitToSavepoint = String.format("%08d", 3);
|
||||
HoodieSavepointMetadata savepointMetadata = testTable.doSavepoint(commitToSavepoint);
|
||||
testTable.addSavepoint(commitToSavepoint, savepointMetadata);
|
||||
|
||||
for (int i = 5; i < 7; i++) {
|
||||
testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
|
||||
}
|
||||
// trigger archival
|
||||
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
|
||||
List<HoodieInstant> originalCommits = commitsList.getKey();
|
||||
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
|
||||
|
||||
if (archiveBeyondSavepoint) {
|
||||
// retains only 2 commits. C3 and C8. and savepointed commit for C3.
|
||||
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000004", "00000005")),
|
||||
Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000006")).stream(), getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
|
||||
.collect(Collectors.toList()), commitsAfterArchival);
|
||||
} else {
|
||||
// archives only C1 and C2. stops at first savepointed commit C3.
|
||||
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002")),
|
||||
Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000004", "00000005", "00000006")).stream(),
|
||||
getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream())
|
||||
.collect(Collectors.toList()), commitsAfterArchival);
|
||||
}
|
||||
|
||||
for (int i = 7; i < 10; i++) {
|
||||
testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
|
||||
}
|
||||
|
||||
// once savepoint is removed. C3 will be archived.
|
||||
testTable.deleteSavepoint(commitToSavepoint);
|
||||
commitsList = archiveAndGetCommitsList(writeConfig);
|
||||
originalCommits = commitsList.getKey();
|
||||
commitsAfterArchival = commitsList.getValue();
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002","00000003", "00000004", "00000005", "00000006", "00000007")),
|
||||
getActiveCommitInstants(Arrays.asList("00000008", "00000009")), commitsAfterArchival);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception {
|
||||
@@ -563,13 +648,22 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
assertEquals(originalCommits, commitsAfterArchival);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> archiveCommitSavepointNoHoleParams() {
|
||||
return Arrays.stream(new Boolean[][] {
|
||||
{true, true},
|
||||
{false, true},
|
||||
{true, false},
|
||||
{false, false}
|
||||
}).map(Arguments::of);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) throws Exception {
|
||||
@MethodSource("archiveCommitSavepointNoHoleParams")
|
||||
public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable, boolean archiveBeyondSavepoint) throws Exception {
|
||||
init();
|
||||
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
|
||||
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
|
||||
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).build())
|
||||
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).withArchiveBeyondSavepoint(archiveBeyondSavepoint).build())
|
||||
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
|
||||
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
||||
.withRemoteServerPort(timelineServicePort).build())
|
||||
@@ -596,14 +690,30 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
|
||||
assertTrue(archiver.archiveIfRequired(context));
|
||||
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
|
||||
assertEquals(5, timeline.countInstants(),
|
||||
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
|
||||
"Archived commits should always be safe");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
|
||||
"Archived commits should always be safe");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
|
||||
"Archived commits should always be safe");
|
||||
if (archiveBeyondSavepoint) {
|
||||
// commits in active timeline = 101 and 105.
|
||||
assertEquals(2, timeline.countInstants(),
|
||||
"Since archiveBeyondSavepoint config is enabled, we will archive commits 102, 103 ");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
|
||||
"Savepointed commits should always be safe");
|
||||
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
|
||||
"102 expected to be archived");
|
||||
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
|
||||
"103 expected to be archived");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")),
|
||||
"104 expected to be archived");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")),
|
||||
"105 expected to be in active timeline");
|
||||
} else {
|
||||
assertEquals(5, timeline.countInstants(),
|
||||
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
|
||||
"Archived commits should always be safe");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
|
||||
"Archived commits should always be safe");
|
||||
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
|
||||
"Archived commits should always be safe");
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@@ -934,7 +1044,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
HoodieInstant firstInstant = metaClient.reloadActiveTimeline().firstInstant().get();
|
||||
expectedArchivedInstants = expectedArchivedInstants.stream()
|
||||
.filter(entry -> HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN, firstInstant.getTimestamp()
|
||||
)).collect(Collectors.toList());
|
||||
)).collect(Collectors.toList());
|
||||
expectedArchivedInstants.forEach(entry -> assertTrue(metaClient.getArchivedTimeline().containsInstant(entry)));
|
||||
}
|
||||
|
||||
@@ -1283,7 +1393,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline();
|
||||
expectedArchivedInstants.forEach(entry -> {
|
||||
// check safety
|
||||
if (entry.getAction() != HoodieTimeline.ROLLBACK_ACTION) {
|
||||
if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
|
||||
assertTrue(timeline.containsOrBeforeTimelineStarts(entry.getTimestamp()), "Archived commits should always be safe");
|
||||
}
|
||||
}
|
||||
@@ -1315,6 +1425,10 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
|
||||
return getActiveCommitInstants(commitTimes, HoodieTimeline.COMMIT_ACTION);
|
||||
}
|
||||
|
||||
private List<HoodieInstant> getActiveSavepointedCommitInstants(List<String> commitTimes) {
|
||||
return getActiveCommitInstants(commitTimes, HoodieTimeline.SAVEPOINT_ACTION);
|
||||
}
|
||||
|
||||
private List<HoodieInstant> getActiveCommitInstants(List<String> commitTimes, String action) {
|
||||
List<HoodieInstant> allInstants = new ArrayList<>();
|
||||
commitTimes.forEach(entry -> allInstants.add(new HoodieInstant(State.COMPLETED, action, entry)));
|
||||
|
||||
Reference in New Issue
Block a user