1
0

HUDI-148 Small File selection logic for MOR must skip fileIds selected for pending compaction correctly

This commit is contained in:
Balaji Varadarajan
2019-06-08 12:40:08 -07:00
committed by vinoth chandar
parent 8c9980f4f5
commit a1483f2c5f
11 changed files with 48 additions and 27 deletions

View File

@@ -442,7 +442,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
.sorted(HoodieLogFile.getLogFileComparator()) .sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList()); .collect(Collectors.toList());
FileSlice fileSliceForCompaction = FileSlice fileSliceForCompaction =
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime()) fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime(), true)
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get(); .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
int maxUsedVersion = int maxUsedVersion =
fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion) fileSliceForCompaction.getLogFiles().findFirst().map(HoodieLogFile::getLogVersion)

View File

@@ -384,7 +384,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
// TODO : choose last N small files since there can be multiple small files written to a single partition // TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch // by different spark partitions in a single batch
Optional<FileSlice> smallFileSlice = getRTFileSystemView() Optional<FileSlice> smallFileSlice = getRTFileSystemView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter( .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false).filter(
fileSlice -> fileSlice.getLogFiles().count() < 1 fileSlice -> fileSlice.getLogFiles().count() < 1
&& fileSlice.getDataFile().get().getFileSize() < config && fileSlice.getDataFile().get().getFileSize() < config
.getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) -> .getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) ->
@@ -394,9 +394,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
allSmallFileSlices.add(smallFileSlice.get()); allSmallFileSlices.add(smallFileSlice.get());
} }
} else { } else {
// If we can index log files, we can add more inserts to log files. // If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction.
List<FileSlice> allFileSlices = getRTFileSystemView() List<FileSlice> allFileSlices = getRTFileSystemView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()) .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) { for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(partitionPath, fileSlice)) { if (isSmallFile(partitionPath, fileSlice)) {

View File

@@ -865,7 +865,7 @@ public class TestCleaner extends TestHoodieClientBase {
String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId); String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId);
Optional<FileSlice> fileSliceForCompaction = Optional<FileSlice> fileSliceForCompaction =
hoodieTable.getRTFileSystemView().getLatestFileSlicesBeforeOrOn(DEFAULT_FIRST_PARTITION_PATH, hoodieTable.getRTFileSystemView().getLatestFileSlicesBeforeOrOn(DEFAULT_FIRST_PARTITION_PATH,
baseInstantForCompaction).filter(fs -> fs.getFileId().equals(fileId)).findFirst(); baseInstantForCompaction, true).filter(fs -> fs.getFileId().equals(fileId)).findFirst();
Assert.assertTrue("Base Instant for Compaction must be preserved", fileSliceForCompaction.isPresent()); Assert.assertTrue("Base Instant for Compaction must be preserved", fileSliceForCompaction.isPresent());
Assert.assertTrue("FileSlice has data-file", fileSliceForCompaction.get().getDataFile().isPresent()); Assert.assertTrue("FileSlice has data-file", fileSliceForCompaction.get().getDataFile().isPresent());
Assert.assertEquals("FileSlice has log-files", 2, Assert.assertEquals("FileSlice has log-files", 2,

View File

@@ -280,7 +280,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
final HoodieTableFileSystemView newFsView = final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> { .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent()); Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent());
Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0); Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0);
@@ -342,7 +342,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
final HoodieTableFileSystemView newFsView = final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant) newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)) .filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> { .filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> {
Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent()); Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent());

View File

@@ -104,13 +104,15 @@ public interface TableFileSystemView {
*/ */
Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath); Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath);
/** /**
* Stream all the latest file slices in the given partition with precondition that * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime
* commitTime(file) before maxCommitTime *
* @param partitionPath Partition path
* @param maxCommitTime Max Instant Time
* @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
*/ */
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime); String maxCommitTime, boolean includeFileSlicesInPendingCompaction);
/** /**
* Stream all "merged" file-slices before on an instant time * Stream all "merged" file-slices before on an instant time

View File

@@ -476,13 +476,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
@Override @Override
public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime) { public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime,
boolean includeFileSlicesInPendingCompaction) {
try { try {
readLock.lock(); readLock.lock();
String partitionPath = formatPartitionKey(partitionStr); String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath); ensurePartitionLoadedCorrectly(partitionPath);
return fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime) Stream<FileSlice> fileSliceStream =
.map(fs -> filterDataFileAfterPendingCompaction(fs)); fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime);
if (includeFileSlicesInPendingCompaction) {
return fileSliceStream.map(fs -> filterDataFileAfterPendingCompaction(fs));
} else {
return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()));
}
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@@ -166,9 +166,10 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
} }
@Override @Override
public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) { public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
return execute(partitionPath, maxCommitTime, preferredView::getLatestFileSlicesBeforeOrOn, boolean includeFileSlicesInPendingCompaction) {
secondaryView::getLatestFileSlicesBeforeOrOn); return execute(partitionPath, maxCommitTime, includeFileSlicesInPendingCompaction,
preferredView::getLatestFileSlicesBeforeOrOn, secondaryView::getLatestFileSlicesBeforeOrOn);
} }
@Override @Override

View File

@@ -108,6 +108,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String LAST_INSTANT_TS = "lastinstantts"; public static final String LAST_INSTANT_TS = "lastinstantts";
public static final String TIMELINE_HASH = "timelinehash"; public static final String TIMELINE_HASH = "timelinehash";
public static final String REFRESH_OFF = "refreshoff"; public static final String REFRESH_OFF = "refreshoff";
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
private static Logger log = LogManager.getLogger(RemoteHoodieTableFileSystemView.class); private static Logger log = LogManager.getLogger(RemoteHoodieTableFileSystemView.class);
@@ -327,8 +329,11 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
} }
@Override @Override
public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) { public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); boolean includeFileSlicesInPendingCompaction) {
Map<String, String> paramsMap = getParamsWithAdditionalParams(partitionPath,
new String[]{MAX_INSTANT_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
new String[]{maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)});
try { try {
List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap, List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
new TypeReference<List<FileSliceDTO>>() { new TypeReference<List<FileSliceDTO>>() {

View File

@@ -351,7 +351,7 @@ public class HoodieTableFileSystemViewTest {
assertEquals("Log File Order check", fileName2, logFiles.get(2).getFileName()); assertEquals("Log File Order check", fileName2, logFiles.get(2).getFileName());
assertEquals("Log File Order check", fileName1, logFiles.get(3).getFileName()); assertEquals("Log File Order check", fileName1, logFiles.get(3).getFileName());
fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5) fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5, true)
.collect(Collectors.toList()); .collect(Collectors.toList());
assertEquals("Expect only one file-id", 1, fileSliceList.size()); assertEquals("Expect only one file-id", 1, fileSliceList.size());
fileSlice = fileSliceList.get(0); fileSlice = fileSliceList.get(0);
@@ -672,7 +672,7 @@ public class HoodieTableFileSystemViewTest {
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)));
filenames = Sets.newHashSet(); filenames = Sets.newHashSet();
List<HoodieLogFile> logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4) List<HoodieLogFile> logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4, true)
.map(slice -> slice.getLogFiles()).flatMap(logFileList -> logFileList) .map(slice -> slice.getLogFiles()).flatMap(logFileList -> logFileList)
.collect(Collectors.toList()); .collect(Collectors.toList());
assertEquals(logFilesList.size(), 4); assertEquals(logFilesList.size(), 4);
@@ -706,7 +706,7 @@ public class HoodieTableFileSystemViewTest {
} }
logFilesList = logFilesList =
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3).map(slice -> slice.getLogFiles()) rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3, true).map(slice -> slice.getLogFiles())
.flatMap(logFileList -> logFileList).collect(Collectors.toList()); .flatMap(logFileList -> logFileList).collect(Collectors.toList());
assertEquals(logFilesList.size(), 1); assertEquals(logFilesList.size(), 1);
assertTrue(logFilesList.get(0).getFileName() assertTrue(logFilesList.get(0).getFileName()
@@ -1135,7 +1135,7 @@ public class HoodieTableFileSystemViewTest {
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
assertEquals("Log File Order check", fileName1, logFiles.get(2).getFileName()); assertEquals("Log File Order check", fileName1, logFiles.get(2).getFileName());
fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5) fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5, true)
.collect(Collectors.toList()); .collect(Collectors.toList());
assertEquals("Expect only one file-id", 1, fileSliceList.size()); assertEquals("Expect only one file-id", 1, fileSliceList.size());
fileSlice = fileSliceList.get(0); fileSlice = fileSliceList.get(0);
@@ -1147,6 +1147,11 @@ public class HoodieTableFileSystemViewTest {
assertEquals("Log files must include only those after compaction request", 2, logFiles.size()); assertEquals("Log files must include only those after compaction request", 2, logFiles.size());
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName()); assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
// Check getLatestFileSlicesBeforeOrOn excluding fileIds in pending compaction
fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5, false)
.collect(Collectors.toList());
assertEquals("Expect empty list as file-id is in pending compaction", 0, fileSliceList.size());
}); });
Assert.assertEquals(3, fsView.getPendingCompactionOperations().count()); Assert.assertEquals(3, fsView.getPendingCompactionOperations().count());

View File

@@ -265,7 +265,8 @@ public class FileSystemViewHandler {
List<FileSliceDTO> dtos = List<FileSliceDTO> dtos =
sliceHandler.getLatestFileSlicesBeforeOrOn(ctx.validatedQueryParam(BASEPATH_PARAM).getOrThrow(), sliceHandler.getLatestFileSlicesBeforeOrOn(ctx.validatedQueryParam(BASEPATH_PARAM).getOrThrow(),
ctx.validatedQueryParam(PARTITION_PARAM).getOrThrow(), ctx.validatedQueryParam(PARTITION_PARAM).getOrThrow(),
ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow()); ctx.validatedQueryParam(MAX_INSTANT_PARAM).getOrThrow(),
Boolean.valueOf(ctx.validatedQueryParam(INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM).getOrThrow()));
writeValueAsString(ctx, dtos); writeValueAsString(ctx, dtos);
}, true)); }, true));

View File

@@ -56,9 +56,9 @@ public class FileSliceHandler extends Handler {
} }
public List<FileSliceDTO> getLatestFileSlicesBeforeOrOn(String basePath, String partitionPath, public List<FileSliceDTO> getLatestFileSlicesBeforeOrOn(String basePath, String partitionPath,
String maxInstantTime) { String maxInstantTime, boolean includeFileSlicesInPendingCompaction) {
return viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath, maxInstantTime) return viewManager.getFileSystemView(basePath).getLatestFileSlicesBeforeOrOn(partitionPath, maxInstantTime,
.map(FileSliceDTO::fromFileSlice).collect(Collectors.toList()); includeFileSlicesInPendingCompaction).map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
} }
public List<FileSliceDTO> getLatestUnCompactedFileSlices(String basePath, String partitionPath) { public List<FileSliceDTO> getLatestUnCompactedFileSlices(String basePath, String partitionPath) {