1
0

Implement Savepoints and required metadata timeline - Part 2

This commit is contained in:
Prasanna Rajaperumal
2017-03-13 23:09:29 -07:00
parent 6f36e1eaaf
commit d83b671ada
7 changed files with 135 additions and 29 deletions

View File

@@ -21,6 +21,7 @@ import com.google.common.collect.Maps;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,7 +41,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Cleaner is responsible for garbage collecting older files in a given partition path, such that
@@ -50,16 +54,16 @@ import java.util.stream.Collectors;
* <p>
* TODO: Should all cleaning be done based on {@link com.uber.hoodie.common.model.HoodieCommitMetadata}
*/
public class HoodieCleaner {
public class HoodieCleaner<T extends HoodieRecordPayload<T>> {
private static Logger logger = LogManager.getLogger(HoodieCleaner.class);
private final TableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
private HoodieTable hoodieTable;
private HoodieTable<T> hoodieTable;
private HoodieWriteConfig config;
private FileSystem fs;
public HoodieCleaner(HoodieTable hoodieTable, HoodieWriteConfig config) {
public HoodieCleaner(HoodieTable<T> hoodieTable, HoodieWriteConfig config) {
this.hoodieTable = hoodieTable;
this.fileSystemView = hoodieTable.getCompactedFileSystemView();
this.commitTimeline = hoodieTable.getCompletedCommitTimeline();
@@ -85,7 +89,9 @@ public class HoodieCleaner {
fileSystemView.getEveryVersionInPartition(partitionPath)
.collect(Collectors.toList());
List<String> deletePaths = new ArrayList<>();
List<String> savepoints = hoodieTable.getSavepoints();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
.flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
for (List<HoodieDataFile> versionsForFileId : fileVersions) {
int keepVersions = config.getCleanerFileVersionsRetained();
@@ -93,8 +99,8 @@ public class HoodieCleaner {
while (commitItr.hasNext() && keepVersions > 0) {
// Skip this most recent version
HoodieDataFile next = commitItr.next();
if(savepoints.contains(next.getCommitTime())) {
// do not clean datafiles that are savepointed
if(savepointedFiles.contains(next.getFileName())) {
// do not clean up a savepoint data file
continue;
}
keepVersions--;
@@ -134,7 +140,9 @@ public class HoodieCleaner {
"Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
List<String> deletePaths = new ArrayList<>();
List<String> savepoints = hoodieTable.getSavepoints();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
.flatMap(s -> hoodieTable.getSavepointedDataFiles(s)).collect(Collectors.toList());
// determine if we have enough commits, to start cleaning.
if (commitTimeline.countInstants() > commitsRetained) {
@@ -152,7 +160,7 @@ public class HoodieCleaner {
// i.e always spare the last commit.
for (HoodieDataFile afile : fileList) {
String fileCommitTime = afile.getCommitTime();
if(savepoints.contains(fileCommitTime)) {
if(savepointedFiles.contains(afile.getFileName())) {
// do not clean up a savepoint data file
continue;
}