diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 9c6914667..dc8251874 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -488,7 +488,7 @@ public class HoodieWriteClient implements Seriali AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap); // Nothing to save in the savepoint table.getActiveTimeline().saveAsComplete( - new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, commitTime), + new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime), AvroUtils.serializeSavepointMetadata(metadata)); logger.info("Savepoint " + commitTime + " created"); return true; @@ -687,7 +687,7 @@ public class HoodieWriteClient implements Seriali HoodieRollbackMetadata rollbackMetadata = AvroUtils.convertRollbackMetadata(startRollbackTime, durationInMs, commits, stats); table.getActiveTimeline().saveAsComplete( - new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), + new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), AvroUtils.serializeRollbackMetadata(rollbackMetadata)); logger.info("Commits " + commits + " rollback is complete"); @@ -757,7 +757,7 @@ public class HoodieWriteClient implements Seriali metadata.getTotalFilesDeleted()); table.getActiveTimeline().saveAsComplete( - new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, startCleanTime), + new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, startCleanTime), AvroUtils.serializeCleanMetadata(metadata)); logger.info("Marked clean started on " + startCleanTime + " as complete"); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 2345d8326..1c991ad26 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -16,6 +16,7 @@ package com.uber.hoodie.common.table.timeline; +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -194,6 +195,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public void saveAsComplete(HoodieInstant instant, Optional data) { log.info("Marking instant complete " + instant); + Preconditions.checkArgument(instant.isInflight(), + "Could not mark an already completed instant as complete again " + instant); moveInflightToComplete(instant, HoodieTimeline.getCompletedInstant(instant), data); log.info("Completed " + instant); } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java index bbbde5945..cc520acf2 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java @@ -53,13 +53,22 @@ public class HoodieActiveTimelineTest { @Test public void testLoadingInstantsFromFiles() throws IOException { HoodieInstant instant1 = - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1"); + new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1"); HoodieInstant instant2 = - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3"); + new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "3"); HoodieInstant instant3 = - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5"); + new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "5"); HoodieInstant instant4 = + new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "8"); + HoodieInstant instant1_complete = + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1"); + HoodieInstant instant2_complete = + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3"); + HoodieInstant instant3_complete = + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5"); + HoodieInstant instant4_complete = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "8"); + HoodieInstant instant5 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "9"); @@ -72,13 +81,14 @@ public class HoodieActiveTimelineTest { timeline = timeline.reload(); assertEquals("Total instants should be 5", 5, timeline.countInstants()); + HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream + .of(instant1_complete, instant2_complete, instant3_complete, instant4_complete, + instant5), timeline.getInstants()); + HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream + .of(instant1_complete, instant2_complete, instant3_complete, instant4_complete, + instant5), timeline.getCommitTimeline().getInstants()); HoodieTestUtils.assertStreamEquals("Check the instants stream", - Stream.of(instant1, instant2, instant3, instant4, instant5), timeline.getInstants()); - HoodieTestUtils.assertStreamEquals("Check the instants stream", - Stream.of(instant1, instant2, instant3, instant4, instant5), - timeline.getCommitTimeline().getInstants()); - HoodieTestUtils.assertStreamEquals("Check the instants stream", - Stream.of(instant1, instant2, instant3, instant4), + Stream.of(instant1_complete, instant2_complete, instant3_complete, instant4_complete), timeline.getCommitTimeline().filterCompletedInstants().getInstants()); HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5), timeline.getCommitTimeline().filterInflights().getInstants()); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java index e912f2b86..309cc3cf5 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/ReadOptimizedTableViewTest.java @@ -84,7 +84,7 @@ public class ReadOptimizedTableViewTest { // Make this commit safe HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); HoodieInstant instant1 = - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime1); + new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); commitTimeline.saveAsComplete(instant1, Optional.empty()); refreshFsView(); assertEquals("", fileName1, @@ -102,7 +102,7 @@ public class ReadOptimizedTableViewTest { // Make it safe HoodieInstant instant2 = - new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime2); + new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2); commitTimeline.saveAsComplete(instant2, Optional.empty()); refreshFsView(); assertEquals("", fileName2,