diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index 6668d338f..056ac130c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -16,6 +16,7 @@ package com.uber.hoodie.io; +import com.google.common.collect.Sets; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -60,14 +61,39 @@ public class HoodieCommitArchiveLog { */ public boolean archiveIfRequired() { List commitsToArchive = getCommitsToArchive().collect(Collectors.toList()); + boolean success = true; if (commitsToArchive.iterator().hasNext()) { log.info("Archiving commits " + commitsToArchive); archive(commitsToArchive); - return deleteCommits(commitsToArchive); + success = deleteInstants(commitsToArchive); } else { log.info("No Commits to archive"); - return true; } + return success & deleteOtherInstants(); + } + + private boolean deleteOtherInstants() { + // Delete clean and rollback files + List toDelete = getInstantsToDelete().collect(Collectors.toList()); + if(!toDelete.isEmpty()) { + log.info("Deleting actions " + toDelete); + return deleteInstants(toDelete); + } + return true; + } + + private Stream getInstantsToDelete() { + + int maxCommitsToKeep = config.getMaxCommitsToKeep(); + int minCommitsToKeep = config.getMinCommitsToKeep(); + + HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + HoodieTimeline cleanTimeline = table.getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)); + if (!cleanTimeline.empty() && cleanTimeline.countInstants() > maxCommitsToKeep) { + // Actually do the commits + return cleanTimeline.getInstants().limit(cleanTimeline.countInstants() - minCommitsToKeep); + } + return Stream.empty(); } private Stream getCommitsToArchive() { @@ -87,8 +113,8 @@ public class HoodieCommitArchiveLog { return Stream.empty(); } - private boolean deleteCommits(List commitsToArchive) { - log.info("Deleting commits " + commitsToArchive); + private boolean deleteInstants(List commitsToArchive) { + log.info("Deleting instant " + commitsToArchive); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, config.getBasePath(), true); @@ -99,10 +125,10 @@ public class HoodieCommitArchiveLog { try { if (fs.exists(commitFile)) { success &= fs.delete(commitFile, false); - log.info("Archived and deleted commit file " + commitFile); + log.info("Archived and deleted instant file " + commitFile); } } catch (IOException e) { - throw new HoodieIOException("Failed to delete archived commit " + commitToArchive, + throw new HoodieIOException("Failed to delete archived instant " + commitToArchive, e); } }