From 5cc071f74e518b5b37ce953434d6445c2a97024b Mon Sep 17 00:00:00 2001 From: Prasanna Rajaperumal Date: Thu, 22 Jun 2017 15:00:27 -0700 Subject: [PATCH] Savepoint should not create a hole in the commit timeline --- .../hoodie/io/HoodieCommitArchiveLog.java | 14 +++++--- .../common/HoodieTestDataGenerator.java | 24 ++++++++++++-- .../hoodie/io/TestHoodieCommitArchiveLog.java | 33 +++++++++++++++++++ .../hoodie/common/model/HoodieTestUtils.java | 1 - 4 files changed, 64 insertions(+), 8 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index 056ac130c..d48a2f8e7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -27,6 +27,8 @@ import com.uber.hoodie.common.file.HoodieAppendLog; import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; +import java.util.Optional; +import java.util.function.Function; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -103,12 +105,16 @@ public class HoodieCommitArchiveLog { HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); HoodieTimeline commitTimeline = table.getCompletedCommitTimeline(); - List savepoints = table.getSavepoints(); - + // We cannot have any holes in the commit timeline. We cannot archive any commits which are made after the first savepoint present. + Optional firstSavepoint = table.getCompletedSavepointTimeline().firstInstant(); if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) { // Actually do the commits - return commitTimeline.getInstants().filter(s -> !savepoints.contains(s.getTimestamp())) - .limit(commitTimeline.countInstants() - minCommitsToKeep); + return commitTimeline.getInstants().filter(s -> { + // if no savepoint present, then dont filter + return !(firstSavepoint.isPresent() && HoodieTimeline + .compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(), + HoodieTimeline.LESSER_OR_EQUAL)); + }).limit(commitTimeline.countInstants() - minCommitsToKeep); } return Stream.empty(); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index e9ed3b1ae..858b781f5 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -43,7 +43,8 @@ import java.util.*; * Test data uses a toy Uber trips, data model. */ public class HoodieTestDataGenerator { - static class KeyPartition { + + static class KeyPartition { HoodieKey key; String partitionPath; } @@ -190,10 +191,27 @@ public class HoodieTestDataGenerator { } finally { os.close(); } - } - public String[] getPartitionPaths() { + public static void createSavepointFile(String basePath, String commitTime) throws IOException { + Path commitFile = + new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline + .makeSavePointFileName(commitTime)); + FileSystem fs = FSUtils.getFs(); + FSDataOutputStream os = fs.create(commitFile, true); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + try { + // Write empty commit metadata + os.writeBytes(new String(commitMetadata.toJsonString().getBytes( + StandardCharsets.UTF_8))); + } finally { + os.close(); + } + } + + + + public String[] getPartitionPaths() { return partitionPaths; } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index 40926c2e8..64ea24b03 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -175,6 +175,39 @@ public class TestHoodieCommitArchiveLog { timeline.containsOrBeforeTimelineStarts("103")); } + @Test + public void testArchiveCommitSavepointNoHole() throws IOException { + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .forTable("test-trip-table").withCompactionConfig( + HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build(); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs); + HoodieTestDataGenerator.createCommitFile(basePath, "100"); + HoodieTestDataGenerator.createCommitFile(basePath, "101"); + HoodieTestDataGenerator.createSavepointFile(basePath, "101"); + HoodieTestDataGenerator.createCommitFile(basePath, "102"); + HoodieTestDataGenerator.createCommitFile(basePath, "103"); + HoodieTestDataGenerator.createCommitFile(basePath, "104"); + HoodieTestDataGenerator.createCommitFile(basePath, "105"); + + HoodieTimeline timeline = + metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); + assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants()); + boolean result = archiveLog.archiveIfRequired(); + assertTrue(result); + timeline = + metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().filterCompletedInstants(); + assertEquals( + "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)", + 5, timeline.countInstants()); + assertTrue("Archived commits should always be safe", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101"))); + assertTrue("Archived commits should always be safe", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102"))); + assertTrue("Archived commits should always be safe", + timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103"))); + } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 0d1f4030c..8771a7bb5 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -30,7 +30,6 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord;