1
0

Fix for cleaning log files (MOR — merge-on-read tables)

This commit is contained in:
Nishith Agarwal
2017-06-29 00:32:11 -07:00
committed by prazanna
parent 19c22b231e
commit 0b26b60a5c
4 changed files with 103 additions and 40 deletions

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.io; package com.uber.hoodie.io;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroup;
@@ -23,9 +24,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -89,29 +88,28 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
for (HoodieFileGroup fileGroup : fileGroups) { for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained(); int keepVersions = config.getCleanerFileVersionsRetained();
Iterator<HoodieDataFile> commitItr = fileGroup.getAllDataFiles().iterator(); Iterator<FileSlice> fileSliceIterator = fileGroup.getAllFileSlices().iterator();
while (commitItr.hasNext() && keepVersions > 0) { while (fileSliceIterator.hasNext() && keepVersions > 0) {
// Skip this most recent version // Skip this most recent version
HoodieDataFile next = commitItr.next(); FileSlice nextSlice = fileSliceIterator.next();
if(savepointedFiles.contains(next.getFileName())) { HoodieDataFile dataFile = nextSlice.getDataFile().get();
if(savepointedFiles.contains(dataFile.getFileName())) {
// do not clean up a savepoint data file // do not clean up a savepoint data file
continue; continue;
} }
keepVersions--; keepVersions--;
} }
// Delete the remaining files // Delete the remaining files
while (commitItr.hasNext()) { while (fileSliceIterator.hasNext()) {
HoodieDataFile nextRecord = commitItr.next(); FileSlice nextSlice = fileSliceIterator.next();
deletePaths.add(nextRecord.getFileStatus().getPath().toString()); HoodieDataFile dataFile = nextSlice.getDataFile().get();
deletePaths.add(dataFile.getFileStatus().getPath().toString());
if (hoodieTable.getMetaClient().getTableType() if (hoodieTable.getMetaClient().getTableType()
== HoodieTableType.MERGE_ON_READ) { == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well // If merge on read, then clean the log files for the commits as well
// todo: fix below for MERGE_ON_READ deletePaths.addAll(nextSlice.getLogFiles()
deletePaths.add(String .map(file -> file.getPath().toString())
.format("%s/%s/%s", config.getBasePath(), partitionPath, .collect(Collectors.toList()));
FSUtils.maskWithoutLogVersion(nextRecord.getCommitTime(),
nextRecord.getFileId(),
HoodieLogFile.DELTA_EXTENSION)));
} }
} }
} }
@@ -155,16 +153,18 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
fileSystemView.getAllFileGroups(partitionPath) fileSystemView.getAllFileGroups(partitionPath)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) { for (HoodieFileGroup fileGroup : fileGroups) {
List<HoodieDataFile> fileList = fileGroup.getAllDataFiles().collect(Collectors.toList()); List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
String lastVersion = FSUtils.getCommitTime(fileList.get(0).getFileName()); HoodieDataFile dataFile = fileSliceList.get(0).getDataFile().get();
String lastVersion = dataFile.getCommitTime();
String lastVersionBeforeEarliestCommitToRetain = String lastVersionBeforeEarliestCommitToRetain =
getLatestVersionBeforeCommit(fileList, earliestCommitToRetain); getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
// Ensure there are more than 1 version of the file (we only clean old files from updates) // Ensure there are more than 1 version of the file (we only clean old files from updates)
// i.e always spare the last commit. // i.e always spare the last commit.
for (HoodieDataFile afile : fileList) { for (FileSlice aSlice : fileSliceList) {
String fileCommitTime = afile.getCommitTime(); HoodieDataFile aFile = aSlice.getDataFile().get();
if(savepointedFiles.contains(afile.getFileName())) { String fileCommitTime = aFile.getCommitTime();
if(savepointedFiles.contains(aFile.getFileName())) {
// do not clean up a savepoint data file // do not clean up a savepoint data file
continue; continue;
} }
@@ -184,15 +184,13 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
fileCommitTime, fileCommitTime,
HoodieTimeline.GREATER)) { HoodieTimeline.GREATER)) {
// this is a commit, that should be cleaned. // this is a commit, that should be cleaned.
deletePaths.add(afile.getFileStatus().getPath().toString()); deletePaths.add(aFile.getFileStatus().getPath().toString());
if (hoodieTable.getMetaClient().getTableType() if (hoodieTable.getMetaClient().getTableType()
== HoodieTableType.MERGE_ON_READ) { == HoodieTableType.MERGE_ON_READ) {
// If merge on read, then clean the log files for the commits as well // If merge on read, then clean the log files for the commits as well
// todo: fix below for MERGE_ON_READ deletePaths.addAll(aSlice.getLogFiles()
deletePaths.add(String .map(file -> file.getPath().toString())
.format("%s/%s/%s", config.getBasePath(), partitionPath, .collect(Collectors.toList()));
FSUtils.maskWithoutLogVersion(fileCommitTime, afile.getFileId(),
HoodieLogFile.DELTA_EXTENSION)));
} }
} }
} }
@@ -205,10 +203,10 @@ public class HoodieCleanHelper<T extends HoodieRecordPayload<T>> {
/** /**
* Gets the latest version < commitTime. This version file could still be used by queries. * Gets the latest version < commitTime. This version file could still be used by queries.
*/ */
private String getLatestVersionBeforeCommit(List<HoodieDataFile> fileList, private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList,
HoodieInstant commitTime) { HoodieInstant commitTime) {
for (HoodieDataFile file : fileList) { for (FileSlice file : fileSliceList) {
String fileCommitTime = FSUtils.getCommitTime(file.getFileName()); String fileCommitTime = file.getDataFile().get().getCommitTime();
if (HoodieTimeline.compareTimestamps(commitTime.getTimestamp(), fileCommitTime, if (HoodieTimeline.compareTimestamps(commitTime.getTimestamp(), fileCommitTime,
HoodieTimeline.GREATER)) { HoodieTimeline.GREATER)) {
// fileList is sorted on the reverse, so the first commit we find <= commitTime is the one we want // fileList is sorted on the reverse, so the first commit we find <= commitTime is the one we want

View File

@@ -17,7 +17,6 @@
package com.uber.hoodie; package com.uber.hoodie;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.HoodieTestDataGenerator;
@@ -28,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieFileGroup;
import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -43,9 +43,6 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.util.Collection;
import java.util.Map;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -61,21 +58,23 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import scala.collection.Iterator;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import scala.collection.Iterator;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@@ -1134,6 +1133,42 @@ public class TestHoodieClient implements Serializable {
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2));
} }
@Test
public void testKeepLatestFileVersionsMOR() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
.retainFileVersions(1).build()).build();
HoodieTableMetaClient metaClient = HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ);
// Make 3 files, one base file and 2 log files associated with base file
String file1P0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000");
String file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.empty());
String file2P0L1 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.of(2));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(basePath, "000");
// Make 4 files, one base file and 3 log files associated with base file
HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0);
file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.empty());
file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(2));
file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(3));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(basePath, "001");
HoodieTable table = HoodieTable
.getHoodieTable(metaClient, config);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
assertEquals("Must clean three files, one parquet and 2 log files" , 3, getCleanStat(hoodieCleanStats, partitionPaths[0]).getSuccessDeleteFiles().size());
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0));
assertFalse(HoodieTestUtils.doesLogFileExist(basePath, partitionPaths[0], "000", file2P0L0, Optional.empty()));
assertFalse(HoodieTestUtils.doesLogFileExist(basePath, partitionPaths[0], "000", file2P0L0, Optional.of(2)));
}
@Test @Test
public void testKeepLatestCommits() throws IOException { public void testKeepLatestCommits() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)

View File

@@ -18,13 +18,11 @@ package com.uber.hoodie.common.util;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.InvalidHoodiePathException; import com.uber.hoodie.exception.InvalidHoodiePathException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +52,7 @@ public class FSUtils {
private static final Logger LOG = LogManager.getLogger(FSUtils.class); private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1 // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
private static final Pattern LOG_FILE_PATTERN = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)"); private static final Pattern LOG_FILE_PATTERN = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)");
private static final String LOG_FILE_PREFIX = ".";
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final long MIN_CLEAN_TO_KEEP = 10; private static final long MIN_CLEAN_TO_KEEP = 10;
private static final long MIN_ROLLBACK_TO_KEEP = 10; private static final long MIN_ROLLBACK_TO_KEEP = 10;
@@ -228,11 +227,11 @@ public class FSUtils {
public static String makeLogFileName(String fileId, String logFileExtension, public static String makeLogFileName(String fileId, String logFileExtension,
String baseCommitTime, int version) { String baseCommitTime, int version) {
return "." + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version); return LOG_FILE_PREFIX + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
} }
public static String maskWithoutLogVersion(String commitTime, String fileId, String logFileExtension) { public static String maskWithoutLogVersion(String commitTime, String fileId, String logFileExtension) {
return String.format("%s_%s%s*", fileId, commitTime, logFileExtension); return LOG_FILE_PREFIX + String.format("%s_%s%s*", fileId, commitTime, logFileExtension);
} }

View File

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.StringUtils;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@@ -125,10 +126,36 @@ public class HoodieTestUtils {
return fileID; return fileID;
} }
public static final String createNewLogFile(String basePath, String partitionPath, String commitTime, String fileID, Optional<Integer> version) throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
boolean makeDir = fs.mkdirs(new Path(folderPath));
if(!makeDir) {
throw new IOException("cannot create directory for path " + folderPath);
}
boolean createFile = fs.createNewFile(new Path(folderPath + FSUtils.makeLogFileName(fileID, ".log",commitTime, version.orElse(DEFAULT_TASK_PARTITIONID))));
if(!createFile) {
throw new IOException(StringUtils.format("cannot create data file for commit %s and fileId %s", commitTime, fileID));
}
return fileID;
}
public static final void createCompactionCommitFiles(String basePath, String... commitTimes) throws IOException {
for (String commitTime: commitTimes) {
boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME+ "/" + HoodieTimeline.makeCompactionFileName(commitTime)));
if(!createFile) {
throw new IOException("cannot create commit file for commit " + commitTime);
}
}
}
public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) throws IOException {
return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID); return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID);
} }
public static final String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID, Optional<Integer> version) throws IOException {
return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_TASK_PARTITIONID));
}
public static final String getCommitFilePath(String basePath, String commitTime) throws IOException { public static final String getCommitFilePath(String basePath, String commitTime) throws IOException {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION; return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION;
} }
@@ -137,6 +164,10 @@ public class HoodieTestUtils {
return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists(); return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists();
} }
public static final boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID, Optional<Integer> version) throws IOException {
return new File(getLogFilePath(basePath, partitionPath, commitTime, fileID, version)).exists();
}
public static final boolean doesCommitExist(String basePath, String commitTime) { public static final boolean doesCommitExist(String basePath, String commitTime) {
return new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME+ "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION).exists(); return new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME+ "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION).exists();
} }