diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index c53554d8e..2992f4abd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -64,6 +64,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -76,12 +77,14 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps; /** * Archiver to bound the growth of files under .hoodie meta path. @@ -409,9 +412,11 @@ public class HoodieTimelineArchiver { .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterInflights().firstInstant(); - // We cannot have any holes in the commit timeline. We cannot archive any commits which are - // made after the first savepoint present. + // NOTE: We cannot have any holes in the commit timeline. + // We cannot archive any commits which are made after the first savepoint present, + // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled. 
Option firstSavepoint = table.getCompletedSavepointTimeline().firstInstant(); + Set savepointTimestamps = table.getSavepointTimestamps(); if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) { // For Merge-On-Read table, inline or async compaction is enabled // We need to make sure that there are enough delta commits in the active timeline @@ -428,28 +433,33 @@ public class HoodieTimelineArchiver { // Actually do the commits Stream instantToArchiveStream = commitTimeline.getInstants() .filter(s -> { - // if no savepoint present, then don't filter - return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp())); + if (config.shouldArchiveBeyondSavepoint()) { + // skip savepoint commits and proceed further + return !savepointTimestamps.contains(s.getTimestamp()); + } else { + // if no savepoint present, then don't filter + // stop at first savepoint commit + return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp())); + } }).filter(s -> { // Ensure commits >= oldest pending compaction commit is retained return oldestPendingCompactionAndReplaceInstant - .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) + .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) .orElse(true); }).filter(s -> { // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't // get archived, i.e, instants after the oldestInflight are retained on the timeline if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) { return oldestInflightCommitInstant.map(instant -> - HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) + compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp())) 
.orElse(true); } return true; }).filter(s -> oldestInstantToRetainForCompaction.map(instantToRetain -> - HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp())) + compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp())) .orElse(true) ); - return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep); } else { return Stream.empty(); @@ -479,7 +489,7 @@ public class HoodieTimelineArchiver { instants = Stream.empty(); } else { LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get()); - instants = instants.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, + instants = instants.filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN, latestCompactionTime.get())); } } catch (Exception e) { @@ -487,18 +497,29 @@ public class HoodieTimelineArchiver { } } - // If this is a metadata table, do not archive the commits that live in data set - // active timeline. This is required by metadata table, - // see HoodieTableMetadataUtil#processRollbackMetadata for details. 
if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) { HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder() .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath())) .setConf(metaClient.getHadoopConf()) .build(); - Option earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp); - if (earliestActiveDatasetCommit.isPresent()) { - instants = instants.filter(instant -> - HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get())); + Option earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant(); + + if (config.shouldArchiveBeyondSavepoint()) { + // There are chances that there could be holes in the timeline due to archival and savepoint interplay. + // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline. + Option firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit(); + if (firstNonSavepointCommit.isPresent()) { + String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp(); + instants = instants.filter(instant -> + compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime)); + } + } else { + // Do not archive the commits that live in data set active timeline. + // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details. 
+ if (earliestActiveDatasetCommit.isPresent()) { + instants = instants.filter(instant -> + compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp())); + } } } @@ -589,7 +610,7 @@ public class HoodieTimelineArchiver { } List instantsToBeDeleted = - instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(), + instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(), LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList()); for (HoodieInstant deleteInstant : instantsToBeDeleted) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java index 32bccc3a3..3244b4228 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import javax.annotation.concurrent.Immutable; + import java.io.File; import java.io.FileReader; import java.io.IOException; @@ -34,8 +35,8 @@ import java.util.Properties; */ @Immutable @ConfigClassProperty(name = "Archival Configs", - groupName = ConfigGroups.Names.WRITE_CLIENT, - description = "Configurations that control archival.") + groupName = ConfigGroups.Names.WRITE_CLIENT, + description = "Configurations that control archival.") public class HoodieArchivalConfig extends HoodieConfig { public static final ConfigProperty AUTO_ARCHIVE = ConfigProperty @@ -92,6 +93,13 @@ public class HoodieArchivalConfig extends HoodieConfig { .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. 
It's" + " useful when storage scheme doesn't support append operation."); + public static final ConfigProperty ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty + .key("hoodie.archive.beyond.savepoint") + .defaultValue(false) + .sinceVersion("0.12.0") + .withDocumentation("If enabled, archival will proceed beyond savepoint, skipping savepoint commits. " + + "If disabled, archival will stop at the earliest savepoint commit."); + /** * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */ @@ -107,7 +115,9 @@ public class HoodieArchivalConfig extends HoodieConfig { */ @Deprecated public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key(); - /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */ + /** + * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead + */ @Deprecated private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue(); /** @@ -186,6 +196,11 @@ public class HoodieArchivalConfig extends HoodieConfig { return this; } + public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) { + archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, String.valueOf(archiveBeyondSavepoint)); + return this; + } + public HoodieArchivalConfig build() { archivalConfig.setDefaults(HoodieArchivalConfig.class.getName()); return archivalConfig; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index f787232c5..4902c3861 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1209,7 +1209,11 @@ public class HoodieWriteConfig extends HoodieConfig { } public boolean getArchiveMergeEnable() { - return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE); + return 
getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE); + } + + public boolean shouldArchiveBeyondSavepoint() { + return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT); } public long getArchiveMergeSmallFileLimitBytes() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 1e68f820d..5ca3aee76 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -368,10 +368,10 @@ public abstract class HoodieTable implem } /** - * Get the list of savepoints in this table. + * Get the list of savepoint timestamps in this table. */ - public List getSavepoints() { - return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + public Set getSavepointTimestamps() { + return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); } public HoodieActiveTimeline getActiveTimeline() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 79eef43b3..f837d08af 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -104,7 +104,7 @@ public class CleanPlanner implements Ser * Get the list of data file names savepointed. 
*/ public Stream getSavepointedDataFiles(String savepointTime) { - if (!hoodieTable.getSavepoints().contains(savepointTime)) { + if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) { throw new HoodieSavepointException( "Could not get data files for savepoint " + savepointTime + ". No such savepoint."); } @@ -227,7 +227,7 @@ public class CleanPlanner implements Ser + " file versions. "); List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints - List savepointedFiles = hoodieTable.getSavepoints().stream() + List savepointedFiles = hoodieTable.getSavepointTimestamps().stream() .flatMap(this::getSavepointedDataFiles) .collect(Collectors.toList()); @@ -295,7 +295,7 @@ public class CleanPlanner implements Ser List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints - List savepointedFiles = hoodieTable.getSavepoints().stream() + List savepointedFiles = hoodieTable.getSavepointTimestamps().stream() .flatMap(this::getSavepointedDataFiles) .collect(Collectors.toList()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 4f41c4a44..0af05b2d6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -19,6 +19,7 @@ package org.apache.hudi.io; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.HoodieTimelineArchiver; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.client.utils.MetadataConversionUtils; @@ -44,9 +45,9 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.FileIOUtils; import 
org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -65,7 +66,9 @@ import org.apache.log4j.Logger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; @@ -88,6 +91,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable; +import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -180,6 +184,33 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { long size, HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, WriteConcurrencyMode writeConcurrencyMode) throws Exception { + return initTestTableAndGetWriteConfig( + enableMetadata, + minArchivalCommits, + maxArchivalCommits, + maxDeltaCommits, + maxDeltaCommitsMetadataTable, + tableType, + enableArchiveMerge, + archiveFilesBatch, + size, + failedWritesCleaningPolicy, + writeConcurrencyMode, + ARCHIVE_BEYOND_SAVEPOINT.defaultValue()); + } + + private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, + int minArchivalCommits, + int 
maxArchivalCommits, + int maxDeltaCommits, + int maxDeltaCommitsMetadataTable, + HoodieTableType tableType, + boolean enableArchiveMerge, + int archiveFilesBatch, + long size, + HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, + WriteConcurrencyMode writeConcurrencyMode, + boolean archiveProceedBeyondSavepoints) throws Exception { init(tableType); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) @@ -188,7 +219,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { .withArchiveMergeEnable(enableArchiveMerge) .withArchiveMergeFilesBatchSize(archiveFilesBatch) .withArchiveMergeSmallFileLimit(size) - .archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build()) + .archiveCommitsWith(minArchivalCommits, maxArchivalCommits) + .withArchiveBeyondSavepoint(archiveProceedBeyondSavepoints).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build()) .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() @@ -249,6 +281,59 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception { + boolean enableMetadata = false; + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 5, 2, HoodieTableType.COPY_ON_WRITE, + false, 10, 209715200, HoodieFailedWritesCleaningPolicy.EAGER, WriteConcurrencyMode.SINGLE_WRITER, archiveBeyondSavepoint); + + // min archival commits is 2 and max archival commits is 4. and so, after 5th commit, 3 commits will be archived. + for (int i = 1; i < 5; i++) { + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, i == 1 ? 
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + + // savepoint 3rd commit + String commitToSavepoint = String.format("%08d", 3); + HoodieSavepointMetadata savepointMetadata = testTable.doSavepoint(commitToSavepoint); + testTable.addSavepoint(commitToSavepoint, savepointMetadata); + + for (int i = 5; i < 7; i++) { + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + // trigger archival + Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); + List originalCommits = commitsList.getKey(); + List commitsAfterArchival = commitsList.getValue(); + + if (archiveBeyondSavepoint) { + // retains only 2 commits. C3 and C6. and savepointed commit for C3. + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000004", "00000005")), + Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000006")).stream(), getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream()) + .collect(Collectors.toList()), commitsAfterArchival); + } else { + // archives only C1 and C2. stops at first savepointed commit C3. + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002")), + Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000004", "00000005", "00000006")).stream(), + getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream()) + .collect(Collectors.toList()), commitsAfterArchival); + } + + for (int i = 7; i < 10; i++) { + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + + // once savepoint is removed. C3 will be archived. 
+ testTable.deleteSavepoint(commitToSavepoint); + commitsList = archiveAndGetCommitsList(writeConfig); + originalCommits = commitsList.getKey(); + commitsAfterArchival = commitsList.getValue(); + + metaClient.reloadActiveTimeline(); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002","00000003", "00000004", "00000005", "00000006", "00000007")), + getActiveCommitInstants(Arrays.asList("00000008", "00000009")), commitsAfterArchival); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception { @@ -563,13 +648,22 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { assertEquals(originalCommits, commitsAfterArchival); } + private static Stream archiveCommitSavepointNoHoleParams() { + return Arrays.stream(new Boolean[][] { + {true, true}, + {false, true}, + {true, false}, + {false, false} + }).map(Arguments::of); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) throws Exception { + @MethodSource("archiveCommitSavepointNoHoleParams") + public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable, boolean archiveBeyondSavepoint) throws Exception { init(); HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table") - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).withArchiveBeyondSavepoint(archiveBeyondSavepoint).build()) .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withRemoteServerPort(timelineServicePort).build()) @@ -596,14 +690,30 @@ public class 
TestHoodieTimelineArchiver extends HoodieClientTestHarness { assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); assertTrue(archiver.archiveIfRequired(context)); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); - assertEquals(5, timeline.countInstants(), - "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)"); - assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")), - "Archived commits should always be safe"); - assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")), - "Archived commits should always be safe"); - assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")), - "Archived commits should always be safe"); + if (archiveBeyondSavepoint) { + // commits in active timeline = 101 and 105. + assertEquals(2, timeline.countInstants(), + "Since archiveBeyondSavepoint config is enabled, we will archive commits 102, 103 "); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")), + "Savepointed commits should always be safe"); + assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")), + "102 expected to be archived"); + assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")), + "103 expected to be archived"); + assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "104")), + "104 expected to be archived"); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")), + "105 expected to be in active timeline"); + } else { + assertEquals(5, timeline.countInstants(), + "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)"); 
assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")), + "Archived commits should always be safe"); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")), + "Archived commits should always be safe"); + assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")), + "Archived commits should always be safe"); + } } @ParameterizedTest @@ -934,7 +1044,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { HoodieInstant firstInstant = metaClient.reloadActiveTimeline().firstInstant().get(); expectedArchivedInstants = expectedArchivedInstants.stream() .filter(entry -> HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN, firstInstant.getTimestamp() - )).collect(Collectors.toList()); + )).collect(Collectors.toList()); expectedArchivedInstants.forEach(entry -> assertTrue(metaClient.getArchivedTimeline().containsInstant(entry))); } @@ -1283,7 +1393,7 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { HoodieTimeline timeline = metaClient.getActiveTimeline(); expectedArchivedInstants.forEach(entry -> { // check safety - if (entry.getAction() != HoodieTimeline.ROLLBACK_ACTION) { + if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) { assertTrue(timeline.containsOrBeforeTimelineStarts(entry.getTimestamp()), "Archived commits should always be safe"); } } @@ -1315,6 +1425,10 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness { return getActiveCommitInstants(commitTimes, HoodieTimeline.COMMIT_ACTION); } + private List getActiveSavepointedCommitInstants(List commitTimes) { + return getActiveCommitInstants(commitTimes, HoodieTimeline.SAVEPOINT_ACTION); + } + private List getActiveCommitInstants(List commitTimes, String action) { List allInstants = new ArrayList<>(); commitTimes.forEach(entry -> allInstants.add(new HoodieInstant(State.COMPLETED, 
action, entry))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java index bf6ce611e..9e407aa76 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java @@ -28,6 +28,10 @@ import java.util.List; import java.util.TreeMap; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps; + /** * A set of data/base files + set of log files, that make up an unit for all operations. */ @@ -118,21 +122,22 @@ public class HoodieFileGroup implements Serializable { * some log files, that are based off a commit or delta commit. */ private boolean isFileSliceCommitted(FileSlice slice) { - String maxCommitTime = lastInstant.get().getTimestamp(); - return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()) - && HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime); + if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) { + return false; + } + return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()); } /** - * Get all the the file slices including in-flight ones as seen in underlying file-system. + * Get all the file slices including in-flight ones as seen in underlying file system. */ public Stream getAllFileSlicesIncludingInflight() { return fileSlices.values().stream(); } /** - * Get latest file slices including in-flight ones. + * Get the latest file slices including inflight ones. 
*/ public Option getLatestFileSlicesIncludingInflight() { return Option.fromJavaOptional(getAllFileSlicesIncludingInflight().findFirst()); @@ -169,8 +174,7 @@ public class HoodieFileGroup implements Serializable { * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime. */ public Option getLatestFileSliceBeforeOrOn(String maxInstantTime) { - return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline - .compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst()); + return Option.fromJavaOptional(getAllFileSlices().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst()); } /** @@ -181,7 +185,7 @@ public class HoodieFileGroup implements Serializable { */ public Option getLatestFileSliceBefore(String maxInstantTime) { return Option.fromJavaOptional(getAllFileSlices().filter( - slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, maxInstantTime)) + slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN, maxInstantTime)) .findFirst()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index ac1dd007d..e7970baf6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -35,6 +35,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.Collections.reverse; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps; /** * HoodieDefaultTimeline is a default implementation of the HoodieTimeline. 
It provides methods to inspect a @@ -118,7 +119,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline { Option earliestPending = getWriteTimeline().filterInflightsAndRequested().firstInstant(); if (earliestPending.isPresent()) { return getWriteTimeline().filterCompletedInstants() - .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp())); + .filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp())); } return getWriteTimeline().filterCompletedInstants(); } @@ -156,34 +157,34 @@ public class HoodieDefaultTimeline implements HoodieTimeline { @Override public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) { return new HoodieDefaultTimeline(instants.stream() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits), + .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits), details); } @Override public HoodieTimeline findInstantsAfter(String instantTime) { return new HoodieDefaultTimeline(instants.stream() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details); + .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details); } @Override public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) { return new HoodieDefaultTimeline(instants.stream() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime)) + .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime)) .limit(numCommits), details); } @Override public HoodieDefaultTimeline findInstantsBefore(String instantTime) { return new HoodieDefaultTimeline(instants.stream() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)), + .filter(s -> 
compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)), details); } @Override public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) { return new HoodieDefaultTimeline(instants.stream() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)), + .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)), details); } @@ -362,11 +363,28 @@ public class HoodieDefaultTimeline implements HoodieTimeline { @Override public boolean isBeforeTimelineStarts(String instant) { - Option firstCommit = firstInstant(); - return firstCommit.isPresent() - && HoodieTimeline.compareTimestamps(instant, LESSER_THAN, firstCommit.get().getTimestamp()); + Option firstNonSavepointCommit = getFirstNonSavepointCommit(); + return firstNonSavepointCommit.isPresent() + && compareTimestamps(instant, LESSER_THAN, firstNonSavepointCommit.get().getTimestamp()); } + public Option getFirstNonSavepointCommit() { + Option firstCommit = firstInstant(); + Set savepointTimestamps = instants.stream() + .filter(entry -> entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION)) + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toSet()); + Option firstNonSavepointCommit = firstCommit; + if (!savepointTimestamps.isEmpty()) { + // There are chances that there could be holes in the timeline due to archival and savepoint interplay. + // So, the first non-savepoint commit is considered as beginning of the active timeline. 
+ firstNonSavepointCommit = Option.fromJavaOptional(instants.stream() + .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp())) + .findFirst()); + } + return firstNonSavepointCommit; + } + @Override public Option getInstantDetails(HoodieInstant instant) { return details.apply(instant); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index c3fbd9731..e52a27959 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -305,6 +305,15 @@ public interface HoodieTimeline extends Serializable { */ boolean isBeforeTimelineStarts(String ts); + /** + * First non-savepoint commit in the active data timeline. Examples: + * 1. An active data timeline C1, C2, C3, C4, C5 returns C1. + * 2. If archival is allowed beyond savepoint and let's say C1, C2, C4 have been archived + * while C3, C5 have been savepointed, then for the data timeline + * C3, C3_Savepoint, C5, C5_Savepoint, C6, C7 returns C6. + */ + Option getFirstNonSavepointCommit(); + /** * Read the completed instant details. 
*/ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java index 8ea9ad94a..91a2019f1 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java @@ -18,10 +18,15 @@ package org.apache.hudi.common.model; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.MockHoodieTimeline; + import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -47,4 +52,25 @@ public class TestHoodieFileGroup { assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("001")); assertTrue((new HoodieFileGroup(fileGroup)).getLatestFileSlice().get().getBaseInstantTime().equals("001")); } + + @Test + public void testCommittedFileSlicesWithSavepointAndHoles() { + MockHoodieTimeline activeTimeline = new MockHoodieTimeline(Stream.of( + new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"), + new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"), + new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"), + new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"), + new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT as well + ).collect(Collectors.toList())); + HoodieFileGroup fileGroup = new HoodieFileGroup("", "data", activeTimeline.filterCompletedAndCompactionInstants()); + for (int i = 0; i < 7; i++) { + HoodieBaseFile baseFile = new HoodieBaseFile("data_1_0" + i); + 
fileGroup.addBaseFile(baseFile); + } + List allFileSlices = fileGroup.getAllFileSlices().collect(Collectors.toList()); + assertEquals(6, allFileSlices.size()); + assertTrue(!allFileSlices.stream().anyMatch(s -> s.getBaseInstantTime().equals("06"))); + assertEquals(7, fileGroup.getAllFileSlicesIncludingInflight().count()); + assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("05")); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 569233747..628aeb8e8 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -262,6 +262,57 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { assertEquals(instant7.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp()); } + @Test + public void testTimelineWithSavepointAndHoles() { + timeline = new MockHoodieTimeline(Stream.of( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT as well + ).collect(Collectors.toList())); + assertTrue(timeline.isBeforeTimelineStarts("00")); + assertTrue(timeline.isBeforeTimelineStarts("01")); + assertTrue(timeline.isBeforeTimelineStarts("02")); + assertTrue(timeline.isBeforeTimelineStarts("03")); + assertTrue(timeline.isBeforeTimelineStarts("04")); + assertFalse(timeline.isBeforeTimelineStarts("05")); + 
assertFalse(timeline.isBeforeTimelineStarts("06")); + + // with an inflight savepoint in between + timeline = new MockHoodieTimeline(Stream.of( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"), + new HoodieInstant(State.INFLIGHT, HoodieTimeline.SAVEPOINT_ACTION, "01"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") + ).collect(Collectors.toList())); + assertTrue(timeline.isBeforeTimelineStarts("00")); + assertTrue(timeline.isBeforeTimelineStarts("01")); + assertTrue(timeline.isBeforeTimelineStarts("02")); + assertTrue(timeline.isBeforeTimelineStarts("03")); + assertTrue(timeline.isBeforeTimelineStarts("04")); + assertFalse(timeline.isBeforeTimelineStarts("05")); + assertFalse(timeline.isBeforeTimelineStarts("06")); + + // with a pending replacecommit after savepoints + timeline = new MockHoodieTimeline(Stream.of( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"), + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05"), + new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, "06") + ).collect(Collectors.toList())); + assertTrue(timeline.isBeforeTimelineStarts("00")); + assertTrue(timeline.isBeforeTimelineStarts("01")); + assertTrue(timeline.isBeforeTimelineStarts("02")); + assertTrue(timeline.isBeforeTimelineStarts("03")); + assertTrue(timeline.isBeforeTimelineStarts("04")); + assertFalse(timeline.isBeforeTimelineStarts("05")); + assertFalse(timeline.isBeforeTimelineStarts("06")); + } + @Test public void testTimelineGetOperations() { List allInstants = getAllInstants(); diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 290753ef5..f631ec94b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -35,6 +36,7 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.util.Option; @@ -156,6 +158,10 @@ public class FileCreateUtils { } } + public static void createSavepointCommit(String basePath, String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata).get()); + } + public static void createCommit(String basePath, String instantTime, FileSystem fs) throws IOException { createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION, fs); } @@ -285,6 +291,10 @@ public class FileCreateUtils { createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION); } + 
public static void createInflightSavepoint(String basePath, String instantTime) throws IOException { + createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION); + } + public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); @@ -439,4 +449,9 @@ public class FileCreateUtils { public static void deleteDeltaCommit(String basePath, String instantTime, FileSystem fs) throws IOException { deleteMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION, fs); } + + public static void deleteSavepointCommit(String basePath, String instantTime, FileSystem fs) throws IOException { + deleteMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION, fs); + deleteMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, fs); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 412a69c94..1351d1681 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -99,6 +99,7 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCom import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile; +import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightSavepoint; import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit; import static 
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile; @@ -109,6 +110,8 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRe import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRollbackFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createRestoreFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile; +import static org.apache.hudi.common.testutils.FileCreateUtils.createSavepointCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.deleteSavepointCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName; import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap; @@ -199,6 +202,12 @@ public class HoodieTestTable { return this; } + public HoodieTestTable addSavepointCommit(String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException { + createInflightSavepoint(basePath, instantTime); + createSavepointCommit(basePath, instantTime, savepointMetadata); + return this; + } + public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationType, String commitTime, HoodieTestTableState testTableState) { String actionType = getCommitActionType(operationType, metaClient.getTableType()); @@ -394,7 +403,7 @@ public class HoodieTestTable { public HoodieSavepointMetadata getSavepointMetadata(String instant, Map> partitionToFilesMeta) { HoodieSavepointMetadata savepointMetadata = new HoodieSavepointMetadata(); - savepointMetadata.setSavepointedAt(Long.valueOf(instant)); + savepointMetadata.setSavepointedAt(12345L); Map partitionMetadataMap = new HashMap<>(); for (Map.Entry> entry : partitionToFilesMeta.entrySet()) { HoodieSavepointPartitionMetadata savepointPartitionMetadata = new HoodieSavepointPartitionMetadata(); @@ -404,6 +413,7 @@ public class HoodieTestTable { } 
savepointMetadata.setPartitionMetadata(partitionMetadataMap); savepointMetadata.setSavepointedBy("test"); + savepointMetadata.setComments("test_comment"); return savepointMetadata; } @@ -454,6 +464,17 @@ public class HoodieTestTable { return this; } + public HoodieTestTable addSavepoint(String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException { + createInflightSavepoint(basePath, instantTime); + createSavepointCommit(basePath, instantTime, savepointMetadata); + return this; + } + + public HoodieTestTable deleteSavepoint(String instantTime) throws IOException { + deleteSavepointCommit(basePath, instantTime, fs); + return this; + } + public HoodieTestTable forCommit(String instantTime) { currentInstantTime = instantTime; return this; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java index 5da6b325f..401453180 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import java.util.Comparator; +import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -38,4 +39,9 @@ public class MockHoodieTimeline extends HoodieActiveTimeline { inflights.map(s -> new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, s))) .sorted(Comparator.comparing(HoodieInstant::getFileName)).collect(Collectors.toList())); } + + public MockHoodieTimeline(List instants) { + super(); + this.setInstants(instants); + } }