Metadata timeline marks an already complete instant as complete again (#98)
This commit is contained in:
@@ -488,7 +488,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
AvroUtils.convertSavepointMetadata(user, comment, latestFilesMap);
|
||||
// Nothing to save in the savepoint
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
|
||||
new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime),
|
||||
AvroUtils.serializeSavepointMetadata(metadata));
|
||||
logger.info("Savepoint " + commitTime + " created");
|
||||
return true;
|
||||
@@ -687,7 +687,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
HoodieRollbackMetadata rollbackMetadata =
|
||||
AvroUtils.convertRollbackMetadata(startRollbackTime, durationInMs, commits, stats);
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(false, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
|
||||
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
|
||||
AvroUtils.serializeRollbackMetadata(rollbackMetadata));
|
||||
logger.info("Commits " + commits + " rollback is complete");
|
||||
|
||||
@@ -757,7 +757,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
metadata.getTotalFilesDeleted());
|
||||
|
||||
table.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, startCleanTime),
|
||||
new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, startCleanTime),
|
||||
AvroUtils.serializeCleanMetadata(metadata));
|
||||
logger.info("Marked clean started on " + startCleanTime + " as complete");
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package com.uber.hoodie.common.table.timeline;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.io.Closeables;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
@@ -194,6 +195,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
|
||||
public void saveAsComplete(HoodieInstant instant, Optional<byte[]> data) {
|
||||
log.info("Marking instant complete " + instant);
|
||||
Preconditions.checkArgument(instant.isInflight(),
|
||||
"Could not mark an already completed instant as complete again " + instant);
|
||||
moveInflightToComplete(instant, HoodieTimeline.getCompletedInstant(instant), data);
|
||||
log.info("Completed " + instant);
|
||||
}
|
||||
|
||||
@@ -53,13 +53,22 @@ public class HoodieActiveTimelineTest {
|
||||
@Test
|
||||
public void testLoadingInstantsFromFiles() throws IOException {
|
||||
HoodieInstant instant1 =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1");
|
||||
HoodieInstant instant2 =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "3");
|
||||
HoodieInstant instant3 =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5");
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "5");
|
||||
HoodieInstant instant4 =
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "8");
|
||||
HoodieInstant instant1_complete =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
|
||||
HoodieInstant instant2_complete =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
|
||||
HoodieInstant instant3_complete =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5");
|
||||
HoodieInstant instant4_complete =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "8");
|
||||
|
||||
HoodieInstant instant5 =
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "9");
|
||||
|
||||
@@ -72,13 +81,14 @@ public class HoodieActiveTimelineTest {
|
||||
timeline = timeline.reload();
|
||||
|
||||
assertEquals("Total instants should be 5", 5, timeline.countInstants());
|
||||
HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream
|
||||
.of(instant1_complete, instant2_complete, instant3_complete, instant4_complete,
|
||||
instant5), timeline.getInstants());
|
||||
HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream
|
||||
.of(instant1_complete, instant2_complete, instant3_complete, instant4_complete,
|
||||
instant5), timeline.getCommitTimeline().getInstants());
|
||||
HoodieTestUtils.assertStreamEquals("Check the instants stream",
|
||||
Stream.of(instant1, instant2, instant3, instant4, instant5), timeline.getInstants());
|
||||
HoodieTestUtils.assertStreamEquals("Check the instants stream",
|
||||
Stream.of(instant1, instant2, instant3, instant4, instant5),
|
||||
timeline.getCommitTimeline().getInstants());
|
||||
HoodieTestUtils.assertStreamEquals("Check the instants stream",
|
||||
Stream.of(instant1, instant2, instant3, instant4),
|
||||
Stream.of(instant1_complete, instant2_complete, instant3_complete, instant4_complete),
|
||||
timeline.getCommitTimeline().filterCompletedInstants().getInstants());
|
||||
HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5),
|
||||
timeline.getCommitTimeline().filterInflights().getInstants());
|
||||
|
||||
@@ -84,7 +84,7 @@ public class ReadOptimizedTableViewTest {
|
||||
// Make this commit safe
|
||||
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
|
||||
HoodieInstant instant1 =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime1);
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);
|
||||
commitTimeline.saveAsComplete(instant1, Optional.empty());
|
||||
refreshFsView();
|
||||
assertEquals("", fileName1,
|
||||
@@ -102,7 +102,7 @@ public class ReadOptimizedTableViewTest {
|
||||
|
||||
// Make it safe
|
||||
HoodieInstant instant2 =
|
||||
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime2);
|
||||
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2);
|
||||
commitTimeline.saveAsComplete(instant2, Optional.empty());
|
||||
refreshFsView();
|
||||
assertEquals("", fileName2,
|
||||
|
||||
Reference in New Issue
Block a user