1
0

Savepoint should not create a hole in the commit timeline

This commit is contained in:
Prasanna Rajaperumal
2017-06-22 15:00:27 -07:00
committed by prazanna
parent 29b906b763
commit 5cc071f74e
4 changed files with 64 additions and 8 deletions

View File

@@ -27,6 +27,8 @@ import com.uber.hoodie.common.file.HoodieAppendLog;
import com.uber.hoodie.exception.HoodieCommitException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.util.Optional;
import java.util.function.Function;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -103,12 +105,16 @@ public class HoodieCommitArchiveLog {
HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
List<String> savepoints = table.getSavepoints();
// We cannot have any holes in the commit timeline. We cannot archive any commits which are made after the first savepoint present.
Optional<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxCommitsToKeep) {
// Actually do the commits
return commitTimeline.getInstants().filter(s -> !savepoints.contains(s.getTimestamp()))
.limit(commitTimeline.countInstants() - minCommitsToKeep);
return commitTimeline.getInstants().filter(s -> {
// if no savepoint present, then dont filter
return !(firstSavepoint.isPresent() && HoodieTimeline
.compareTimestamps(firstSavepoint.get().getTimestamp(), s.getTimestamp(),
HoodieTimeline.LESSER_OR_EQUAL));
}).limit(commitTimeline.countInstants() - minCommitsToKeep);
}
return Stream.empty();
}

View File

@@ -43,7 +43,8 @@ import java.util.*;
* Test data uses a toy Uber trips, data model.
*/
public class HoodieTestDataGenerator {
static class KeyPartition {
static class KeyPartition {
HoodieKey key;
String partitionPath;
}
@@ -190,10 +191,27 @@ public class HoodieTestDataGenerator {
} finally {
os.close();
}
}
public String[] getPartitionPaths() {
public static void createSavepointFile(String basePath, String commitTime) throws IOException {
Path commitFile =
new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline
.makeSavePointFileName(commitTime));
FileSystem fs = FSUtils.getFs();
FSDataOutputStream os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
try {
// Write empty commit metadata
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(
StandardCharsets.UTF_8)));
} finally {
os.close();
}
}
public String[] getPartitionPaths() {
return partitionPaths;
}
}

View File

@@ -175,6 +175,39 @@ public class TestHoodieCommitArchiveLog {
timeline.containsOrBeforeTimelineStarts("103"));
}
@Test
public void testArchiveCommitSavepointNoHole() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 5).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, fs);
HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createSavepointFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102");
HoodieTestDataGenerator.createCommitFile(basePath, "103");
HoodieTestDataGenerator.createCommitFile(basePath, "104");
HoodieTestDataGenerator.createCommitFile(basePath, "105");
HoodieTimeline timeline =
metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
boolean result = archiveLog.archiveIfRequired();
assertTrue(result);
timeline =
metadata.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().filterCompletedInstants();
assertEquals(
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)",
5, timeline.countInstants());
assertTrue("Archived commits should always be safe",
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")));
assertTrue("Archived commits should always be safe",
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")));
assertTrue("Archived commits should always be safe",
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")));
}
}

View File

@@ -30,7 +30,6 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;