[HUDI-349]: Added new cleaning policy based on number of hours (#3646)
This commit is contained in:
@@ -83,6 +83,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
|
||||
.withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
|
||||
+ "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
|
||||
|
||||
public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
|
||||
.defaultValue("24")
|
||||
.withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as"
|
||||
+ "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
|
||||
+ " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
|
||||
|
||||
public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
|
||||
.key("hoodie.cleaner.policy")
|
||||
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
|
||||
@@ -585,6 +591,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder cleanerNumHoursRetained(int cleanerHoursRetained) {
|
||||
compactionConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
|
||||
compactionConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
|
||||
compactionConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
|
||||
|
||||
@@ -1085,6 +1085,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
return getInt(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED);
|
||||
}
|
||||
|
||||
public int getCleanerHoursRetained() {
|
||||
return getInt(HoodieCompactionConfig.CLEANER_HOURS_RETAINED);
|
||||
}
|
||||
|
||||
public int getMaxCommitsToKeep() {
|
||||
return getInt(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP);
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||
@@ -50,8 +51,12 @@ import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -123,6 +128,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
|
||||
switch (config.getCleanerPolicy()) {
|
||||
case KEEP_LATEST_COMMITS:
|
||||
case KEEP_LATEST_BY_HOURS:
|
||||
return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
|
||||
case KEEP_LATEST_FILE_VERSIONS:
|
||||
return getPartitionPathsForFullCleaning();
|
||||
@@ -251,6 +257,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
return deletePaths;
|
||||
}
|
||||
|
||||
private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
|
||||
return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Selects the versions for file for cleaning, such that it
|
||||
* <p>
|
||||
@@ -265,8 +275,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
* <p>
|
||||
* This policy is the default.
|
||||
*/
|
||||
private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath) {
|
||||
int commitsRetained = config.getCleanerCommitsRetained();
|
||||
private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
|
||||
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
|
||||
List<CleanFileInfo> deletePaths = new ArrayList<>();
|
||||
|
||||
@@ -303,14 +312,24 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
// do not clean up a savepoint data file
|
||||
continue;
|
||||
}
|
||||
// Dont delete the latest commit and also the last commit before the earliest commit we
|
||||
// are retaining
|
||||
// The window of commit retain == max query run time. So a query could be running which
|
||||
// still
|
||||
// uses this file.
|
||||
if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
|
||||
// move on to the next file
|
||||
continue;
|
||||
|
||||
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
|
||||
// Dont delete the latest commit and also the last commit before the earliest commit we
|
||||
// are retaining
|
||||
// The window of commit retain == max query run time. So a query could be running which
|
||||
// still
|
||||
// uses this file.
|
||||
if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
|
||||
// move on to the next file
|
||||
continue;
|
||||
}
|
||||
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
|
||||
// This block corresponds to KEEP_LATEST_BY_HOURS policy
|
||||
// Do not delete the latest commit.
|
||||
if (fileCommitTime.equals(lastVersion)) {
|
||||
// move on to the next file
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Always keep the last commit
|
||||
@@ -334,6 +353,18 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
}
|
||||
return deletePaths;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method finds the files to be cleaned based on the number of hours. If {@code config.getCleanerHoursRetained()} is set to 5,
|
||||
* all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained.
|
||||
* This policy gives much more flexibility to users for retaining data for running incremental queries as compared to
|
||||
* KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
|
||||
* @param partitionPath partition path to check
|
||||
* @return list of files to clean
|
||||
*/
|
||||
private List<CleanFileInfo> getFilesToCleanKeepingLatestHours(String partitionPath) {
|
||||
return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
|
||||
}
|
||||
|
||||
private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
|
||||
final Stream<HoodieFileGroup> replacedGroups;
|
||||
@@ -392,6 +423,8 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
|
||||
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
|
||||
deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
|
||||
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
|
||||
deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
|
||||
}
|
||||
@@ -406,9 +439,16 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
|
||||
public Option<HoodieInstant> getEarliestCommitToRetain() {
|
||||
Option<HoodieInstant> earliestCommitToRetain = Option.empty();
|
||||
int commitsRetained = config.getCleanerCommitsRetained();
|
||||
int hoursRetained = config.getCleanerHoursRetained();
|
||||
if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
|
||||
&& commitTimeline.countInstants() > commitsRetained) {
|
||||
earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
|
||||
earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
|
||||
} else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
|
||||
Instant instant = Instant.now();
|
||||
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
|
||||
String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
|
||||
earliestCommitToRetain = Option.fromJavaOptional(commitTimeline.getInstants().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(),
|
||||
HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
|
||||
}
|
||||
return earliestCommitToRetain;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user