From a1483f2c5f3b921d1117d31f75453e45e5717259 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Sat, 8 Jun 2019 12:40:08 -0700 Subject: [PATCH] HUDI-148 Small File selection logic for MOR must skip fileIds selected for pending compaction correctly --- .../java/com/uber/hoodie/CompactionAdminClient.java | 2 +- .../uber/hoodie/table/HoodieMergeOnReadTable.java | 7 ++++--- .../src/test/java/com/uber/hoodie/TestCleaner.java | 2 +- .../com/uber/hoodie/TestCompactionAdminClient.java | 4 ++-- .../hoodie/common/table/TableFileSystemView.java | 10 ++++++---- .../table/view/AbstractTableFileSystemView.java | 12 +++++++++--- .../table/view/PriorityBasedFileSystemView.java | 7 ++++--- .../table/view/RemoteHoodieTableFileSystemView.java | 9 +++++++-- .../table/view/HoodieTableFileSystemViewTest.java | 13 +++++++++---- .../timeline/service/FileSystemViewHandler.java | 3 ++- .../timeline/service/handlers/FileSliceHandler.java | 6 +++--- 11 files changed, 48 insertions(+), 27 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java index 34f7dc133..751e7dc79 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java @@ -442,7 +442,7 @@ public class CompactionAdminClient extends AbstractHoodieClient { .sorted(HoodieLogFile.getLogFileComparator()) .collect(Collectors.toList()); FileSlice fileSliceForCompaction = - fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime()) + fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime(), true) .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get(); int maxUsedVersion = fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index bcc219015..589b7090e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -384,7 +384,7 @@ public class HoodieMergeOnReadTable extends // TODO : choose last N small files since there can be multiple small files written to a single partition // by different spark partitions in a single batch Optional smallFileSlice = getRTFileSystemView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter( + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false).filter( fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getDataFile().get().getFileSize() < config .getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) -> @@ -394,9 +394,10 @@ public class HoodieMergeOnReadTable extends allSmallFileSlices.add(smallFileSlice.get()); } } else { - // If we can index log files, we can add more inserts to log files. + // If we can index log files, we can add more inserts to log files for fileIds including those under + // pending compaction. List allFileSlices = getRTFileSystemView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()) + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) .collect(Collectors.toList()); for (FileSlice fileSlice : allFileSlices) { if (isSmallFile(partitionPath, fileSlice)) { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java index afecd6987..7798805ba 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java @@ -865,7 +865,7 @@ public class TestCleaner extends TestHoodieClientBase { String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId); Optional fileSliceForCompaction = hoodieTable.getRTFileSystemView().getLatestFileSlicesBeforeOrOn(DEFAULT_FIRST_PARTITION_PATH, - baseInstantForCompaction).filter(fs -> fs.getFileId().equals(fileId)).findFirst(); + baseInstantForCompaction, true).filter(fs -> fs.getFileId().equals(fileId)).findFirst(); Assert.assertTrue("Base Instant for Compaction must be preserved", fileSliceForCompaction.isPresent()); Assert.assertTrue("FileSlice has data-file", fileSliceForCompaction.get().getDataFile().isPresent()); Assert.assertEquals("FileSlice has log-files", 2, diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java index 91f2d333b..4509db6b5 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCompactionAdminClient.java @@ -280,7 +280,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { final HoodieTableFileSystemView newFsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files - newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) + newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> { Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent()); Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0); @@ -342,7 +342,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase { final HoodieTableFileSystemView newFsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files - newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) + newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) .filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> { Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent()); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java index 01ff0a5fc..5605e3ae2 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java @@ -104,13 +104,15 @@ public interface TableFileSystemView { */ Stream getLatestUnCompactedFileSlices(String partitionPath); - /** - * Stream all the latest file slices in the given partition with precondition that - * commitTime(file) before maxCommitTime + * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime + * + * @param partitionPath Partition path + * @param maxCommitTime Max Instant Time + * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction */ Stream getLatestFileSlicesBeforeOrOn(String partitionPath, - String maxCommitTime); + String maxCommitTime, boolean includeFileSlicesInPendingCompaction); /** * Stream all "merged" file-slices before on an instant time diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java index d9a5abe71..129a584c1 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java @@ -476,13 +476,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV } @Override - public final Stream getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime) { + public final Stream getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime, + boolean includeFileSlicesInPendingCompaction) { try { readLock.lock(); String partitionPath = formatPartitionKey(partitionStr); ensurePartitionLoadedCorrectly(partitionPath); - return fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime) - .map(fs -> filterDataFileAfterPendingCompaction(fs)); + Stream fileSliceStream = + fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime); + if (includeFileSlicesInPendingCompaction) { + return fileSliceStream.map(fs -> filterDataFileAfterPendingCompaction(fs)); + } else { + return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId())); + } } finally { readLock.unlock(); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java index 2e49ac526..601c5f20f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/PriorityBasedFileSystemView.java @@ -166,9 +166,10 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri } @Override - public Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) { - return execute(partitionPath, maxCommitTime, preferredView::getLatestFileSlicesBeforeOrOn, - secondaryView::getLatestFileSlicesBeforeOrOn); + public Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime, + boolean includeFileSlicesInPendingCompaction) { + return execute(partitionPath, maxCommitTime, includeFileSlicesInPendingCompaction, + preferredView::getLatestFileSlicesBeforeOrOn, secondaryView::getLatestFileSlicesBeforeOrOn); } @Override diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java index fa7f36c5c..9d6972f79 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RemoteHoodieTableFileSystemView.java @@ -108,6 +108,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public static final String LAST_INSTANT_TS = "lastinstantts"; public static final String TIMELINE_HASH = "timelinehash"; public static final String REFRESH_OFF = "refreshoff"; + public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction"; + private static Logger log = LogManager.getLogger(RemoteHoodieTableFileSystemView.class); @@ -327,8 +329,11 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, } @Override - public Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) { - Map paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); + public Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime, + boolean includeFileSlicesInPendingCompaction) { + Map paramsMap = getParamsWithAdditionalParams(partitionPath, + new String[]{MAX_INSTANT_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM}, + new String[]{maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)}); try { List dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap, new TypeReference>() { diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java index 8d09c605d..4cae7a657 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java @@ -351,7 +351,7 @@ public class HoodieTableFileSystemViewTest { assertEquals("Log File Order check", fileName2, logFiles.get(2).getFileName()); assertEquals("Log File Order check", fileName1, logFiles.get(3).getFileName()); - fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5) + fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5, true) .collect(Collectors.toList()); assertEquals("Expect only one file-id", 1, fileSliceList.size()); fileSlice = fileSliceList.get(0); @@ -672,7 +672,7 @@ public class HoodieTableFileSystemViewTest { assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3))); filenames = Sets.newHashSet(); - List logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4) + List logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true) .map(slice -> slice.getLogFiles()).flatMap(logFileList -> logFileList) .collect(Collectors.toList()); assertEquals(logFilesList.size(), 4); @@ -706,7 +706,7 @@ public class HoodieTableFileSystemViewTest { } logFilesList = - rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3).map(slice -> slice.getLogFiles()) + rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3, true).map(slice -> slice.getLogFiles()) .flatMap(logFileList -> logFileList).collect(Collectors.toList()); assertEquals(logFilesList.size(), 1); assertTrue(logFilesList.get(0).getFileName() @@ -1135,7 +1135,7 @@ public class HoodieTableFileSystemViewTest { assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); assertEquals("Log File Order check", fileName1, logFiles.get(2).getFileName()); - fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5) + fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5, true) .collect(Collectors.toList()); assertEquals("Expect only one file-id", 1, fileSliceList.size()); fileSlice = fileSliceList.get(0); @@ -1147,6 +1147,11 @@ public class HoodieTableFileSystemViewTest { assertEquals("Log files must include only those after compaction request", 2, logFiles.size()); assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName()); assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); + + // Check getLatestFileSlicesBeforeOrOn excluding fileIds in pending compaction + fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5, false) + .collect(Collectors.toList()); + assertEquals("Expect empty list as file-id is in pending compaction", 0, fileSliceList.size()); }); Assert.assertEquals(3, fsView.getPendingCompactionOperations().count()); diff --git a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java index b8426267a..483bc6f52 100644 --- a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java +++ b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/FileSystemViewHandler.java @@ -265,7 +265,8 @@ public class FileSystemViewHandler { List dtos = sliceHandler.getLatestFileSlicesBeforeOrOn(ctx.validatedQueryParam(BASEPATH_PARAM).getOrThrow(), ctx.validatedQueryParam(PARTITION_PARAM).getOrThrow(), - ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow()); + ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow(), + Boolean.valueOf(ctx.validatedQueryParam(INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM).getOrThrow())); writeValueAsString(ctx, dtos); }, true)); diff --git a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java index c2fbb0c6c..01aceae2b 100644 --- a/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java +++ b/hoodie-timeline-service/src/main/java/com/uber/hoodie/timeline/service/handlers/FileSliceHandler.java @@ -56,9 +56,9 @@ public class FileSliceHandler extends Handler { } public List getLatestFileSlicesBeforeOrOn(String basePath, String partitionPath, - String maxInstantTime) { - return viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath, maxInstantTime) - .map(FileSliceDTO::fromFileSlice).collect(Collectors.toList()); + String maxInstantTime, boolean includeFileSlicesInPendingCompaction) { + return viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath, maxInstantTime, + includeFileSlicesInPendingCompaction).map(FileSliceDTO::fromFileSlice).collect(Collectors.toList()); } public List getLatestUnCompactedFileSlices(String basePath, String partitionPath) {