From 6d01ae8ca0e31c83550faacf2eeb34d9672a7985 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Wed, 23 May 2018 16:54:53 -0700 Subject: [PATCH] FileSystemView and Timeline level changes to support Async Compaction --- .../com/uber/hoodie/HoodieWriteClient.java | 10 +- .../uber/hoodie/io/HoodieAppendHandle.java | 2 +- .../compact/HoodieRealtimeTableCompactor.java | 2 +- .../hoodie/table/HoodieMergeOnReadTable.java | 5 +- .../com/uber/hoodie/table/HoodieTable.java | 5 +- .../hoodie/table/TestMergeOnReadTable.java | 6 +- .../uber/hoodie/common/model/FileSlice.java | 14 +- .../hoodie/common/model/HoodieFileGroup.java | 87 ++-- .../hoodie/common/model/HoodieLogFile.java | 12 +- .../common/table/HoodieTableMetaClient.java | 17 + .../hoodie/common/table/HoodieTimeline.java | 70 ++- .../common/table/TableFileSystemView.java | 16 + .../log/AbstractHoodieLogRecordScanner.java | 1 + .../table/timeline/HoodieActiveTimeline.java | 65 ++- .../table/timeline/HoodieDefaultTimeline.java | 23 + .../common/table/timeline/HoodieInstant.java | 60 ++- .../table/view/HoodieTableFileSystemView.java | 183 +++++++- .../string/HoodieActiveTimelineTest.java | 4 +- .../view/HoodieTableFileSystemViewTest.java | 433 +++++++++++++++++- .../realtime/HoodieRealtimeInputFormat.java | 9 +- 20 files changed, 892 insertions(+), 132 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index ef805a94f..061fa908e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -398,7 +398,7 @@ public class HoodieWriteClient implements Seriali }); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - Optional instant = activeTimeline.filterInflights().lastInstant(); + Optional instant = activeTimeline.filterInflightsExcludingCompaction().lastInstant(); 
activeTimeline.saveToInflight(instant.get(), Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } catch (IOException io) { @@ -692,7 +692,7 @@ public class HoodieWriteClient implements Seriali HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline(); + HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline(); HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); @@ -709,8 +709,8 @@ public class HoodieWriteClient implements Seriali rollback(commitsToRollback); // Make sure the rollback was successful - Optional lastInstant = activeTimeline.reload().getCommitsTimeline() - .filterCompletedInstants().lastInstant(); + Optional lastInstant = activeTimeline.reload().getCommitsAndCompactionTimeline() + .filterCompletedAndCompactionInstants().lastInstant(); Preconditions.checkArgument(lastInstant.isPresent()); Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime), savepointTime + "is not the last commit after rolling back " + commitsToRollback @@ -1051,7 +1051,7 @@ public class HoodieWriteClient implements Seriali private void rollbackInflightCommits() { HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterInflights(); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsExcludingCompaction(); List commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); Collections.reverse(commits); diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index d53c19bc2..fa7d857d6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -113,7 +113,7 @@ public class HoodieAppendHandle extends HoodieIOH .filter(fileSlice1 -> fileSlice1.getFileId().equals(fileId)).findFirst(); String baseInstantTime = commitTime; if (fileSlice.isPresent()) { - baseInstantTime = fileSlice.get().getBaseCommitTime(); + baseInstantTime = fileSlice.get().getBaseInstantTime(); } else { // This means there is no base data file, start appending to a new log file fileSlice = Optional.of(new FileSlice(baseInstantTime, this.fileId)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index fc03b6cad..5a60f0d8a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -182,7 +182,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .getLatestFileSlices(partitionPath).map( s -> { List logFiles = s.getLogFiles().sorted(HoodieLogFile - .getLogVersionComparator().reversed()).collect(Collectors.toList()); + .getBaseInstantAndLogVersionComparator().reversed()).collect(Collectors.toList()); totalLogFiles.add((long) logFiles.size()); totalFileSlices.add(1L); return new CompactionOperation(s.getDataFile(), partitionPath, logFiles, config); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 109397363..74e83e92c 100644 --- 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -230,7 +230,7 @@ public class HoodieMergeOnReadTable extends // This needs to be done since GlobalIndex at the moment does not store the latest commit time Map fileIdToLatestCommitTimeMap = hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath) - .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseCommitTime)) : null; + .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null; commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() .filter(wStat -> { if (wStat != null @@ -341,7 +341,8 @@ public class HoodieMergeOnReadTable extends // TODO - check if index.isglobal then small files are log files too Optional smallFileSlice = getRTFileSystemView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter( + // Use the merged file-slice for small file selection + .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter( fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getDataFile().get().getFileSize() < config .getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) -> diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 383743a22..762313318 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -119,7 +119,8 @@ public abstract class HoodieTable implements Seri * Get the real time view of the file system for this table */ public TableFileSystemView.RealtimeView getRTFileSystemView() { - return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline()); + return new HoodieTableFileSystemView(metaClient, + 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants()); } /** @@ -140,7 +141,7 @@ public abstract class HoodieTable implements Seri * Get only the inflights (no-completed) commit timeline */ public HoodieTimeline getInflightCommitTimeline() { - return metaClient.getCommitsTimeline().filterInflights(); + return metaClient.getCommitsTimeline().filterInflightsExcludingCompaction(); } /** diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 863ff1998..980b3d077 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -595,8 +595,9 @@ public class TestMergeOnReadTable { roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); dataFilesToRead = roView.getLatestDataFiles(); + List dataFilesList = dataFilesToRead.collect(Collectors.toList()); assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", - dataFilesToRead.findAny().isPresent()); + dataFilesList.size() > 0); /** * Write 2 (only updates + inserts, written to .log file + correction of existing parquet @@ -624,7 +625,8 @@ public class TestMergeOnReadTable { roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles); dataFilesToRead = roView.getLatestDataFiles(); - Map parquetFileIdToNewSize = dataFilesToRead.collect( + List newDataFilesList = dataFilesToRead.collect(Collectors.toList()); + Map parquetFileIdToNewSize = newDataFilesList.stream().collect( Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize)); assertTrue(parquetFileIdToNewSize.entrySet().stream() diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java index b0f4c4182..0720fc99b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java @@ -37,7 +37,7 @@ public class FileSlice implements Serializable { /** * Point in the timeline, at which the slice was created */ - private String baseCommitTime; + private String baseInstantTime; /** * data file, with the compacted data, for this slice @@ -50,11 +50,11 @@ public class FileSlice implements Serializable { */ private final TreeSet logFiles; - public FileSlice(String baseCommitTime, String fileId) { + public FileSlice(String baseInstantTime, String fileId) { this.fileId = fileId; - this.baseCommitTime = baseCommitTime; + this.baseInstantTime = baseInstantTime; this.dataFile = null; - this.logFiles = new TreeSet<>(HoodieLogFile.getLogVersionComparator()); + this.logFiles = new TreeSet<>(HoodieLogFile.getBaseInstantAndLogVersionComparator()); } public void setDataFile(HoodieDataFile dataFile) { @@ -69,8 +69,8 @@ public class FileSlice implements Serializable { return logFiles.stream(); } - public String getBaseCommitTime() { - return baseCommitTime; + public String getBaseInstantTime() { + return baseInstantTime; } public String getFileId() { @@ -84,7 +84,7 @@ public class FileSlice implements Serializable { @Override public String toString() { final StringBuilder sb = new StringBuilder("FileSlice {"); - sb.append("baseCommitTime=").append(baseCommitTime); + sb.append("baseCommitTime=").append(baseInstantTime); sb.append(", dataFile='").append(dataFile).append('\''); sb.append(", logFiles='").append(logFiles).append('\''); sb.append('}'); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java index f3111a109..1f30cfad4 100644 --- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieFileGroup.java @@ -72,6 +72,16 @@ public class HoodieFileGroup implements Serializable { this.lastInstant = timeline.lastInstant(); } + /** + * Potentially add a new file-slice by adding base-instant time + * A file-slice without any data-file and log-files can exist (if a compaction just got requested) + */ + public void addNewFileSliceAtInstant(String baseInstantTime) { + if (!fileSlices.containsKey(baseInstantTime)) { + fileSlices.put(baseInstantTime, new FileSlice(baseInstantTime, id)); + } + } + /** * Add a new datafile into the file group */ @@ -106,13 +116,27 @@ public class HoodieFileGroup implements Serializable { */ private boolean isFileSliceCommitted(FileSlice slice) { String maxCommitTime = lastInstant.get().getTimestamp(); - return timeline.containsOrBeforeTimelineStarts(slice.getBaseCommitTime()) - && HoodieTimeline.compareTimestamps(slice.getBaseCommitTime(), + return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()) + && HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), maxCommitTime, HoodieTimeline.LESSER_OR_EQUAL); } + /** + * Get all the file slices including in-flight ones as seen in underlying file-system + */ + public Stream getAllFileSlicesIncludingInflight() { + return fileSlices.entrySet().stream().map(sliceEntry -> sliceEntry.getValue()); + } + + /** + * Get latest file slices including in-flight ones + */ + public Optional getLatestFileSlicesIncludingInflight() { + return getAllFileSlicesIncludingInflight().findFirst(); + } + /** * Provides a stream of committed file slices, sorted reverse base commit time.
*/ @@ -141,15 +165,29 @@ public class HoodieFileGroup implements Serializable { public Optional getLatestFileSliceBeforeOrOn(String maxCommitTime) { return getAllFileSlices() .filter(slice -> - HoodieTimeline.compareTimestamps(slice.getBaseCommitTime(), + HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), maxCommitTime, HoodieTimeline.LESSER_OR_EQUAL)) .findFirst(); } + /** + * Obtain the latest file slice, up to a commitTime, i.e. < maxInstantTime + * @param maxInstantTime Max Instant Time + * @return + */ + public Optional getLatestFileSliceBefore(String maxInstantTime) { + return getAllFileSlices() + .filter(slice -> + HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), + maxInstantTime, + HoodieTimeline.LESSER)) + .findFirst(); + } + public Optional getLatestFileSliceInRange(List commitRange) { return getAllFileSlices() - .filter(slice -> commitRange.contains(slice.getBaseCommitTime())) + .filter(slice -> commitRange.contains(slice.getBaseInstantTime())) .findFirst(); } @@ -162,47 +200,6 @@ public class HoodieFileGroup implements Serializable { .map(slice -> slice.getDataFile().get()); } - /** - * Get the latest committed data file - */ - public Optional getLatestDataFile() { - return getAllDataFiles().findFirst(); - } - - /** - * Get the latest data file, that is <= max commit time - */ - public Optional getLatestDataFileBeforeOrOn(String maxCommitTime) { - return getAllDataFiles() - .filter(dataFile -> - HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), - maxCommitTime, - HoodieTimeline.LESSER_OR_EQUAL)) - .findFirst(); - } - - /** - * Get the latest data file, that is contained within the provided commit range.
- */ - public Optional getLatestDataFileInRange(List commitRange) { - return getAllDataFiles() - .filter(dataFile -> commitRange.contains(dataFile.getCommitTime())) - .findFirst(); - } - - /** - * Obtain the latest log file (based on latest committed data file), currently being appended to - * - * @return logfile if present, empty if no log file has been opened already. - */ - public Optional getLatestLogFile() { - Optional latestSlice = getLatestFileSlice(); - if (latestSlice.isPresent() && latestSlice.get().getLogFiles().count() > 0) { - return latestSlice.get().getLogFiles().findFirst(); - } - return Optional.empty(); - } - @Override public String toString() { final StringBuilder sb = new StringBuilder("HoodieFileGroup {"); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java index 7dfaf0bdf..9f159d6ca 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java @@ -94,10 +94,16 @@ public class HoodieLogFile implements Serializable { FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion))); } - public static Comparator getLogVersionComparator() { + public static Comparator getBaseInstantAndLogVersionComparator() { return (o1, o2) -> { - // reverse the order - return new Integer(o2.getLogVersion()).compareTo(o1.getLogVersion()); + String baseInstantTime1 = o1.getBaseCommitTime(); + String baseInstantTime2 = o2.getBaseCommitTime(); + if (baseInstantTime1.equals(baseInstantTime2)) { + // reverse the order by log-version when base-commit is same + return new Integer(o2.getLogVersion()).compareTo(o1.getLogVersion()); + } + // reverse the order by base-commits + return new Integer(baseInstantTime2.compareTo(baseInstantTime1)); }; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index 3a89f96d0..1a0edfc76 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -274,6 +274,23 @@ public class HoodieTableMetaClient implements Serializable { } } + /** + * Get the commit + pending-compaction timeline visible for this table. + * A RT filesystem view is constructed with this timeline so that file-slice after pending compaction-requested + * instant-time is also considered valid. A RT file-system view for reading must then merge the file-slices before + * and after pending compaction instant so that all delta-commits are read. + */ + public HoodieTimeline getCommitsAndCompactionTimeline() { + switch (this.getTableType()) { + case COPY_ON_WRITE: + return getActiveTimeline().getCommitTimeline(); + case MERGE_ON_READ: + return getActiveTimeline().getCommitsAndCompactionTimeline(); + default: + throw new HoodieException("Unsupported table type :" + this.getTableType()); + } + } + /** * Get the compacted commit timeline visible for this table */ diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java index 867fa5a10..790ba770e 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java @@ -18,10 +18,12 @@ package com.uber.hoodie.common.table; import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import java.io.Serializable; import java.util.Optional; import java.util.function.BiPredicate; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; /** * 
HoodieTimeline is a view of meta-data instants in the hoodie dataset. Instants are specific @@ -42,6 +44,10 @@ public interface HoodieTimeline extends Serializable { String ROLLBACK_ACTION = "rollback"; String SAVEPOINT_ACTION = "savepoint"; String INFLIGHT_EXTENSION = ".inflight"; + // With Async Compaction, compaction instant can be in 3 states : + // (compaction-requested), (compaction-inflight), (completed) + String COMPACTION_ACTION = "compaction"; + String REQUESTED_EXTENSION = ".requested"; String COMMIT_EXTENSION = "." + COMMIT_ACTION; String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION; @@ -54,6 +60,12 @@ public interface HoodieTimeline extends Serializable { String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION; String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_COMPACTION_SUFFIX = + StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION); + String REQUESTED_COMPACTION_EXTENSION = + StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX); + String INFLIGHT_COMPACTION_EXTENSION = + StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION); /** * Filter this timeline to just include the in-flights @@ -62,6 +74,13 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filterInflights(); + /** + * Filter this timeline to just include the in-flights excluding compaction instants + * + * @return New instance of HoodieTimeline with just in-flights excluding compaction inflights + */ + HoodieTimeline filterInflightsExcludingCompaction(); + /** * Filter this timeline to just include the completed instants * @@ -69,6 +88,20 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filterCompletedInstants(); + /** + * Filter this timeline to just include the completed + compaction (inflight + requested) instants + * A RT filesystem view is constructed with this timeline so 
that file-slice after pending compaction-requested + * instant-time is also considered valid. A RT file-system view for reading must then merge the file-slices before + * and after pending compaction instant so that all delta-commits are read. + * @return New instance of HoodieTimeline with just completed instants + */ + HoodieTimeline filterCompletedAndCompactionInstants(); + + /** + * Filter this timeline to just include inflight and requested compaction instants + * @return + */ + HoodieTimeline filterPendingCompactionTimeline(); /** * Create a new Timeline with instants after startTs and before or on endTs @@ -157,45 +190,60 @@ public interface HoodieTimeline extends Serializable { return new HoodieInstant(false, instant.getAction(), instant.getTimestamp()); } + static HoodieInstant getCompactionRequestedInstant(final String timestamp) { + return new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, timestamp); + } + + static HoodieInstant getCompactionInflightInstant(final String timestamp) { + return new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, timestamp); + } static HoodieInstant getInflightInstant(final HoodieInstant instant) { return new HoodieInstant(true, instant.getAction(), instant.getTimestamp()); } static String makeCommitFileName(String commitTime) { - return commitTime + HoodieTimeline.COMMIT_EXTENSION; + return StringUtils.join(commitTime, HoodieTimeline.COMMIT_EXTENSION); } static String makeInflightCommitFileName(String commitTime) { - return commitTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; + return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION); } static String makeCleanerFileName(String instant) { - return instant + HoodieTimeline.CLEAN_EXTENSION; + return StringUtils.join(instant, HoodieTimeline.CLEAN_EXTENSION); } static String makeInflightCleanerFileName(String instant) { - return instant + HoodieTimeline.INFLIGHT_CLEAN_EXTENSION; + return StringUtils.join(instant, 
HoodieTimeline.INFLIGHT_CLEAN_EXTENSION); } static String makeRollbackFileName(String instant) { - return instant + HoodieTimeline.ROLLBACK_EXTENSION; + return StringUtils.join(instant, HoodieTimeline.ROLLBACK_EXTENSION); } static String makeInflightRollbackFileName(String instant) { - return instant + HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION; + return StringUtils.join(instant, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION); } static String makeInflightSavePointFileName(String commitTime) { - return commitTime + HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION; + return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION); } static String makeSavePointFileName(String commitTime) { - return commitTime + HoodieTimeline.SAVEPOINT_EXTENSION; + return StringUtils.join(commitTime, HoodieTimeline.SAVEPOINT_EXTENSION); } static String makeInflightDeltaFileName(String commitTime) { - return commitTime + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION; + return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); + } + + static String makeInflightCompactionFileName(String commitTime) { + return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION); + } + + static String makeRequestedCompactionFileName(String commitTime) { + return StringUtils.join(commitTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION); } static String makeDeltaFileName(String commitTime) { @@ -211,8 +259,6 @@ public interface HoodieTimeline extends Serializable { } static String makeFileNameAsInflight(String fileName) { - return fileName + HoodieTimeline.INFLIGHT_EXTENSION; + return StringUtils.join(fileName, HoodieTimeline.INFLIGHT_EXTENSION); } - - } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java index d9ffae790..637f6e874 100644 --- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java @@ -73,6 +73,12 @@ public interface TableFileSystemView { */ Stream getLatestFileSlices(String partitionPath); + /** + * Stream all the latest uncompacted file slices in the given partition + */ + Stream getLatestUnCompactedFileSlices(String partitionPath); + + /** * Stream all the latest file slices in the given partition with precondition that * commitTime(file) before maxCommitTime @@ -80,6 +86,16 @@ public interface TableFileSystemView { Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime); + /** + * Stream all "merged" file-slices before on an instant time + * If a file-group has a pending compaction request, the file-slice before and after compaction request instant + * is merged and returned. + * @param partitionPath Partition Path + * @param maxInstantTime Max Instant Time + * @return + */ + public Stream getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime); + /** * Stream all the latest file slices, in the given range */ diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java index a1309c73f..427adda65 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java @@ -64,6 +64,7 @@ public abstract class AbstractHoodieLogRecordScanner { // Reader schema for the records private final Schema readerSchema; // Latest valid instant time + // Log-Blocks belonging to inflight delta-instants are filtered-out using this high-watermark. 
private final String latestInstantTime; private final HoodieTableMetaClient hoodieTableMetaClient; // Merge strategy to use when combining records from log diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index a8d35411e..20dea2805 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -95,7 +95,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { this(metaClient, new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, - CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION}); + CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION}); } /** @@ -118,19 +118,31 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { /** * Get all instants (commits, delta commits) that produce new data, in the active timeline * + * */ public HoodieTimeline getCommitsTimeline() { return getTimelineOfActions( Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); } + /** + * Get all instants (commits, delta commits, in-flight/request compaction) that produce new data, in the active + * timeline * + * With Async compaction a requested/inflight compaction-instant is a valid baseInstant for a file-slice as there + * could be delta-commits with that baseInstant. 
+ */ + public HoodieTimeline getCommitsAndCompactionTimeline() { + return getTimelineOfActions( + Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION)); + } + /** * Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, * in the active timeline * */ public HoodieTimeline getAllCommitsTimeline() { return getTimelineOfActions( - Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, + Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION)); } @@ -200,7 +212,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { log.info("Marking instant complete " + instant); Preconditions.checkArgument(instant.isInflight(), "Could not mark an already completed instant as complete again " + instant); - moveInflightToComplete(instant, HoodieTimeline.getCompletedInstant(instant), data); + transitionState(instant, HoodieTimeline.getCompletedInstant(instant), data); log.info("Completed " + instant); } @@ -211,7 +223,18 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } public void deleteInflight(HoodieInstant instant) { - log.info("Deleting in-flight " + instant); + Preconditions.checkArgument(instant.isInflight()); + deleteInstantFile(instant); + } + + public void deleteCompactionRequested(HoodieInstant instant) { + Preconditions.checkArgument(instant.isRequested()); + Preconditions.checkArgument(instant.getAction() == HoodieTimeline.COMPACTION_ACTION); + deleteInstantFile(instant); + } + + private void deleteInstantFile(HoodieInstant instant) { + log.info("Deleting instant " + instant); Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName()); try { boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false); @@ -232,24 +255,43 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(detailPath); } - protected void 
moveInflightToComplete(HoodieInstant inflight, HoodieInstant completed, + public void revertFromInflightToRequested(HoodieInstant inflightInstant, HoodieInstant requestedInstant, Optional data) { - Path commitFilePath = new Path(metaClient.getMetaPath(), completed.getFileName()); + Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + transitionState(inflightInstant, requestedInstant, data); + } + + public void transitionFromRequestedToInflight(HoodieInstant requestedInstant, HoodieInstant inflightInstant, + Optional data) { + Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + transitionState(requestedInstant, inflightInstant, data); + } + + protected void moveInflightToComplete(HoodieInstant inflightInstant, HoodieInstant commitInstant, + Optional data) { + transitionState(inflightInstant, commitInstant, data); + } + + private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, + Optional data) { + Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); + Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName()); try { // open a new file and write the commit metadata in - Path inflightCommitFile = new Path(metaClient.getMetaPath(), inflight.getFileName()); - createFileInMetaPath(inflight.getFileName(), data); + Path inflightCommitFile = new Path(metaClient.getMetaPath(), fromInstant.getFileName()); + createFileInMetaPath(fromInstant.getFileName(), data); boolean success = metaClient.getFs().rename(inflightCommitFile, commitFilePath); if (!success) { throw new HoodieIOException( "Could not rename " + inflightCommitFile + " to " + commitFilePath); } } catch (IOException e) { - throw new HoodieIOException("Could not complete " + inflight, e); + throw new HoodieIOException("Could not complete " + fromInstant, e); } } protected void moveCompleteToInflight(HoodieInstant completed, HoodieInstant 
inflight) { + Preconditions.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp())); Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), inflight.getFileName()); try { if (!metaClient.getFs().exists(inFlightCommitFilePath)) { @@ -269,6 +311,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { createFileInMetaPath(instant.getFileName(), content); } + public void saveToRequested(HoodieInstant instant, Optional content) { + Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + createFileInMetaPath(instant.getFileName(), content); + } + protected void createFileInMetaPath(String filename, Optional content) { Path fullPath = new Path(metaClient.getMetaPath(), filename); try { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java index c855203a3..87c664c60 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java @@ -53,15 +53,38 @@ public class HoodieDefaultTimeline implements HoodieTimeline { public HoodieDefaultTimeline() { } + @Override public HoodieTimeline filterInflights() { return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details); } + @Override + public HoodieTimeline filterInflightsExcludingCompaction() { + return new HoodieDefaultTimeline(instants.stream().filter(instant -> { + return instant.isInflight() && (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + }), details); + } + + @Override public HoodieTimeline filterCompletedInstants() { return new HoodieDefaultTimeline(instants.stream().filter(s -> !s.isInflight()), details); } + @Override + public HoodieTimeline filterCompletedAndCompactionInstants() { + return new 
HoodieDefaultTimeline(instants.stream().filter(s -> { + return !s.isInflight() || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION); + }), details); + } + + @Override + public HoodieTimeline filterPendingCompactionTimeline() { + return new HoodieDefaultTimeline( + instants.stream().filter(s -> s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), + details); + } + @Override public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) { return new HoodieDefaultTimeline(instants.stream().filter( diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java index 59be65555..f87d614da 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java @@ -30,7 +30,19 @@ import org.apache.hadoop.fs.FileStatus; */ public class HoodieInstant implements Serializable { - private boolean isInflight = false; + /** + * Instant State + */ + public enum State { + // Requested State (valid state for Compaction) + REQUESTED, + // Inflight instant + INFLIGHT, + // Committed instant + COMPLETED + } + + private State state = State.COMPLETED; private String action; private String timestamp; @@ -49,21 +61,35 @@ public class HoodieInstant implements Serializable { // This is to support backwards compatibility on how in-flight commit files were written // General rule is inflight extension is ..inflight, but for commit it is .inflight action = "commit"; - isInflight = true; + state = State.INFLIGHT; } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) { - isInflight = true; + state = State.INFLIGHT; action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, ""); + } else if (action.equals(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)) { + state = State.REQUESTED; + action = 
action.replace(HoodieTimeline.REQUESTED_EXTENSION, ""); } } public HoodieInstant(boolean isInflight, String action, String timestamp) { - this.isInflight = isInflight; + //TODO: vb - Preserving for avoiding cascading changes. This constructor will be updated in subsequent PR + this.state = isInflight ? State.INFLIGHT : State.COMPLETED; + this.action = action; + this.timestamp = timestamp; + } + + public HoodieInstant(State state, String action, String timestamp) { + this.state = state; this.action = action; this.timestamp = timestamp; } public boolean isInflight() { - return isInflight; + return state == State.INFLIGHT; + } + + public boolean isRequested() { + return state == State.REQUESTED; } public String getAction() { @@ -79,20 +105,28 @@ public class HoodieInstant implements Serializable { */ public String getFileName() { if (HoodieTimeline.COMMIT_ACTION.equals(action)) { - return isInflight ? HoodieTimeline.makeInflightCommitFileName(timestamp) + return isInflight() ? HoodieTimeline.makeInflightCommitFileName(timestamp) : HoodieTimeline.makeCommitFileName(timestamp); } else if (HoodieTimeline.CLEAN_ACTION.equals(action)) { - return isInflight ? HoodieTimeline.makeInflightCleanerFileName(timestamp) + return isInflight() ? HoodieTimeline.makeInflightCleanerFileName(timestamp) : HoodieTimeline.makeCleanerFileName(timestamp); } else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) { - return isInflight ? HoodieTimeline.makeInflightRollbackFileName(timestamp) + return isInflight() ? HoodieTimeline.makeInflightRollbackFileName(timestamp) : HoodieTimeline.makeRollbackFileName(timestamp); } else if (HoodieTimeline.SAVEPOINT_ACTION.equals(action)) { - return isInflight ? HoodieTimeline.makeInflightSavePointFileName(timestamp) + return isInflight() ? HoodieTimeline.makeInflightSavePointFileName(timestamp) : HoodieTimeline.makeSavePointFileName(timestamp); } else if (HoodieTimeline.DELTA_COMMIT_ACTION.equals(action)) { - return isInflight ? 
HoodieTimeline.makeInflightDeltaFileName(timestamp) + return isInflight() ? HoodieTimeline.makeInflightDeltaFileName(timestamp) : HoodieTimeline.makeDeltaFileName(timestamp); + } else if (HoodieTimeline.COMPACTION_ACTION.equals(action)) { + if (isInflight()) { + return HoodieTimeline.makeInflightCompactionFileName(timestamp); + } else if (isRequested()) { + return HoodieTimeline.makeRequestedCompactionFileName(timestamp); + } else { + return HoodieTimeline.makeCommitFileName(timestamp); + } } throw new IllegalArgumentException("Cannot get file name for unknown action " + action); } @@ -106,18 +140,18 @@ public class HoodieInstant implements Serializable { return false; } HoodieInstant that = (HoodieInstant) o; - return isInflight == that.isInflight + return state == that.state && Objects.equals(action, that.action) && Objects.equals(timestamp, that.timestamp); } @Override public int hashCode() { - return Objects.hash(isInflight, action, timestamp); + return Objects.hash(state, action, timestamp); } @Override public String toString() { - return "[" + ((isInflight) ? "==>" : "") + timestamp + "__" + action + "]"; + return "[" + ((isInflight() || isRequested()) ? "==>" : "") + timestamp + "__" + action + "__" + state + "]"; } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 8978bf91a..fe6e326a0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -64,6 +64,11 @@ public class HoodieTableFileSystemView implements TableFileSystemView, // mapping from file id to the file group. 
protected HashMap fileGroupMap; + /** + * File Id to pending compaction instant time + */ + private final Map fileIdToPendingCompactionInstantTime; + /** * Create a file system view, as of the given timeline */ @@ -73,6 +78,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView, this.visibleActiveTimeline = visibleActiveTimeline; this.fileGroupMap = new HashMap<>(); this.partitionToFileGroupsMap = new HashMap<>(); + //TODO: vb Will be implemented in next PR + this.fileIdToPendingCompactionInstantTime = new HashMap<>(); } @@ -128,14 +135,19 @@ public class HoodieTableFileSystemView implements TableFileSystemView, List fileGroups = new ArrayList<>(); fileIdSet.forEach(pair -> { - HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), pair.getValue(), - visibleActiveTimeline); + String fileId = pair.getValue(); + HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, visibleActiveTimeline); if (dataFiles.containsKey(pair)) { dataFiles.get(pair).forEach(dataFile -> group.addDataFile(dataFile)); } if (logFiles.containsKey(pair)) { logFiles.get(pair).forEach(logFile -> group.addLogFile(logFile)); } + if (fileIdToPendingCompactionInstantTime.containsKey(fileId)) { + // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears + // so that any new ingestion uses the correct base-instant + group.addNewFileSliceAtInstant(fileIdToPendingCompactionInstantTime.get(fileId)); + } fileGroups.add(group); }); @@ -165,19 +177,37 @@ public class HoodieTableFileSystemView implements TableFileSystemView, return Arrays.stream(statuses).filter(rtFilePredicate).map(HoodieLogFile::new); } + /** + * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, Ignore + * those data-files + * + * @param dataFile Data File + */ + private boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) { + String compactionInstantTime = 
fileIdToPendingCompactionInstantTime.get(dataFile.getFileId()); + if ((null != compactionInstantTime) && dataFile.getCommitTime().equals(compactionInstantTime)) { + return true; + } + return false; + } + @Override public Stream getLatestDataFiles(final String partitionPath) { return getAllFileGroups(partitionPath) - .map(fileGroup -> fileGroup.getLatestDataFile()) - .filter(dataFileOpt -> dataFileOpt.isPresent()) + .map(fileGroup -> { + return fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst(); + }) + .filter(Optional::isPresent) .map(Optional::get); } @Override public Stream getLatestDataFiles() { return fileGroupMap.values().stream() - .map(fileGroup -> fileGroup.getLatestDataFile()) - .filter(dataFileOpt -> dataFileOpt.isPresent()) + .map(fileGroup -> { + return fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst(); + }) + .filter(Optional::isPresent) .map(Optional::get); } @@ -185,16 +215,29 @@ public class HoodieTableFileSystemView implements TableFileSystemView, public Stream getLatestDataFilesBeforeOrOn(String partitionPath, String maxCommitTime) { return getAllFileGroups(partitionPath) - .map(fileGroup -> fileGroup.getLatestDataFileBeforeOrOn(maxCommitTime)) - .filter(dataFileOpt -> dataFileOpt.isPresent()) + .map(fileGroup -> { + return fileGroup.getAllDataFiles() + .filter(dataFile -> + HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), + maxCommitTime, + HoodieTimeline.LESSER_OR_EQUAL)) + .filter(df -> !isDataFileDueToPendingCompaction(df)) + .findFirst(); + }) + .filter(Optional::isPresent) .map(Optional::get); } @Override public Stream getLatestDataFilesInRange(List commitsToReturn) { return fileGroupMap.values().stream() - .map(fileGroup -> fileGroup.getLatestDataFileInRange(commitsToReturn)) - .filter(dataFileOpt -> dataFileOpt.isPresent()) + .map(fileGroup -> { + return fileGroup.getAllDataFiles() + .filter(dataFile -> 
commitsToReturn.contains(dataFile.getCommitTime()) + && !isDataFileDueToPendingCompaction(dataFile)) + .findFirst(); + }) + .filter(Optional::isPresent) .map(Optional::get); } @@ -202,23 +245,125 @@ public class HoodieTableFileSystemView implements TableFileSystemView, public Stream getAllDataFiles(String partitionPath) { return getAllFileGroups(partitionPath) .map(fileGroup -> fileGroup.getAllDataFiles()) - .flatMap(dataFileList -> dataFileList); + .flatMap(dataFileList -> dataFileList) + .filter(df -> !isDataFileDueToPendingCompaction(df)); } @Override public Stream getLatestFileSlices(String partitionPath) { return getAllFileGroups(partitionPath) .map(fileGroup -> fileGroup.getLatestFileSlice()) - .filter(dataFileOpt -> dataFileOpt.isPresent()) + .filter(Optional::isPresent) + .map(Optional::get) + .map(this::filterDataFileAfterPendingCompaction); + } + + @Override + public Stream getLatestUnCompactedFileSlices(String partitionPath) { + return getAllFileGroups(partitionPath) + .map(fileGroup -> { + FileSlice fileSlice = fileGroup.getLatestFileSlice().get(); + // if the file-group is under compaction, pick the latest before compaction instant time. 
+ if (isFileSliceAfterPendingCompaction(fileSlice)) { + String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId()); + return fileGroup.getLatestFileSliceBefore(compactionInstantTime); + } + return Optional.of(fileSlice); + }) .map(Optional::get); } + /** + * Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches + * compaction Instant + * @param fileSlice File Slice + * @return + */ + private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) { + String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId()); + if ((null != compactionInstantTime) && fileSlice.getBaseInstantTime().equals(compactionInstantTime)) { + return true; + } + return false; + } + + /** + * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, + * Ignore those data-files + * @param fileSlice File Slice + * @return + */ + private FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) { + if (isFileSliceAfterPendingCompaction(fileSlice)) { + // Data file is filtered out of the file-slice as the corresponding compaction + // instant not completed yet. + FileSlice transformed = new FileSlice(fileSlice.getBaseInstantTime(), fileSlice.getFileId()); + fileSlice.getLogFiles().forEach(lf -> transformed.addLogFile(lf)); + return transformed; + } + return fileSlice; + } + @Override public Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) { return getAllFileGroups(partitionPath) .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime)) - .filter(dataFileOpt -> dataFileOpt.isPresent()) + .filter(Optional::isPresent) + .map(Optional::get) + .map(this::filterDataFileAfterPendingCompaction); + } + + /** + * Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet. 
+ * + * @param lastSlice Latest File slice for a file-group + * @param penultimateSlice Penultimate file slice for a file-group in commit timeline order + */ + private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) { + FileSlice merged = new FileSlice(penultimateSlice.getBaseInstantTime(), penultimateSlice.getFileId()); + if (penultimateSlice.getDataFile().isPresent()) { + merged.setDataFile(penultimateSlice.getDataFile().get()); + } + // Add Log files from penultimate and last slices + penultimateSlice.getLogFiles().forEach(lf -> merged.addLogFile(lf)); + lastSlice.getLogFiles().forEach(lf -> merged.addLogFile(lf)); + return merged; + } + + /** + * If the file-slice is because of pending compaction instant, this method merges the file-slice with the one before + * the compaction instant time + * @param fileGroup File Group for which the file slice belongs to + * @param fileSlice File Slice which needs to be merged + * @return + */ + private FileSlice getMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) { + // if the file-group is under construction, pick the latest before compaction instant time. 
+ if (fileIdToPendingCompactionInstantTime.containsKey(fileSlice.getFileId())) { + String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId()); + if (fileSlice.getBaseInstantTime().equals(compactionInstantTime)) { + Optional prevFileSlice = fileGroup.getLatestFileSliceBefore(compactionInstantTime); + if (prevFileSlice.isPresent()) { + return mergeCompactionPendingFileSlices(fileSlice, prevFileSlice.get()); + } + } + } + return fileSlice; + } + + @Override + public Stream getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime) { + return getAllFileGroups(partitionPath) + .map(fileGroup -> { + Optional fileSlice = fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime); + // if the file-group is under construction, pick the latest before compaction instant time. + if (fileSlice.isPresent()) { + fileSlice = Optional.of(getMergedFileSlice(fileGroup, fileSlice.get())); + } + return fileSlice; + }) + .filter(Optional::isPresent) .map(Optional::get); } @@ -226,7 +371,6 @@ public class HoodieTableFileSystemView implements TableFileSystemView, public Stream getLatestFileSliceInRange(List commitsToReturn) { return fileGroupMap.values().stream() .map(fileGroup -> fileGroup.getLatestFileSliceInRange(commitsToReturn)) - .filter(dataFileOpt -> dataFileOpt.isPresent()) .map(Optional::get); } @@ -260,4 +404,15 @@ public class HoodieTableFileSystemView implements TableFileSystemView, "Failed to list data files in partition " + partitionPathStr, e); } } + + /** + * Used by tests to add pending compaction entries TODO: This method is temporary and should go away in subsequent + * Async Compaction PR + * + * @param fileId File Id + * @param compactionInstantTime Compaction Instant Time + */ + protected void addPendingCompactionFileId(String fileId, String compactionInstantTime) { + fileIdToPendingCompactionInstantTime.put(fileId, compactionInstantTime); + } } diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java index df642490b..37a0152e4 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/string/HoodieActiveTimelineTest.java @@ -80,7 +80,7 @@ public class HoodieActiveTimelineTest { Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete), timeline.getCommitTimeline().filterCompletedInstants().getInstants()); HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5), - timeline.getCommitTimeline().filterInflights().getInstants()); + timeline.getCommitTimeline().filterInflightsExcludingCompaction().getInstants()); } @Test @@ -106,7 +106,7 @@ public class HoodieActiveTimelineTest { timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2).getInstants() .map(HoodieInstant::getTimestamp)); assertFalse(timeline.empty()); - assertFalse(timeline.getCommitTimeline().filterInflights().empty()); + assertFalse(timeline.getCommitTimeline().filterInflightsExcludingCompaction().empty()); assertEquals("", 12, timeline.countInstants()); HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants(); assertEquals("", 10, activeCommitTimeline.countInstants()); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java index 686241271..757a3caae 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java @@ -32,15 +32,18 @@ import com.uber.hoodie.common.table.HoodieTimeline; import 
com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import com.uber.hoodie.common.util.FSUtils; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.junit.Before; @@ -53,7 +56,7 @@ public class HoodieTableFileSystemViewTest { private HoodieTableMetaClient metaClient; private String basePath; - private TableFileSystemView fsView; + private HoodieTableFileSystemView fsView; private TableFileSystemView.ReadOptimizedView roView; private TableFileSystemView.RealtimeView rtView; @@ -74,15 +77,419 @@ public class HoodieTableFileSystemViewTest { metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true); if (statuses != null) { fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), statuses); + metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), + statuses); } else { fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); + metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants()); } roView = (TableFileSystemView.ReadOptimizedView) fsView; rtView = (TableFileSystemView.RealtimeView) fsView; } + /** + * Test case for view generation on a file group where + * the only file-slice does not have data-file. 
This is the case where upserts directly go to log-files + */ + @Test + public void testViewForFileSlicesWithNoBaseFile() throws Exception { + String partitionPath = "2016/05/01"; + new File(basePath + "/" + partitionPath).mkdirs(); + String fileId = UUID.randomUUID().toString(); + + String instantTime1 = "1"; + String deltaInstantTime1 = "2"; + String deltaInstantTime2 = "3"; + String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0); + String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1); + new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile(); + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1); + HoodieInstant deltaInstant2 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1); + HoodieInstant deltaInstant3 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2); + + commitTimeline.saveAsComplete(instant1, Optional.empty()); + commitTimeline.saveAsComplete(deltaInstant2, Optional.empty()); + commitTimeline.saveAsComplete(deltaInstant3, Optional.empty()); + + refreshFsView(null); + + List dataFiles = roView.getLatestDataFiles().collect(Collectors.toList()); + assertTrue("No data file expected", dataFiles.isEmpty()); + List fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); + assertEquals(1, fileSliceList.size()); + FileSlice fileSlice = fileSliceList.get(0); + assertEquals("File-Id must be set correctly", fileId, fileSlice.getFileId()); + assertFalse("Data file for base instant must be present", fileSlice.getDataFile().isPresent()); + assertEquals("Base Instant for file-group set correctly", instantTime1, fileSlice.getBaseInstantTime()); + List logFiles = 
fileSlice.getLogFiles().collect(Collectors.toList()); + assertEquals("Correct number of log-files shows up in file-slice", 2, logFiles.size()); + assertEquals("Log File Order check", fileName2, logFiles.get(0).getFileName()); + assertEquals("Log File Order check", fileName1, logFiles.get(1).getFileName()); + + // Check Merged File Slices API + fileSliceList = rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime2) + .collect(Collectors.toList()); + assertEquals(1, fileSliceList.size()); + fileSlice = fileSliceList.get(0); + assertEquals("File-Id must be set correctly", fileId, fileSlice.getFileId()); + assertFalse("Data file for base instant must be present", fileSlice.getDataFile().isPresent()); + assertEquals("Base Instant for file-group set correctly", instantTime1, fileSlice.getBaseInstantTime()); + logFiles = fileSlice.getLogFiles().collect(Collectors.toList()); + assertEquals("Correct number of log-files shows up in file-slice", 2, logFiles.size()); + assertEquals("Log File Order check", fileName2, logFiles.get(0).getFileName()); + assertEquals("Log File Order check", fileName1, logFiles.get(1).getFileName()); + + // Check UnCompacted File Slices API + fileSliceList = rtView.getLatestUnCompactedFileSlices(partitionPath).collect(Collectors.toList()); + assertEquals(1, fileSliceList.size()); + fileSlice = fileSliceList.get(0); + assertEquals("File-Id must be set correctly", fileId, fileSlice.getFileId()); + assertFalse("Data file for base instant must be present", fileSlice.getDataFile().isPresent()); + assertEquals("Base Instant for file-group set correctly", instantTime1, fileSlice.getBaseInstantTime()); + logFiles = fileSlice.getLogFiles().collect(Collectors.toList()); + assertEquals("Correct number of log-files shows up in file-slice", 2, logFiles.size()); + assertEquals("Log File Order check", fileName2, logFiles.get(0).getFileName()); + assertEquals("Log File Order check", fileName1, logFiles.get(1).getFileName()); + } + + @Test + 
public void testViewForFileSlicesWithNoBaseFileAndRequestedCompaction() throws Exception { + testViewForFileSlicesWithAsyncCompaction(true, false); + } + + @Test + public void testViewForFileSlicesWithBaseFileAndRequestedCompaction() throws Exception { + testViewForFileSlicesWithAsyncCompaction(false, false); + } + + @Test + public void testViewForFileSlicesWithNoBaseFileAndInflightCompaction() throws Exception { + testViewForFileSlicesWithAsyncCompaction(true, true); + } + + @Test + public void testViewForFileSlicesWithBaseFileAndInflightCompaction() throws Exception { + testViewForFileSlicesWithAsyncCompaction(false, true); + } + + /** + * Returns all file-slices including uncommitted ones. + * @param partitionPath + * @return + */ + private Stream getAllRawFileSlices(String partitionPath) { + return fsView.getAllFileGroups(partitionPath) + .map(group -> group.getAllFileSlicesIncludingInflight()) + .flatMap(sliceList -> sliceList); + } + + /** + * Returns latest raw file-slices including uncommitted ones. + * @param partitionPath + * @return + */ + public Stream getLatestRawFileSlices(String partitionPath) { + return fsView.getAllFileGroups(partitionPath) + .map(fileGroup -> fileGroup.getLatestFileSlicesIncludingInflight()) + .filter(fileSliceOpt -> fileSliceOpt.isPresent()) + .map(Optional::get); + } + + /** + * Helper method to test Views in the presence of concurrent compaction + * @param skipCreatingDataFile if set, first File Slice will not have data-file set. 
This would + * simulate inserts going directly to log files + * @param isCompactionInFlight if set, compaction was inflight (running) when view was tested first time, + * otherwise compaction was in requested state + * @throws Exception + */ + private void testViewForFileSlicesWithAsyncCompaction(boolean skipCreatingDataFile, + boolean isCompactionInFlight) throws Exception { + String partitionPath = "2016/05/01"; + new File(basePath + "/" + partitionPath).mkdirs(); + String fileId = UUID.randomUUID().toString(); + + // if skipCreatingDataFile, then instantTime1 below acts like delta-commit, otherwise it is base-commit + String instantTime1 = "1"; + String deltaInstantTime1 = "2"; + String deltaInstantTime2 = "3"; + + String dataFileName = null; + if (!skipCreatingDataFile) { + dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId); + new File(basePath + "/" + partitionPath + "/" + dataFileName).createNewFile(); + } + String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0); + String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1); + new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile(); + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1); + HoodieInstant deltaInstant2 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1); + HoodieInstant deltaInstant3 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2); + + commitTimeline.saveAsComplete(instant1, Optional.empty()); + commitTimeline.saveAsComplete(deltaInstant2, Optional.empty()); + commitTimeline.saveAsComplete(deltaInstant3, Optional.empty()); + + // Fake delta-ingestion after compaction-requested + String compactionRequestedTime = "4"; + String 
compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId); + HoodieInstant compactionInstant = null; + if (isCompactionInFlight) { + // Create a Data-file but this should be skipped by view + new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile(); + compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime); + commitTimeline.saveToInflight(compactionInstant, Optional.empty()); + } else { + compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime); + commitTimeline.saveToRequested(compactionInstant, Optional.empty()); + } + String deltaInstantTime4 = "5"; + String deltaInstantTime5 = "6"; + List allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2, + compactionRequestedTime, deltaInstantTime4, deltaInstantTime5); + String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0); + String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1); + new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile(); + new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile(); + HoodieInstant deltaInstant4 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime4); + HoodieInstant deltaInstant5 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime5); + commitTimeline.saveAsComplete(deltaInstant4, Optional.empty()); + commitTimeline.saveAsComplete(deltaInstant5, Optional.empty()); + refreshFsView(null); + fsView.addPendingCompactionFileId(fileId, compactionRequestedTime); + + List dataFiles = roView.getAllDataFiles(partitionPath).collect(Collectors.toList()); + if (skipCreatingDataFile) { + assertTrue("No data file expected", dataFiles.isEmpty()); + } else { + assertEquals("One data-file is expected as there is only 
one file-group", 1, dataFiles.size()); + assertEquals("Expect only valid data-file", dataFileName, dataFiles.get(0).getFileName()); + } + + /** Merge API Tests **/ + List fileSliceList = rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5) + .collect(Collectors.toList()); + assertEquals("Expect file-slice to be merged", 1, fileSliceList.size()); + FileSlice fileSlice = fileSliceList.get(0); + assertEquals(fileId, fileSlice.getFileId()); + if (!skipCreatingDataFile) { + assertEquals("Data file must be present", dataFileName, fileSlice.getDataFile().get().getFileName()); + } else { + assertFalse("No data-file expected as it was not created", fileSlice.getDataFile().isPresent()); + } + assertEquals("Base Instant of penultimate file-slice must be base instant", instantTime1, + fileSlice.getBaseInstantTime()); + List logFiles = fileSlice.getLogFiles().collect(Collectors.toList()); + assertEquals("Log files must include those after compaction request", 4, logFiles.size()); + assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName()); + assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); + assertEquals("Log File Order check", fileName2, logFiles.get(2).getFileName()); + assertEquals("Log File Order check", fileName1, logFiles.get(3).getFileName()); + + fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5) + .collect(Collectors.toList()); + assertEquals("Expect only one file-id", 1, fileSliceList.size()); + fileSlice = fileSliceList.get(0); + assertEquals(fileId, fileSlice.getFileId()); + assertFalse("No data-file expected in latest file-slice", fileSlice.getDataFile().isPresent()); + assertEquals("Compaction requested instant must be base instant", compactionRequestedTime, + fileSlice.getBaseInstantTime()); + logFiles = fileSlice.getLogFiles().collect(Collectors.toList()); + assertEquals("Log files must include only those after compaction request", 2, logFiles.size()); 
+ assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName()); + assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); + + /** Data Files API tests */ + dataFiles = roView.getLatestDataFiles().collect(Collectors.toList()); + if (skipCreatingDataFile) { + assertEquals("Expect no data file to be returned", 0, dataFiles.size()); + } else { + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1); + }); + } + dataFiles = roView.getLatestDataFiles(partitionPath).collect(Collectors.toList()); + if (skipCreatingDataFile) { + assertEquals("Expect no data file to be returned", 0, dataFiles.size()); + } else { + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1); + }); + } + dataFiles = roView.getLatestDataFilesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList()); + if (skipCreatingDataFile) { + assertEquals("Expect no data file to be returned", 0, dataFiles.size()); + } else { + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1); + }); + } + dataFiles = roView.getLatestDataFilesInRange(allInstantTimes).collect(Collectors.toList()); + if (skipCreatingDataFile) { + assertEquals("Expect no data file to be returned", 0, dataFiles.size()); + } else { + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1); + }); + } + + /** Inflight/Orphan File-groups needs to be in the view **/ + + // There is a data-file 
with this inflight file-id + final String inflightFileId1 = UUID.randomUUID().toString(); + // There is a log-file with this inflight file-id + final String inflightFileId2 = UUID.randomUUID().toString(); + // There is an orphan data file with this file-id + final String orphanFileId1 = UUID.randomUUID().toString(); + // There is an orphan log data file with this file-id + final String orphanFileId2 = UUID.randomUUID().toString(); + final String invalidInstantId = "INVALIDTIME"; + String inflightDeltaInstantTime = "7"; + String orphanDataFileName = FSUtils.makeDataFileName(invalidInstantId, 1, orphanFileId1); + new File(basePath + "/" + partitionPath + "/" + orphanDataFileName).createNewFile(); + String orphanLogFileName = + FSUtils.makeLogFileName(orphanFileId2, HoodieLogFile.DELTA_EXTENSION, invalidInstantId, 0); + new File(basePath + "/" + partitionPath + "/" + orphanLogFileName).createNewFile(); + String inflightDataFileName = FSUtils.makeDataFileName(inflightDeltaInstantTime, 1, inflightFileId1); + new File(basePath + "/" + partitionPath + "/" + inflightDataFileName).createNewFile(); + String inflightLogFileName = + FSUtils.makeLogFileName(inflightFileId2, HoodieLogFile.DELTA_EXTENSION, inflightDeltaInstantTime, 0); + new File(basePath + "/" + partitionPath + "/" + inflightLogFileName).createNewFile(); + // Mark instant as inflight + commitTimeline.saveToInflight(new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, + inflightDeltaInstantTime), Optional.empty()); + refreshFsView(null); + fsView.addPendingCompactionFileId(fileId, compactionRequestedTime); + + List allRawFileSlices = getAllRawFileSlices(partitionPath).collect(Collectors.toList()); + dataFiles = allRawFileSlices.stream().flatMap(slice -> { + if (slice.getDataFile().isPresent()) { + return Stream.of(slice.getDataFile().get()); + } + return Stream.empty(); + }).collect(Collectors.toList()); + assertEquals("Inflight/Orphan data-file is also expected", 2 + + (isCompactionInFlight ? 
1 : 0) + (skipCreatingDataFile ? 0 : 1), dataFiles.size()); + Set fileNames = dataFiles.stream().map(HoodieDataFile::getFileName).collect(Collectors.toSet()); + assertTrue("Expect orphan data-file to be present", fileNames.contains(orphanDataFileName)); + assertTrue("Expect inflight data-file to be present", fileNames.contains(inflightDataFileName)); + if (!skipCreatingDataFile) { + assertTrue("Expect old committed data-file", fileNames.contains(dataFileName)); + } + + if (isCompactionInFlight) { + assertTrue("Expect inflight compacted data file to be present", fileNames.contains(compactDataFileName)); + } + + fileSliceList = getLatestRawFileSlices(partitionPath).collect(Collectors.toList()); + assertEquals("Expect both inflight and orphan file-slice to be included", + 5, fileSliceList.size()); + Map fileSliceMap = + fileSliceList.stream().collect(Collectors.toMap(FileSlice::getFileId, r -> r)); + FileSlice orphanFileSliceWithDataFile = fileSliceMap.get(orphanFileId1); + FileSlice orphanFileSliceWithLogFile = fileSliceMap.get(orphanFileId2); + FileSlice inflightFileSliceWithDataFile = fileSliceMap.get(inflightFileId1); + FileSlice inflightFileSliceWithLogFile = fileSliceMap.get(inflightFileId2); + + assertEquals("Orphan File Slice with data-file check base-commit", invalidInstantId, + orphanFileSliceWithDataFile.getBaseInstantTime()); + assertEquals("Orphan File Slice with data-file check data-file", orphanDataFileName, + orphanFileSliceWithDataFile.getDataFile().get().getFileName()); + assertEquals("Orphan File Slice with data-file check data-file", 0, + orphanFileSliceWithDataFile.getLogFiles().count()); + assertEquals("Inflight File Slice with data-file check base-commit", inflightDeltaInstantTime, + inflightFileSliceWithDataFile.getBaseInstantTime()); + assertEquals("Inflight File Slice with data-file check data-file", inflightDataFileName, + inflightFileSliceWithDataFile.getDataFile().get().getFileName()); + assertEquals("Inflight File Slice with data-file 
check data-file", 0, + inflightFileSliceWithDataFile.getLogFiles().count()); + assertEquals("Orphan File Slice with log-file check base-commit", invalidInstantId, + orphanFileSliceWithLogFile.getBaseInstantTime()); + assertFalse("Orphan File Slice with log-file check data-file", + orphanFileSliceWithLogFile.getDataFile().isPresent()); + logFiles = orphanFileSliceWithLogFile.getLogFiles().collect(Collectors.toList()); + assertEquals("Orphan File Slice with log-file check data-file", 1, logFiles.size()); + assertEquals("Orphan File Slice with log-file check data-file", orphanLogFileName, + logFiles.get(0).getFileName()); + assertEquals("Inflight File Slice with log-file check base-commit", inflightDeltaInstantTime, + inflightFileSliceWithLogFile.getBaseInstantTime()); + assertFalse("Inflight File Slice with log-file check data-file", + inflightFileSliceWithLogFile.getDataFile().isPresent()); + logFiles = inflightFileSliceWithLogFile.getLogFiles().collect(Collectors.toList()); + assertEquals("Inflight File Slice with log-file check data-file", 1, logFiles.size()); + assertEquals("Inflight File Slice with log-file check data-file", inflightLogFileName, + logFiles.get(0).getFileName()); + + // Now simulate Compaction completing - Check the view + if (!isCompactionInFlight) { + // For inflight compaction, we already create a data-file to test concurrent inflight case. 
+ // If we skipped creating data file corresponding to compaction commit, create it now + new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile(); + } + if (isCompactionInFlight) { + commitTimeline.deleteInflight(compactionInstant); + } else { + commitTimeline.deleteCompactionRequested(compactionInstant); + } + compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime); + commitTimeline.saveAsComplete(compactionInstant, Optional.empty()); + refreshFsView(null); + // populate the cache + roView.getAllDataFiles(partitionPath); + + fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList()); + dataFiles = fileSliceList.stream().map(FileSlice::getDataFile) + .filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()); + System.out.println("fileSliceList : " + fileSliceList); + assertEquals("Expect only one data-files in latest view as there is only one file-group", 1, dataFiles.size()); + assertEquals("Data Filename must match", compactDataFileName, dataFiles.get(0).getFileName()); + assertEquals("Only one latest file-slice in the partition", 1, fileSliceList.size()); + fileSlice = fileSliceList.get(0); + assertEquals("Check file-Id is set correctly", fileId, fileSlice.getFileId()); + assertEquals("Check data-filename is set correctly", + compactDataFileName, fileSlice.getDataFile().get().getFileName()); + assertEquals("Ensure base-instant is now compaction request instant", + compactionRequestedTime, fileSlice.getBaseInstantTime()); + logFiles = fileSlice.getLogFiles().collect(Collectors.toList()); + assertEquals("Only log-files after compaction request shows up", 2, logFiles.size()); + assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName()); + assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName()); + + /** Data Files API tests */ + dataFiles = 
roView.getLatestDataFiles().collect(Collectors.toList()); + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), + compactionRequestedTime); + }); + dataFiles = roView.getLatestDataFiles(partitionPath).collect(Collectors.toList()); + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), + compactionRequestedTime); + }); + dataFiles = roView.getLatestDataFilesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList()); + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), + compactionRequestedTime); + }); + dataFiles = roView.getLatestDataFilesInRange(allInstantTimes).collect(Collectors.toList()); + assertEquals("Expect only one data-file to be sent", 1, dataFiles.size()); + dataFiles.stream().forEach(df -> { + assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(), + compactionRequestedTime); + }); + } + @Test public void testGetLatestDataFilesForFileId() throws IOException { String partitionPath = "2016/05/01"; @@ -328,15 +735,15 @@ public class HoodieTableFileSystemViewTest { assertEquals(3, slices.size()); for (FileSlice slice : slices) { if (slice.getFileId().equals(fileId1)) { - assertEquals(slice.getBaseCommitTime(), commitTime3); + assertEquals(slice.getBaseInstantTime(), commitTime3); assertTrue(slice.getDataFile().isPresent()); assertEquals(slice.getLogFiles().count(), 0); } else if (slice.getFileId().equals(fileId2)) { - assertEquals(slice.getBaseCommitTime(), commitTime4); + assertEquals(slice.getBaseInstantTime(), commitTime4); assertFalse(slice.getDataFile().isPresent()); 
assertEquals(slice.getLogFiles().count(), 1); } else if (slice.getFileId().equals(fileId3)) { - assertEquals(slice.getBaseCommitTime(), commitTime4); + assertEquals(slice.getBaseInstantTime(), commitTime4); assertTrue(slice.getDataFile().isPresent()); assertEquals(slice.getLogFiles().count(), 0); } @@ -433,17 +840,17 @@ public class HoodieTableFileSystemViewTest { List slices = fileGroup.getAllFileSlices().collect(Collectors.toList()); if (fileGroup.getId().equals(fileId1)) { assertEquals(2, slices.size()); - assertEquals(commitTime4, slices.get(0).getBaseCommitTime()); - assertEquals(commitTime1, slices.get(1).getBaseCommitTime()); + assertEquals(commitTime4, slices.get(0).getBaseInstantTime()); + assertEquals(commitTime1, slices.get(1).getBaseInstantTime()); } else if (fileGroup.getId().equals(fileId2)) { assertEquals(3, slices.size()); - assertEquals(commitTime3, slices.get(0).getBaseCommitTime()); - assertEquals(commitTime2, slices.get(1).getBaseCommitTime()); - assertEquals(commitTime1, slices.get(2).getBaseCommitTime()); + assertEquals(commitTime3, slices.get(0).getBaseInstantTime()); + assertEquals(commitTime2, slices.get(1).getBaseInstantTime()); + assertEquals(commitTime1, slices.get(2).getBaseInstantTime()); } else if (fileGroup.getId().equals(fileId3)) { assertEquals(2, slices.size()); - assertEquals(commitTime4, slices.get(0).getBaseCommitTime()); - assertEquals(commitTime3, slices.get(1).getBaseCommitTime()); + assertEquals(commitTime4, slices.get(0).getBaseInstantTime()); + assertEquals(commitTime3, slices.get(1).getBaseInstantTime()); } } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java index a6f12b8f9..e423bd018 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java +++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java @@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieException; @@ -108,7 +109,13 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf .getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath); try { - Stream latestFileSlices = fsView.getLatestFileSlices(relPartitionPath); + // Both commit and delta-commits are included - pick the latest completed one + Optional latestCompletedInstant = + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant(); + + Stream latestFileSlices = latestCompletedInstant.map(instant -> + fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp())) + .orElse(Stream.empty()); // subgroup splits again by file id & match with log files. Map> groupedInputSplits = partitionsToParquetSplits