diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index bb295b64d..6e1f14ab7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -126,9 +126,11 @@ public class HoodieCommitArchiveLog { HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); // GroupBy each action and limit each action timeline to maxCommitsToKeep + // TODO: Handle ROLLBACK_ACTION in future + // ROLLBACK_ACTION is currently not defined in HoodieActiveTimeline HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline() - .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION, - HoodieTimeline.ROLLBACK_ACTION)); + .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)) + .filterCompletedInstants(); Stream instants = cleanAndRollbackTimeline.getInstants() .collect(Collectors.groupingBy(s -> s.getAction())) .entrySet() diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index d0333d077..3add748c9 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -19,6 +19,7 @@ package com.uber.hoodie.io; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Sets; import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieLogFile; @@ -90,11 +91,13 @@ public class TestHoodieCommitArchiveLog { assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); HoodieTestUtils.createCleanFiles(basePath, "100"); + HoodieTestUtils.createInflightCleanFiles(basePath, "101"); HoodieTestUtils.createCleanFiles(basePath, "101"); HoodieTestUtils.createCleanFiles(basePath, "102"); HoodieTestUtils.createCleanFiles(basePath, "103"); HoodieTestUtils.createCleanFiles(basePath, "104"); HoodieTestUtils.createCleanFiles(basePath, "105"); + HoodieTestUtils.createInflightCleanFiles(basePath, "106", "107"); //reload the timeline and get all the commmits before archive timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline() @@ -103,6 +106,9 @@ public class TestHoodieCommitArchiveLog { assertEquals("Loaded 6 commits and the count should match", 12, timeline.countInstants()); + // verify in-flight instants before archive + verifyInflightInstants(metaClient, 3); + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, new HoodieTableMetaClient(fs.getConf(), basePath, true)); @@ -141,6 +147,9 @@ public class TestHoodieCommitArchiveLog { "Read commits map should match the originalCommits - commitsLoadedFromArchival", originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()), readCommits); + + // verify in-flight instants after archive + verifyInflightInstants(metaClient, 3); } @Test @@ -233,5 +242,10 @@ public class TestHoodieCommitArchiveLog { timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103"))); } - + private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) { + HoodieTimeline timeline = metaClient.getActiveTimeline().reload() + .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights(); + assertEquals("Loaded inflight clean actions and the count should match", + expectedTotalInstants, timeline.countInstants()); + } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 74103f8fa..db0e87d6f 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -124,6 +124,14 @@ public class HoodieTestUtils { } } + public static final void createInflightCleanFiles(String basePath, String... commitTimes) + throws IOException { + for (String commitTime : commitTimes) { + new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeInflightCleanerFileName(commitTime)).createNewFile(); + } + } + public static final String createNewDataFile(String basePath, String partitionPath, String commitTime) throws IOException { String fileID = UUID.randomUUID().toString();