diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java index a5a5129cb..81530eb7a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCleanHelper.java @@ -16,6 +16,7 @@ package com.uber.hoodie.io; +import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieCleaningPolicy; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieFileGroup; @@ -23,9 +24,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; -import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.FileSystem; @@ -89,29 +88,28 @@ public class HoodieCleanHelper> { for (HoodieFileGroup fileGroup : fileGroups) { int keepVersions = config.getCleanerFileVersionsRetained(); - Iterator commitItr = fileGroup.getAllDataFiles().iterator(); - while (commitItr.hasNext() && keepVersions > 0) { + Iterator fileSliceIterator = fileGroup.getAllFileSlices().iterator(); + while (fileSliceIterator.hasNext() && keepVersions > 0) { // Skip this most recent version - HoodieDataFile next = commitItr.next(); - if(savepointedFiles.contains(next.getFileName())) { + FileSlice nextSlice = fileSliceIterator.next(); + HoodieDataFile dataFile = nextSlice.getDataFile().get(); + if(savepointedFiles.contains(dataFile.getFileName())) { // do not clean up a savepoint data file continue; } keepVersions--; } // Delete the remaining files - while (commitItr.hasNext()) { - HoodieDataFile nextRecord = 
commitItr.next(); - deletePaths.add(nextRecord.getFileStatus().getPath().toString()); + while (fileSliceIterator.hasNext()) { + FileSlice nextSlice = fileSliceIterator.next(); + HoodieDataFile dataFile = nextSlice.getDataFile().get(); + deletePaths.add(dataFile.getFileStatus().getPath().toString()); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - // todo: fix below for MERGE_ON_READ - deletePaths.add(String - .format("%s/%s/%s", config.getBasePath(), partitionPath, - FSUtils.maskWithoutLogVersion(nextRecord.getCommitTime(), - nextRecord.getFileId(), - HoodieLogFile.DELTA_EXTENSION))); + deletePaths.addAll(nextSlice.getLogFiles() + .map(file -> file.getPath().toString()) + .collect(Collectors.toList())); } } } @@ -155,16 +153,18 @@ public class HoodieCleanHelper> { fileSystemView.getAllFileGroups(partitionPath) .collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { - List fileList = fileGroup.getAllDataFiles().collect(Collectors.toList()); - String lastVersion = FSUtils.getCommitTime(fileList.get(0).getFileName()); + List fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); + HoodieDataFile dataFile = fileSliceList.get(0).getDataFile().get(); + String lastVersion = dataFile.getCommitTime(); String lastVersionBeforeEarliestCommitToRetain = - getLatestVersionBeforeCommit(fileList, earliestCommitToRetain); + getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); // Ensure there are more than 1 version of the file (we only clean old files from updates) // i.e always spare the last commit. 
- for (HoodieDataFile afile : fileList) { - String fileCommitTime = afile.getCommitTime(); - if(savepointedFiles.contains(afile.getFileName())) { + for (FileSlice aSlice : fileSliceList) { + HoodieDataFile aFile = aSlice.getDataFile().get(); + String fileCommitTime = aFile.getCommitTime(); + if(savepointedFiles.contains(aFile.getFileName())) { // do not clean up a savepoint data file continue; } @@ -184,15 +184,13 @@ public class HoodieCleanHelper> { fileCommitTime, HoodieTimeline.GREATER)) { // this is a commit, that should be cleaned. - deletePaths.add(afile.getFileStatus().getPath().toString()); + deletePaths.add(aFile.getFileStatus().getPath().toString()); if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well - // todo: fix below for MERGE_ON_READ - deletePaths.add(String - .format("%s/%s/%s", config.getBasePath(), partitionPath, - FSUtils.maskWithoutLogVersion(fileCommitTime, afile.getFileId(), - HoodieLogFile.DELTA_EXTENSION))); + deletePaths.addAll(aSlice.getLogFiles() + .map(file -> file.getPath().toString()) + .collect(Collectors.toList())); } } } @@ -205,10 +203,10 @@ public class HoodieCleanHelper> { /** * Gets the latest version < commitTime. This version file could still be used by queries. 
*/ - private String getLatestVersionBeforeCommit(List fileList, + private String getLatestVersionBeforeCommit(List fileSliceList, HoodieInstant commitTime) { - for (HoodieDataFile file : fileList) { - String fileCommitTime = FSUtils.getCommitTime(file.getFileName()); + for (FileSlice file : fileSliceList) { + String fileCommitTime = file.getDataFile().get().getCommitTime(); if (HoodieTimeline.compareTimestamps(commitTime.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) { // fileList is sorted on the reverse, so the first commit we find <= commitTime is the one we want diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index c70516305..ef013622e 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -17,7 +17,6 @@ package com.uber.hoodie; import com.google.common.collect.Iterables; - import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; @@ -28,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -43,9 +43,6 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; - -import java.util.Collection; -import java.util.Map; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; @@ -61,21 
+58,23 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import scala.collection.Iterator; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; -import scala.collection.Iterator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1134,6 +1133,42 @@ public class TestHoodieClient implements Serializable { assertTrue(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "002", file3P0C2)); } + @Test + public void testKeepLatestFileVersionsMOR() throws IOException { + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) + .retainFileVersions(1).build()).build(); + + + HoodieTableMetaClient metaClient = HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + + // Make 3 files, one base file and 2 log files associated with base file + String file1P0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); + String file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.empty()); + String file2P0L1 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "000", file1P0, Optional.of(2)); + // make 1 compaction commit + HoodieTestUtils.createCompactionCommitFiles(basePath, "000"); + + // Make 4 files, one base file and 3 log files associated with base file + HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "001", file1P0); + 
file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.empty()); + file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(2)); + file2P0L0 = HoodieTestUtils.createNewLogFile(basePath, partitionPaths[0], "001", file1P0, Optional.of(3)); + // make 1 compaction commit + HoodieTestUtils.createCompactionCommitFiles(basePath, "001"); + + HoodieTable table = HoodieTable + .getHoodieTable(metaClient, config); + List hoodieCleanStats = table.clean(jsc); + assertEquals("Must clean three files, one parquet and 2 log files" , 3, getCleanStat(hoodieCleanStats, partitionPaths[0]).getSuccessDeleteFiles().size()); + assertFalse(HoodieTestUtils.doesDataFileExist(basePath, partitionPaths[0], "000", file1P0)); + assertFalse(HoodieTestUtils.doesLogFileExist(basePath, partitionPaths[0], "000", file2P0L0, Optional.empty())); + assertFalse(HoodieTestUtils.doesLogFileExist(basePath, partitionPaths[0], "000", file2P0L0, Optional.of(2))); + } + @Test public void testKeepLatestCommits() throws IOException { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 6a5ec6286..18ec2b3fb 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -18,13 +18,11 @@ package com.uber.hoodie.common.util; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - -import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.model.HoodiePartitionMetadata; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.exception.HoodieIOException; import 
com.uber.hoodie.exception.InvalidHoodiePathException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -54,6 +52,7 @@ public class FSUtils { private static final Logger LOG = LogManager.getLogger(FSUtils.class); // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1 private static final Pattern LOG_FILE_PATTERN = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)"); + private static final String LOG_FILE_PREFIX = "."; private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final long MIN_CLEAN_TO_KEEP = 10; private static final long MIN_ROLLBACK_TO_KEEP = 10; @@ -228,11 +227,11 @@ public class FSUtils { public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version) { - return "." + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version); + return LOG_FILE_PREFIX + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version); } public static String maskWithoutLogVersion(String commitTime, String fileId, String logFileExtension) { - return String.format("%s_%s%s*", fileId, commitTime, logFileExtension); + return LOG_FILE_PREFIX + String.format("%s_%s%s*", fileId, commitTime, logFileExtension); } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index f1380e07e..f3ed87476 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.util.StringUtils; import org.junit.rules.TemporaryFolder; import 
java.io.ByteArrayInputStream; @@ -125,10 +126,36 @@ public class HoodieTestUtils { return fileID; } + public static final String createNewLogFile(String basePath, String partitionPath, String commitTime, String fileID, Optional version) throws IOException { + String folderPath = basePath + "/" + partitionPath + "/"; + boolean makeDir = fs.mkdirs(new Path(folderPath)); + if(!makeDir) { + throw new IOException("cannot create directory for path " + folderPath); + } + boolean createFile = fs.createNewFile(new Path(folderPath + FSUtils.makeLogFileName(fileID, ".log",commitTime, version.orElse(DEFAULT_TASK_PARTITIONID)))); + if(!createFile) { + throw new IOException(StringUtils.format("cannot create log file for commit %s and fileId %s", commitTime, fileID)); + } + return fileID; + } + + public static final void createCompactionCommitFiles(String basePath, String... commitTimes) throws IOException { + for (String commitTime: commitTimes) { + boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME+ "/" + HoodieTimeline.makeCompactionFileName(commitTime))); + if(!createFile) { + throw new IOException("cannot create commit file for commit " + commitTime); + } + } + } + public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID); } + public static final String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID, Optional version) throws IOException { + return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_TASK_PARTITIONID)); + } + public static final String getCommitFilePath(String basePath, String commitTime) throws IOException { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + 
HoodieTimeline.COMMIT_EXTENSION; } @@ -137,6 +164,10 @@ public class HoodieTestUtils { return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists(); } + public static final boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID, Optional version) throws IOException { + return new File(getLogFilePath(basePath, partitionPath, commitTime, fileID, version)).exists(); + } + public static final boolean doesCommitExist(String basePath, String commitTime) { return new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME+ "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION).exists(); }