Delete other instant files (.clean) as well during commit archival
This commit is contained in:
committed by
prazanna
parent
e1d13f2bc8
commit
bae98efeee
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package com.uber.hoodie.io;
|
package com.uber.hoodie.io;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
import com.uber.hoodie.common.model.HoodieCommitMetadata;
|
import com.uber.hoodie.common.model.HoodieCommitMetadata;
|
||||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||||
@@ -60,14 +61,39 @@ public class HoodieCommitArchiveLog {
|
|||||||
*/
|
*/
|
||||||
public boolean archiveIfRequired() {
|
public boolean archiveIfRequired() {
|
||||||
List<HoodieInstant> commitsToArchive = getCommitsToArchive().collect(Collectors.toList());
|
List<HoodieInstant> commitsToArchive = getCommitsToArchive().collect(Collectors.toList());
|
||||||
|
boolean success = true;
|
||||||
if (commitsToArchive.iterator().hasNext()) {
|
if (commitsToArchive.iterator().hasNext()) {
|
||||||
log.info("Archiving commits " + commitsToArchive);
|
log.info("Archiving commits " + commitsToArchive);
|
||||||
archive(commitsToArchive);
|
archive(commitsToArchive);
|
||||||
return deleteCommits(commitsToArchive);
|
success = deleteInstants(commitsToArchive);
|
||||||
} else {
|
} else {
|
||||||
log.info("No Commits to archive");
|
log.info("No Commits to archive");
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
return success & deleteOtherInstants();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean deleteOtherInstants() {
|
||||||
|
// Delete clean and rollback files
|
||||||
|
List<HoodieInstant> toDelete = getInstantsToDelete().collect(Collectors.toList());
|
||||||
|
if(!toDelete.isEmpty()) {
|
||||||
|
log.info("Deleting actions " + toDelete);
|
||||||
|
return deleteInstants(toDelete);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Stream<HoodieInstant> getInstantsToDelete() {
|
||||||
|
|
||||||
|
int maxCommitsToKeep = config.getMaxCommitsToKeep();
|
||||||
|
int minCommitsToKeep = config.getMinCommitsToKeep();
|
||||||
|
|
||||||
|
HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
|
||||||
|
HoodieTimeline cleanTimeline = table.getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION));
|
||||||
|
if (!cleanTimeline.empty() && cleanTimeline.countInstants() > maxCommitsToKeep) {
|
||||||
|
// Actually do the commits
|
||||||
|
return cleanTimeline.getInstants().limit(cleanTimeline.countInstants() - minCommitsToKeep);
|
||||||
|
}
|
||||||
|
return Stream.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Stream<HoodieInstant> getCommitsToArchive() {
|
private Stream<HoodieInstant> getCommitsToArchive() {
|
||||||
@@ -87,8 +113,8 @@ public class HoodieCommitArchiveLog {
|
|||||||
return Stream.empty();
|
return Stream.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean deleteCommits(List<HoodieInstant> commitsToArchive) {
|
private boolean deleteInstants(List<HoodieInstant> commitsToArchive) {
|
||||||
log.info("Deleting commits " + commitsToArchive);
|
log.info("Deleting instant " + commitsToArchive);
|
||||||
HoodieTableMetaClient metaClient =
|
HoodieTableMetaClient metaClient =
|
||||||
new HoodieTableMetaClient(fs, config.getBasePath(), true);
|
new HoodieTableMetaClient(fs, config.getBasePath(), true);
|
||||||
|
|
||||||
@@ -99,10 +125,10 @@ public class HoodieCommitArchiveLog {
|
|||||||
try {
|
try {
|
||||||
if (fs.exists(commitFile)) {
|
if (fs.exists(commitFile)) {
|
||||||
success &= fs.delete(commitFile, false);
|
success &= fs.delete(commitFile, false);
|
||||||
log.info("Archived and deleted commit file " + commitFile);
|
log.info("Archived and deleted instant file " + commitFile);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new HoodieIOException("Failed to delete archived commit " + commitToArchive,
|
throw new HoodieIOException("Failed to delete archived instant " + commitToArchive,
|
||||||
e);
|
e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user