
FileSystemView and Timeline level changes to support Async Compaction

Balaji Varadarajan
2018-05-23 16:54:53 -07:00
committed by vinoth chandar
parent 44caf0d40c
commit 6d01ae8ca0
20 changed files with 892 additions and 132 deletions

View File

@@ -398,7 +398,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
});
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
Optional<HoodieInstant> instant = activeTimeline.filterInflights().lastInstant();
Optional<HoodieInstant> instant = activeTimeline.filterInflightsExcludingCompaction().lastInstant();
activeTimeline.saveToInflight(instant.get(),
Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException io) {
@@ -692,7 +692,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline();
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsAndCompactionTimeline();
HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION,
savepointTime);
@@ -709,8 +709,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
rollback(commitsToRollback);
// Make sure the rollback was successful
Optional<HoodieInstant> lastInstant = activeTimeline.reload().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
Optional<HoodieInstant> lastInstant = activeTimeline.reload().getCommitsAndCompactionTimeline()
.filterCompletedAndCompactionInstants().lastInstant();
Preconditions.checkArgument(lastInstant.isPresent());
Preconditions.checkArgument(lastInstant.get().getTimestamp().equals(savepointTime),
savepointTime + "is not the last commit after rolling back " + commitsToRollback
@@ -1051,7 +1051,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
private void rollbackInflightCommits() {
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterInflights();
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterInflightsExcludingCompaction();
List<String> commits = inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
Collections.reverse(commits);

View File

@@ -113,7 +113,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
.filter(fileSlice1 -> fileSlice1.getFileId().equals(fileId)).findFirst();
String baseInstantTime = commitTime;
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseCommitTime();
baseInstantTime = fileSlice.get().getBaseInstantTime();
} else {
// This means there is no base data file, start appending to a new log file
fileSlice = Optional.of(new FileSlice(baseInstantTime, this.fileId));

View File

@@ -182,7 +182,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
.getLatestFileSlices(partitionPath).map(
s -> {
List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
.getLogVersionComparator().reversed()).collect(Collectors.toList());
.getBaseInstantAndLogVersionComparator().reversed()).collect(Collectors.toList());
totalLogFiles.add((long) logFiles.size());
totalFileSlices.add(1L);
return new CompactionOperation(s.getDataFile(), partitionPath, logFiles, config);

View File

@@ -230,7 +230,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
// This needs to be done since GlobalIndex at the moment does not store the latest commit time
Map<String, String> fileIdToLatestCommitTimeMap =
hoodieIndex.isGlobal() ? this.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseCommitTime)) : null;
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)) : null;
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
.filter(wStat -> {
if (wStat != null
@@ -341,7 +341,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
// TODO - check if index.isglobal then small files are log files too
Optional<FileSlice> smallFileSlice = getRTFileSystemView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter(
// Use the merged file-slice for small file selection
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).filter(
fileSlice -> fileSlice.getLogFiles().count() < 1
&& fileSlice.getDataFile().get().getFileSize() < config
.getParquetSmallFileLimit()).sorted((FileSlice left, FileSlice right) ->

View File

@@ -119,7 +119,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* Get the real time view of the file system for this table
*/
public TableFileSystemView.RealtimeView getRTFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
return new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
}
/**
@@ -140,7 +141,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* Get only the inflight (non-completed) commit timeline
*/
public HoodieTimeline getInflightCommitTimeline() {
return metaClient.getCommitsTimeline().filterInflights();
return metaClient.getCommitsTimeline().filterInflightsExcludingCompaction();
}
/**

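The split above means the two views now run on different timelines. A minimal sketch of constructing each view per this commit (import paths are assumptions; they are not visible in this diff):

import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;

class ViewTimelinesSketch {
  // Read-optimized view: completed commits only. A file-slice created at a
  // pending compaction instant does not exist for this view.
  static TableFileSystemView.ReadOptimizedView roView(HoodieTableMetaClient metaClient) {
    return new HoodieTableFileSystemView(metaClient,
        metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
  }

  // Realtime view: completed commits plus requested/inflight compactions, so
  // file-slices based on a pending compaction instant stay visible.
  static TableFileSystemView.RealtimeView rtView(HoodieTableMetaClient metaClient) {
    return new HoodieTableFileSystemView(metaClient,
        metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
  }
}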
View File

@@ -595,8 +595,9 @@ public class TestMergeOnReadTable {
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
List<HoodieDataFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
dataFilesList.size() > 0);
/**
* Write 2 (only updates + inserts, written to .log file + correction of existing parquet
@@ -624,7 +625,8 @@ public class TestMergeOnReadTable {
roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = roView.getLatestDataFiles();
Map<String, Long> parquetFileIdToNewSize = dataFilesToRead.collect(
List<HoodieDataFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
Map<String, Long> parquetFileIdToNewSize = newDataFilesList.stream().collect(
Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
assertTrue(parquetFileIdToNewSize.entrySet().stream()

View File

@@ -37,7 +37,7 @@ public class FileSlice implements Serializable {
/**
* Point in the timeline, at which the slice was created
*/
private String baseCommitTime;
private String baseInstantTime;
/**
* data file, with the compacted data, for this slice
@@ -50,11 +50,11 @@ public class FileSlice implements Serializable {
*/
private final TreeSet<HoodieLogFile> logFiles;
public FileSlice(String baseCommitTime, String fileId) {
public FileSlice(String baseInstantTime, String fileId) {
this.fileId = fileId;
this.baseCommitTime = baseCommitTime;
this.baseInstantTime = baseInstantTime;
this.dataFile = null;
this.logFiles = new TreeSet<>(HoodieLogFile.getLogVersionComparator());
this.logFiles = new TreeSet<>(HoodieLogFile.getBaseInstantAndLogVersionComparator());
}
public void setDataFile(HoodieDataFile dataFile) {
@@ -69,8 +69,8 @@ public class FileSlice implements Serializable {
return logFiles.stream();
}
public String getBaseCommitTime() {
return baseCommitTime;
public String getBaseInstantTime() {
return baseInstantTime;
}
public String getFileId() {
@@ -84,7 +84,7 @@ public class FileSlice implements Serializable {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("FileSlice {");
sb.append("baseCommitTime=").append(baseCommitTime);
sb.append("baseCommitTime=").append(baseInstantTime);
sb.append(", dataFile='").append(dataFile).append('\'');
sb.append(", logFiles='").append(logFiles).append('\'');
sb.append('}');

View File

@@ -72,6 +72,16 @@ public class HoodieFileGroup implements Serializable {
this.lastInstant = timeline.lastInstant();
}
/**
* Potentially add a new file-slice at the given base-instant time.
* A file-slice without any data-file or log-files can exist (if a compaction just got requested)
*/
public void addNewFileSliceAtInstant(String baseInstantTime) {
if (!fileSlices.containsKey(baseInstantTime)) {
fileSlices.put(baseInstantTime, new FileSlice(baseInstantTime, id));
}
}
/**
* Add a new datafile into the file group
*/
@@ -106,13 +116,27 @@ public class HoodieFileGroup implements Serializable {
*/
private boolean isFileSliceCommitted(FileSlice slice) {
String maxCommitTime = lastInstant.get().getTimestamp();
return timeline.containsOrBeforeTimelineStarts(slice.getBaseCommitTime())
&& HoodieTimeline.compareTimestamps(slice.getBaseCommitTime(),
return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime())
&& HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(),
maxCommitTime,
HoodieTimeline.LESSER_OR_EQUAL);
}
/**
* Get all the file slices, including in-flight ones, as seen in the underlying file-system
*/
public Stream<FileSlice> getAllFileSlicesIncludingInflight() {
return fileSlices.values().stream();
}
/**
* Get the latest file slice, including in-flight ones
*/
public Optional<FileSlice> getLatestFileSlicesIncludingInflight() {
return getAllFileSlicesIncludingInflight().findFirst();
}
/**
* Provides a stream of committed file slices, sorted reverse base commit time.
*/
@@ -141,15 +165,29 @@ public class HoodieFileGroup implements Serializable {
public Optional<FileSlice> getLatestFileSliceBeforeOrOn(String maxCommitTime) {
return getAllFileSlices()
.filter(slice ->
HoodieTimeline.compareTimestamps(slice.getBaseCommitTime(),
HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(),
maxCommitTime,
HoodieTimeline.LESSER_OR_EQUAL))
.findFirst();
}
/**
* Obtain the latest file slice, up to an instant time, i.e. strictly < maxInstantTime
* @param maxInstantTime Max Instant Time
* @return the latest file slice before the given instant, if any
*/
public Optional<FileSlice> getLatestFileSliceBefore(String maxInstantTime) {
return getAllFileSlices()
.filter(slice ->
HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(),
maxInstantTime,
HoodieTimeline.LESSER))
.findFirst();
}
public Optional<FileSlice> getLatestFileSliceInRange(List<String> commitRange) {
return getAllFileSlices()
.filter(slice -> commitRange.contains(slice.getBaseCommitTime()))
.filter(slice -> commitRange.contains(slice.getBaseInstantTime()))
.findFirst();
}
@@ -162,47 +200,6 @@ public class HoodieFileGroup implements Serializable {
.map(slice -> slice.getDataFile().get());
}
/**
* Get the latest committed data file
*/
public Optional<HoodieDataFile> getLatestDataFile() {
return getAllDataFiles().findFirst();
}
/**
* Get the latest data file, that is <= max commit time
*/
public Optional<HoodieDataFile> getLatestDataFileBeforeOrOn(String maxCommitTime) {
return getAllDataFiles()
.filter(dataFile ->
HoodieTimeline.compareTimestamps(dataFile.getCommitTime(),
maxCommitTime,
HoodieTimeline.LESSER_OR_EQUAL))
.findFirst();
}
/**
* Get the latest data file, that is contained within the provided commit range.
*/
public Optional<HoodieDataFile> getLatestDataFileInRange(List<String> commitRange) {
return getAllDataFiles()
.filter(dataFile -> commitRange.contains(dataFile.getCommitTime()))
.findFirst();
}
/**
* Obtain the latest log file (based on latest committed data file), currently being appended to
*
* @return logfile if present, empty if no log file has been opened already.
*/
public Optional<HoodieLogFile> getLatestLogFile() {
Optional<FileSlice> latestSlice = getLatestFileSlice();
if (latestSlice.isPresent() && latestSlice.get().getLogFiles().count() > 0) {
return latestSlice.get().getLogFiles().findFirst();
}
return Optional.empty();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieFileGroup {");

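For intuition on the new lookups, a standalone sketch (hypothetical names, not the Hudi API) of the ordering getLatestFileSliceBefore relies on: slices are keyed by base-instant time in reverse order, so "latest before" is the first key strictly smaller than the given instant:

import java.util.Comparator;
import java.util.Optional;
import java.util.TreeMap;

public class FileSliceOrderSketch {
  public static void main(String[] args) {
    // Mirrors HoodieFileGroup#fileSlices: keyed by base-instant time, newest first
    TreeMap<String, String> slices = new TreeMap<>(Comparator.reverseOrder());
    slices.put("001", "slice@001"); // committed base file plus log files
    slices.put("004", "slice@004"); // empty slice added at compaction-request time "004"

    // getLatestFileSliceBefore("004"): first slice strictly earlier than the instant
    Optional<String> before = slices.keySet().stream()
        .filter(ts -> ts.compareTo("004") < 0)
        .findFirst()
        .map(slices::get);
    System.out.println(before.get()); // prints slice@001
  }
}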
View File

@@ -94,10 +94,16 @@ public class HoodieLogFile implements Serializable {
FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion)));
}
public static Comparator<HoodieLogFile> getLogVersionComparator() {
public static Comparator<HoodieLogFile> getBaseInstantAndLogVersionComparator() {
return (o1, o2) -> {
// reverse the order
return new Integer(o2.getLogVersion()).compareTo(o1.getLogVersion());
String baseInstantTime1 = o1.getBaseCommitTime();
String baseInstantTime2 = o2.getBaseCommitTime();
if (baseInstantTime1.equals(baseInstantTime2)) {
// reverse the order by log-version when base-commit is same
return Integer.compare(o2.getLogVersion(), o1.getLogVersion());
}
// reverse the order by base-commits
return baseInstantTime2.compareTo(baseInstantTime1);
};
}
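A standalone sketch of the ordering the new comparator produces (hypothetical record type, not the Hudi class): log files sort by base-instant time descending, then by log version descending within the same instant:

import java.util.Arrays;
import java.util.Comparator;

public class LogOrderSketch {
  record Log(String baseInstant, int version) {}

  public static void main(String[] args) {
    Log[] logs = {new Log("1", 0), new Log("4", 1), new Log("1", 1), new Log("4", 0)};
    // Ascending (baseInstant, version), then reversed: newest slice's logs first
    Comparator<Log> order = Comparator.comparing(Log::baseInstant).thenComparing(Log::version);
    Arrays.sort(logs, order.reversed());
    System.out.println(Arrays.toString(logs));
    // [Log[baseInstant=4, version=1], Log[baseInstant=4, version=0],
    //  Log[baseInstant=1, version=1], Log[baseInstant=1, version=0]]
  }
}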

View File

@@ -274,6 +274,23 @@ public class HoodieTableMetaClient implements Serializable {
}
}
/**
* Get the commit + pending-compaction timeline visible for this table.
* An RT filesystem view is constructed with this timeline so that a file-slice after a pending compaction-requested
* instant-time is also considered valid. An RT file-system view for reading must then merge the file-slices before
* and after the pending compaction instant so that all delta-commits are read.
*/
public HoodieTimeline getCommitsAndCompactionTimeline() {
switch (this.getTableType()) {
case COPY_ON_WRITE:
return getActiveTimeline().getCommitTimeline();
case MERGE_ON_READ:
return getActiveTimeline().getCommitsAndCompactionTimeline();
default:
throw new HoodieException("Unsupported table type :" + this.getTableType());
}
}
/**
* Get the compacted commit timeline visible for this table
*/

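Put together, the reader-side flow this javadoc describes might look as follows (a sketch; the FileSlice and view-class package paths are assumed):

import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import java.util.stream.Stream;

class MergedReadFlowSketch {
  static Stream<FileSlice> latestMerged(HoodieTableMetaClient metaClient,
      String partitionPath, String maxInstantTime) {
    // Commits plus pending compactions, restricted to completed + compaction instants
    HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline()
        .filterCompletedAndCompactionInstants();
    HoodieTableFileSystemView view = new HoodieTableFileSystemView(metaClient, timeline);
    // Merges the slices around a pending compaction so no delta-commit is missed
    return view.getLatestMergedFileSlicesBeforeOrOn(partitionPath, maxInstantTime);
  }
}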
View File

@@ -18,10 +18,12 @@ package com.uber.hoodie.common.table;
import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import java.io.Serializable;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
/**
* HoodieTimeline is a view of meta-data instants in the hoodie dataset. Instants are specific
@@ -42,6 +44,10 @@ public interface HoodieTimeline extends Serializable {
String ROLLBACK_ACTION = "rollback";
String SAVEPOINT_ACTION = "savepoint";
String INFLIGHT_EXTENSION = ".inflight";
// With Async Compaction, compaction instant can be in 3 states :
// (compaction-requested), (compaction-inflight), (completed)
String COMPACTION_ACTION = "compaction";
String REQUESTED_EXTENSION = ".requested";
String COMMIT_EXTENSION = "." + COMMIT_ACTION;
String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
@@ -54,6 +60,12 @@ public interface HoodieTimeline extends Serializable {
String INFLIGHT_CLEAN_EXTENSION = "." + CLEAN_ACTION + INFLIGHT_EXTENSION;
String INFLIGHT_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + INFLIGHT_EXTENSION;
String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
String REQUESTED_COMPACTION_SUFFIX =
StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
String REQUESTED_COMPACTION_EXTENSION =
StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
String INFLIGHT_COMPACTION_EXTENSION =
StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
/**
* Filter this timeline to just include the in-flights
@@ -62,6 +74,13 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterInflights();
/**
* Filter this timeline to just include the in-flights excluding compaction instants
*
* @return New instance of HoodieTimeline with just in-flights excluding compaction inflights
*/
HoodieTimeline filterInflightsExcludingCompaction();
/**
* Filter this timeline to just include the completed instants
*
@@ -69,6 +88,20 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterCompletedInstants();
/**
* Filter this timeline to just include the completed + compaction (inflight + requested) instants
* An RT filesystem view is constructed with this timeline so that a file-slice after a pending compaction-requested
* instant-time is also considered valid. An RT file-system view for reading must then merge the file-slices before
* and after the pending compaction instant so that all delta-commits are read.
* @return New instance of HoodieTimeline with completed instants plus pending compaction instants
*/
HoodieTimeline filterCompletedAndCompactionInstants();
/**
* Filter this timeline to just include inflight and requested compaction instants
* @return New instance of HoodieTimeline with just pending (inflight and requested) compaction instants
*/
HoodieTimeline filterPendingCompactionTimeline();
/**
* Create a new Timeline with instants after startTs and before or on endTs
@@ -157,45 +190,60 @@ public interface HoodieTimeline extends Serializable {
return new HoodieInstant(false, instant.getAction(), instant.getTimestamp());
}
static HoodieInstant getCompactionRequestedInstant(final String timestamp) {
return new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, timestamp);
}
static HoodieInstant getCompactionInflightInstant(final String timestamp) {
return new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, timestamp);
}
static HoodieInstant getInflightInstant(final HoodieInstant instant) {
return new HoodieInstant(true, instant.getAction(), instant.getTimestamp());
}
static String makeCommitFileName(String commitTime) {
return commitTime + HoodieTimeline.COMMIT_EXTENSION;
return StringUtils.join(commitTime, HoodieTimeline.COMMIT_EXTENSION);
}
static String makeInflightCommitFileName(String commitTime) {
return commitTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION);
}
static String makeCleanerFileName(String instant) {
return instant + HoodieTimeline.CLEAN_EXTENSION;
return StringUtils.join(instant, HoodieTimeline.CLEAN_EXTENSION);
}
static String makeInflightCleanerFileName(String instant) {
return instant + HoodieTimeline.INFLIGHT_CLEAN_EXTENSION;
return StringUtils.join(instant, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION);
}
static String makeRollbackFileName(String instant) {
return instant + HoodieTimeline.ROLLBACK_EXTENSION;
return StringUtils.join(instant, HoodieTimeline.ROLLBACK_EXTENSION);
}
static String makeInflightRollbackFileName(String instant) {
return instant + HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION;
return StringUtils.join(instant, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
}
static String makeInflightSavePointFileName(String commitTime) {
return commitTime + HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION;
return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION);
}
static String makeSavePointFileName(String commitTime) {
return commitTime + HoodieTimeline.SAVEPOINT_EXTENSION;
return StringUtils.join(commitTime, HoodieTimeline.SAVEPOINT_EXTENSION);
}
static String makeInflightDeltaFileName(String commitTime) {
return commitTime + HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION;
return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
}
static String makeInflightCompactionFileName(String commitTime) {
return StringUtils.join(commitTime, HoodieTimeline.INFLIGHT_COMPACTION_EXTENSION);
}
static String makeRequestedCompactionFileName(String commitTime) {
return StringUtils.join(commitTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION);
}
static String makeDeltaFileName(String commitTime) {
@@ -211,8 +259,6 @@ public interface HoodieTimeline extends Serializable {
}
static String makeFileNameAsInflight(String fileName) {
return fileName + HoodieTimeline.INFLIGHT_EXTENSION;
return StringUtils.join(fileName, HoodieTimeline.INFLIGHT_EXTENSION);
}
}
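Given the constants and helpers above, the meta files a compaction leaves behind are easy to spell out; a quick check one could run against the interface (expected output derived from the definitions above):

import com.uber.hoodie.common.table.HoodieTimeline;

public class CompactionFileNames {
  public static void main(String[] args) {
    String ts = "20180523165453";
    // REQUESTED_COMPACTION_EXTENSION resolves to ".compaction.requested"
    System.out.println(HoodieTimeline.makeRequestedCompactionFileName(ts));
    // prints 20180523165453.compaction.requested
    // INFLIGHT_COMPACTION_EXTENSION resolves to ".compaction.inflight"
    System.out.println(HoodieTimeline.makeInflightCompactionFileName(ts));
    // prints 20180523165453.compaction.inflight
  }
}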

View File

@@ -73,6 +73,12 @@ public interface TableFileSystemView {
*/
Stream<FileSlice> getLatestFileSlices(String partitionPath);
/**
* Stream all the latest uncompacted file slices in the given partition
*/
Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath);
/**
* Stream all the latest file slices in the given partition, with the precondition that
* commitTime(file) is before maxCommitTime
@@ -80,6 +86,16 @@ public interface TableFileSystemView {
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime);
/**
* Stream all "merged" file-slices before on an instant time
* If a file-group has a pending compaction request, the file-slice before and after compaction request instant
* is merged and returned.
* @param partitionPath Partition Path
* @param maxInstantTime Max Instant Time
* @return
*/
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
/**
* Stream all the latest file slices, in the given range
*/

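A sketch contrasting the three realtime-view lookups for one partition (types as declared above; view construction and import paths assumed):

import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.table.TableFileSystemView;
import java.util.stream.Stream;

class SliceLookupsSketch {
  static void show(TableFileSystemView.RealtimeView rtView, String partition, String maxInstant) {
    // As-is: a slice based on a pending compaction instant is returned, with its
    // data file stripped if compaction has not completed.
    Stream<FileSlice> latest = rtView.getLatestFileSlices(partition);
    // Compaction-aware: fall back to the slice before the pending compaction.
    Stream<FileSlice> unCompacted = rtView.getLatestUnCompactedFileSlices(partition);
    // Reader view: splice pre- and post-compaction-request slices together.
    Stream<FileSlice> merged = rtView.getLatestMergedFileSlicesBeforeOrOn(partition, maxInstant);
    System.out.println(latest.count() + " " + unCompacted.count() + " " + merged.count());
  }
}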
View File

@@ -64,6 +64,7 @@ public abstract class AbstractHoodieLogRecordScanner {
// Reader schema for the records
private final Schema readerSchema;
// Latest valid instant time
// Log-Blocks belonging to inflight delta-instants are filtered-out using this high-watermark.
private final String latestInstantTime;
private final HoodieTableMetaClient hoodieTableMetaClient;
// Merge strategy to use when combining records from log

View File

@@ -95,7 +95,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
this(metaClient,
new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION});
CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION});
}
/**
@@ -118,19 +118,31 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
/**
* Get all instants (commits, delta commits) that produce new data, in the active timeline
*/
public HoodieTimeline getCommitsTimeline() {
return getTimelineOfActions(
Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
}
/**
* Get all instants (commits, delta commits, in-flight/request compaction) that produce new data, in the active
* timeline.
* With async compaction, a requested/inflight compaction-instant is a valid baseInstant for a file-slice, as there
* could be delta-commits with that baseInstant.
*/
public HoodieTimeline getCommitsAndCompactionTimeline() {
return getTimelineOfActions(
Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION));
}
/**
* Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions,
* in the active timeline
*/
public HoodieTimeline getAllCommitsTimeline() {
return getTimelineOfActions(
Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION,
Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
SAVEPOINT_ACTION, ROLLBACK_ACTION));
}
@@ -200,7 +212,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
log.info("Marking instant complete " + instant);
Preconditions.checkArgument(instant.isInflight(),
"Could not mark an already completed instant as complete again " + instant);
moveInflightToComplete(instant, HoodieTimeline.getCompletedInstant(instant), data);
transitionState(instant, HoodieTimeline.getCompletedInstant(instant), data);
log.info("Completed " + instant);
}
@@ -211,7 +223,18 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
public void deleteInflight(HoodieInstant instant) {
log.info("Deleting in-flight " + instant);
Preconditions.checkArgument(instant.isInflight());
deleteInstantFile(instant);
}
public void deleteCompactionRequested(HoodieInstant instant) {
Preconditions.checkArgument(instant.isRequested());
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
deleteInstantFile(instant);
}
private void deleteInstantFile(HoodieInstant instant) {
log.info("Deleting instant " + instant);
Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
try {
boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false);
@@ -232,24 +255,43 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return readDataFromPath(detailPath);
}
protected void moveInflightToComplete(HoodieInstant inflight, HoodieInstant completed,
public void revertFromInflightToRequested(HoodieInstant inflightInstant, HoodieInstant requestedInstant,
Optional<byte[]> data) {
Path commitFilePath = new Path(metaClient.getMetaPath(), completed.getFileName());
Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
transitionState(inflightInstant, requestedInstant, data);
}
public void transitionFromRequestedToInflight(HoodieInstant requestedInstant, HoodieInstant inflightInstant,
Optional<byte[]> data) {
Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
transitionState(requestedInstant, inflightInstant, data);
}
protected void moveInflightToComplete(HoodieInstant inflightInstant, HoodieInstant commitInstant,
Optional<byte[]> data) {
transitionState(inflightInstant, commitInstant, data);
}
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant,
Optional<byte[]> data) {
Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
Path commitFilePath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
try {
// open a new file and write the commit metadata in
Path inflightCommitFile = new Path(metaClient.getMetaPath(), inflight.getFileName());
createFileInMetaPath(inflight.getFileName(), data);
Path inflightCommitFile = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
createFileInMetaPath(fromInstant.getFileName(), data);
boolean success = metaClient.getFs().rename(inflightCommitFile, commitFilePath);
if (!success) {
throw new HoodieIOException(
"Could not rename " + inflightCommitFile + " to " + commitFilePath);
}
} catch (IOException e) {
throw new HoodieIOException("Could not complete " + inflight, e);
throw new HoodieIOException("Could not complete " + fromInstant, e);
}
}
protected void moveCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
Preconditions.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), inflight.getFileName());
try {
if (!metaClient.getFs().exists(inFlightCommitFilePath)) {
@@ -269,6 +311,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
createFileInMetaPath(instant.getFileName(), content);
}
public void saveToRequested(HoodieInstant instant, Optional<byte[]> content) {
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
createFileInMetaPath(instant.getFileName(), content);
}
protected void createFileInMetaPath(String filename, Optional<byte[]> content) {
Path fullPath = new Path(metaClient.getMetaPath(), filename);
try {

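Taken together, the new transition APIs give compaction a small state machine. A sketch of the lifecycle as a scheduler might drive it (error handling elided; signatures as added above, import paths assumed):

import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import java.util.Optional;

class CompactionLifecycleSketch {
  static void run(HoodieActiveTimeline timeline, String compactionTime) {
    // Schedule: write <time>.compaction.requested
    HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant(compactionTime);
    timeline.saveToRequested(requested, Optional.empty());

    // Start: rename to <time>.compaction.inflight
    HoodieInstant inflight = HoodieTimeline.getCompactionInflightInstant(compactionTime);
    timeline.transitionFromRequestedToInflight(requested, inflight, Optional.empty());

    // On failure, the compactor can move back to requested and retry later
    timeline.revertFromInflightToRequested(inflight, requested, Optional.empty());
  }
}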
View File

@@ -53,15 +53,38 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
public HoodieDefaultTimeline() {
}
@Override
public HoodieTimeline filterInflights() {
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight),
details);
}
@Override
public HoodieTimeline filterInflightsExcludingCompaction() {
return new HoodieDefaultTimeline(instants.stream().filter(instant -> {
return instant.isInflight() && (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
}), details);
}
@Override
public HoodieTimeline filterCompletedInstants() {
return new HoodieDefaultTimeline(instants.stream().filter(s -> !s.isInflight()), details);
}
@Override
public HoodieTimeline filterCompletedAndCompactionInstants() {
return new HoodieDefaultTimeline(instants.stream().filter(s -> {
return !s.isInflight() || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION);
}), details);
}
@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
instants.stream().filter(s -> s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)),
details);
}
@Override
public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
return new HoodieDefaultTimeline(instants.stream().filter(

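A standalone mirror of the two new filters (simplified: instant state collapsed to a boolean, names hypothetical), showing which instants each keeps:

import java.util.List;
import java.util.function.Predicate;

public class TimelineFilterSketch {
  record Instant(boolean inflight, String action) {}

  public static void main(String[] args) {
    List<Instant> all = List.of(
        new Instant(false, "commit"),     // completed commit
        new Instant(true, "deltacommit"), // inflight ingestion
        new Instant(true, "compaction")); // pending compaction

    // filterCompletedAndCompactionInstants keeps the commit and the compaction
    Predicate<Instant> completedAndCompaction =
        i -> !i.inflight() || i.action().equals("compaction");
    // filterInflightsExcludingCompaction keeps only the inflight deltacommit
    Predicate<Instant> inflightsExclCompaction =
        i -> i.inflight() && !i.action().equals("compaction");

    System.out.println(all.stream().filter(completedAndCompaction).count());  // 2
    System.out.println(all.stream().filter(inflightsExclCompaction).count()); // 1
  }
}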
View File

@@ -30,7 +30,19 @@ import org.apache.hadoop.fs.FileStatus;
*/
public class HoodieInstant implements Serializable {
private boolean isInflight = false;
/**
* Instant State
*/
public enum State {
// Requested State (valid state for Compaction)
REQUESTED,
// Inflight instant
INFLIGHT,
// Committed instant
COMPLETED
}
private State state = State.COMPLETED;
private String action;
private String timestamp;
@@ -49,21 +61,35 @@ public class HoodieInstant implements Serializable {
// This is to support backwards compatibility on how in-flight commit files were written
// General rule is inflight extension is .<action>.inflight, but for commit it is .inflight
action = "commit";
isInflight = true;
state = State.INFLIGHT;
} else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
isInflight = true;
state = State.INFLIGHT;
action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
} else if (action.equals(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)) {
state = State.REQUESTED;
action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
}
}
public HoodieInstant(boolean isInflight, String action, String timestamp) {
this.isInflight = isInflight;
//TODO: vb - Preserving for avoiding cascading changes. This constructor will be updated in subsequent PR
this.state = isInflight ? State.INFLIGHT : State.COMPLETED;
this.action = action;
this.timestamp = timestamp;
}
public HoodieInstant(State state, String action, String timestamp) {
this.state = state;
this.action = action;
this.timestamp = timestamp;
}
public boolean isInflight() {
return isInflight;
return state == State.INFLIGHT;
}
public boolean isRequested() {
return state == State.REQUESTED;
}
public String getAction() {
@@ -79,20 +105,28 @@ public class HoodieInstant implements Serializable {
*/
public String getFileName() {
if (HoodieTimeline.COMMIT_ACTION.equals(action)) {
return isInflight ? HoodieTimeline.makeInflightCommitFileName(timestamp)
return isInflight() ? HoodieTimeline.makeInflightCommitFileName(timestamp)
: HoodieTimeline.makeCommitFileName(timestamp);
} else if (HoodieTimeline.CLEAN_ACTION.equals(action)) {
return isInflight ? HoodieTimeline.makeInflightCleanerFileName(timestamp)
return isInflight() ? HoodieTimeline.makeInflightCleanerFileName(timestamp)
: HoodieTimeline.makeCleanerFileName(timestamp);
} else if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) {
return isInflight ? HoodieTimeline.makeInflightRollbackFileName(timestamp)
return isInflight() ? HoodieTimeline.makeInflightRollbackFileName(timestamp)
: HoodieTimeline.makeRollbackFileName(timestamp);
} else if (HoodieTimeline.SAVEPOINT_ACTION.equals(action)) {
return isInflight ? HoodieTimeline.makeInflightSavePointFileName(timestamp)
return isInflight() ? HoodieTimeline.makeInflightSavePointFileName(timestamp)
: HoodieTimeline.makeSavePointFileName(timestamp);
} else if (HoodieTimeline.DELTA_COMMIT_ACTION.equals(action)) {
return isInflight ? HoodieTimeline.makeInflightDeltaFileName(timestamp)
return isInflight() ? HoodieTimeline.makeInflightDeltaFileName(timestamp)
: HoodieTimeline.makeDeltaFileName(timestamp);
} else if (HoodieTimeline.COMPACTION_ACTION.equals(action)) {
if (isInflight()) {
return HoodieTimeline.makeInflightCompactionFileName(timestamp);
} else if (isRequested()) {
return HoodieTimeline.makeRequestedCompactionFileName(timestamp);
} else {
return HoodieTimeline.makeCommitFileName(timestamp);
}
}
throw new IllegalArgumentException("Cannot get file name for unknown action " + action);
}
@@ -106,18 +140,18 @@ public class HoodieInstant implements Serializable {
return false;
}
HoodieInstant that = (HoodieInstant) o;
return isInflight == that.isInflight
return state == that.state
&& Objects.equals(action, that.action)
&& Objects.equals(timestamp, that.timestamp);
}
@Override
public int hashCode() {
return Objects.hash(isInflight, action, timestamp);
return Objects.hash(state, action, timestamp);
}
@Override
public String toString() {
return "[" + ((isInflight) ? "==>" : "") + timestamp + "__" + action + "]";
return "[" + ((isInflight() || isRequested()) ? "==>" : "") + timestamp + "__" + action + "__" + state + "]";
}
}
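One consequence of the State model worth spelling out: a single compaction timestamp maps to three different meta files, and a completed compaction is written as a plain commit. A sketch (imports assumed from this commit's packages):

import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;

public class InstantStateSketch {
  public static void main(String[] args) {
    String ts = "20180523165453";
    System.out.println(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, ts)
        .getFileName()); // 20180523165453.compaction.requested
    System.out.println(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, ts)
        .getFileName()); // 20180523165453.compaction.inflight
    System.out.println(new HoodieInstant(State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, ts)
        .getFileName()); // 20180523165453.commit -- a finished compaction is a commit
  }
}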

View File

@@ -64,6 +64,11 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
// mapping from file id to the file group.
protected HashMap<String, HoodieFileGroup> fileGroupMap;
/**
* File Id to pending compaction instant time
*/
private final Map<String, String> fileIdToPendingCompactionInstantTime;
/**
* Create a file system view, as of the given timeline
*/
@@ -73,6 +78,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
this.visibleActiveTimeline = visibleActiveTimeline;
this.fileGroupMap = new HashMap<>();
this.partitionToFileGroupsMap = new HashMap<>();
//TODO: vb Will be implemented in next PR
this.fileIdToPendingCompactionInstantTime = new HashMap<>();
}
@@ -128,14 +135,19 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
List<HoodieFileGroup> fileGroups = new ArrayList<>();
fileIdSet.forEach(pair -> {
HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), pair.getValue(),
visibleActiveTimeline);
String fileId = pair.getValue();
HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, visibleActiveTimeline);
if (dataFiles.containsKey(pair)) {
dataFiles.get(pair).forEach(dataFile -> group.addDataFile(dataFile));
}
if (logFiles.containsKey(pair)) {
logFiles.get(pair).forEach(logFile -> group.addLogFile(logFile));
}
if (fileIdToPendingCompactionInstantTime.containsKey(fileId)) {
// If there is no delta-commit after compaction request, this step would ensure a new file-slice appears
// so that any new ingestion uses the correct base-instant
group.addNewFileSliceAtInstant(fileIdToPendingCompactionInstantTime.get(fileId));
}
fileGroups.add(group);
});
@@ -165,19 +177,37 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
return Arrays.stream(statuses).filter(rtFilePredicate).map(HoodieLogFile::new);
}
/**
* With async compaction, it is possible to see partial/complete data-files due to inflight compactions; ignore
* those data-files
*
* @param dataFile Data File
*/
private boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(dataFile.getFileId());
if ((null != compactionInstantTime) && dataFile.getCommitTime().equals(compactionInstantTime)) {
return true;
}
return false;
}
@Override
public Stream<HoodieDataFile> getLatestDataFiles(final String partitionPath) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> fileGroup.getLatestDataFile())
.filter(dataFileOpt -> dataFileOpt.isPresent())
.map(fileGroup -> {
return fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst();
})
.filter(Optional::isPresent)
.map(Optional::get);
}
@Override
public Stream<HoodieDataFile> getLatestDataFiles() {
return fileGroupMap.values().stream()
.map(fileGroup -> fileGroup.getLatestDataFile())
.filter(dataFileOpt -> dataFileOpt.isPresent())
.map(fileGroup -> {
return fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst();
})
.filter(Optional::isPresent)
.map(Optional::get);
}
@@ -185,16 +215,29 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
public Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath,
String maxCommitTime) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> fileGroup.getLatestDataFileBeforeOrOn(maxCommitTime))
.filter(dataFileOpt -> dataFileOpt.isPresent())
.map(fileGroup -> {
return fileGroup.getAllDataFiles()
.filter(dataFile ->
HoodieTimeline.compareTimestamps(dataFile.getCommitTime(),
maxCommitTime,
HoodieTimeline.LESSER_OR_EQUAL))
.filter(df -> !isDataFileDueToPendingCompaction(df))
.findFirst();
})
.filter(Optional::isPresent)
.map(Optional::get);
}
@Override
public Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn) {
return fileGroupMap.values().stream()
.map(fileGroup -> fileGroup.getLatestDataFileInRange(commitsToReturn))
.filter(dataFileOpt -> dataFileOpt.isPresent())
.map(fileGroup -> {
return fileGroup.getAllDataFiles()
.filter(dataFile -> commitsToReturn.contains(dataFile.getCommitTime())
&& !isDataFileDueToPendingCompaction(dataFile))
.findFirst();
})
.filter(Optional::isPresent)
.map(Optional::get);
}
@@ -202,23 +245,125 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
public Stream<HoodieDataFile> getAllDataFiles(String partitionPath) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> fileGroup.getAllDataFiles())
.flatMap(dataFileList -> dataFileList);
.flatMap(dataFileList -> dataFileList)
.filter(df -> !isDataFileDueToPendingCompaction(df));
}
@Override
public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> fileGroup.getLatestFileSlice())
.filter(dataFileOpt -> dataFileOpt.isPresent())
.filter(Optional::isPresent)
.map(Optional::get)
.map(this::filterDataFileAfterPendingCompaction);
}
@Override
public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> {
FileSlice fileSlice = fileGroup.getLatestFileSlice().get();
// if the file-group is under compaction, pick the latest before compaction instant time.
if (isFileSliceAfterPendingCompaction(fileSlice)) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId());
return fileGroup.getLatestFileSliceBefore(compactionInstantTime);
}
return Optional.of(fileSlice);
})
.map(Optional::get);
}
/**
* Returns true if the file-group is under pending compaction and the file-slice's baseInstant matches
* the compaction instant
* @param fileSlice File Slice
* @return true if the file-slice was created at the pending compaction instant
*/
private boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId());
if ((null != compactionInstantTime) && fileSlice.getBaseInstantTime().equals(compactionInstantTime)) {
return true;
}
return false;
}
/**
* With async compaction, it is possible to see partial/complete data-files due to inflight compactions;
* ignore those data-files
* @param fileSlice File Slice
* @return the file-slice with any such data-file stripped out
*/
private FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) {
if (isFileSliceAfterPendingCompaction(fileSlice)) {
// Data file is filtered out of the file-slice as the corresponding compaction
// instant not completed yet.
FileSlice transformed = new FileSlice(fileSlice.getBaseInstantTime(), fileSlice.getFileId());
fileSlice.getLogFiles().forEach(lf -> transformed.addLogFile(lf));
return transformed;
}
return fileSlice;
}
@Override
public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime))
.filter(dataFileOpt -> dataFileOpt.isPresent())
.filter(Optional::isPresent)
.map(Optional::get)
.map(this::filterDataFileAfterPendingCompaction);
}
/**
* Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
*
* @param lastSlice Latest File slice for a file-group
* @param penultimateSlice Penultimate file slice for a file-group in commit timeline order
*/
private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) {
FileSlice merged = new FileSlice(penultimateSlice.getBaseInstantTime(), penultimateSlice.getFileId());
if (penultimateSlice.getDataFile().isPresent()) {
merged.setDataFile(penultimateSlice.getDataFile().get());
}
// Add Log files from penultimate and last slices
penultimateSlice.getLogFiles().forEach(lf -> merged.addLogFile(lf));
lastSlice.getLogFiles().forEach(lf -> merged.addLogFile(lf));
return merged;
}
/**
* If the file-slice was created at a pending compaction instant, this method merges the file-slice with the one
* before the compaction instant time
* @param fileGroup File Group to which the file slice belongs
* @param fileSlice File Slice which needs to be merged
* @return the merged file-slice, or the input slice if no merge is needed
*/
private FileSlice getMergedFileSlice(HoodieFileGroup fileGroup, FileSlice fileSlice) {
// if the file-group is under pending compaction, pick the latest slice before the compaction instant time.
if (fileIdToPendingCompactionInstantTime.containsKey(fileSlice.getFileId())) {
String compactionInstantTime = fileIdToPendingCompactionInstantTime.get(fileSlice.getFileId());
if (fileSlice.getBaseInstantTime().equals(compactionInstantTime)) {
Optional<FileSlice> prevFileSlice = fileGroup.getLatestFileSliceBefore(compactionInstantTime);
if (prevFileSlice.isPresent()) {
return mergeCompactionPendingFileSlices(fileSlice, prevFileSlice.get());
}
}
}
return fileSlice;
}
@Override
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime) {
return getAllFileGroups(partitionPath)
.map(fileGroup -> {
Optional<FileSlice> fileSlice = fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime);
// if the file-group is under pending compaction, merge with the slice before the compaction instant time.
if (fileSlice.isPresent()) {
fileSlice = Optional.of(getMergedFileSlice(fileGroup, fileSlice.get()));
}
return fileSlice;
})
.filter(Optional::isPresent)
.map(Optional::get);
}
@@ -226,7 +371,6 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
public Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
return fileGroupMap.values().stream()
.map(fileGroup -> fileGroup.getLatestFileSliceInRange(commitsToReturn))
.filter(dataFileOpt -> dataFileOpt.isPresent())
.map(Optional::get);
}
@@ -260,4 +404,15 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
"Failed to list data files in partition " + partitionPathStr, e);
}
}
/**
* Used by tests to add pending compaction entries. TODO: This method is temporary and should go away in a subsequent
* Async Compaction PR
*
* @param fileId File Id
* @param compactionInstantTime Compaction Instant Time
*/
protected void addPendingCompactionFileId(String fileId, String compactionInstantTime) {
fileIdToPendingCompactionInstantTime.put(fileId, compactionInstantTime);
}
}
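For intuition on the merge step, a standalone mirror of mergeCompactionPendingFileSlices (hypothetical record type; the real code keeps logs in a comparator-sorted TreeSet rather than an ordered list):

import java.util.ArrayList;
import java.util.List;

public class MergedSliceSketch {
  record Slice(String baseInstant, String dataFile, List<String> logs) {}

  // Splice the post-compaction-request logs onto the previous slice so readers
  // still see every delta-commit while compaction is pending.
  static Slice merge(Slice last, Slice penultimate) {
    List<String> logs = new ArrayList<>(penultimate.logs());
    logs.addAll(last.logs());
    return new Slice(penultimate.baseInstant(), penultimate.dataFile(), logs);
  }

  public static void main(String[] args) {
    Slice before = new Slice("1", "f1_1.parquet", List.of("log.1@1", "log.2@1"));
    Slice after = new Slice("4", null, List.of("log.1@4", "log.2@4")); // compaction pending at "4"
    System.out.println(merge(after, before));
    // Slice[baseInstant=1, dataFile=f1_1.parquet, logs=[log.1@1, log.2@1, log.1@4, log.2@4]]
  }
}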

View File

@@ -80,7 +80,7 @@ public class HoodieActiveTimelineTest {
Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete),
timeline.getCommitTimeline().filterCompletedInstants().getInstants());
HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5),
timeline.getCommitTimeline().filterInflights().getInstants());
timeline.getCommitTimeline().filterInflightsExcludingCompaction().getInstants());
}
@Test
@@ -106,7 +106,7 @@ public class HoodieActiveTimelineTest {
timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2).getInstants()
.map(HoodieInstant::getTimestamp));
assertFalse(timeline.empty());
assertFalse(timeline.getCommitTimeline().filterInflights().empty());
assertFalse(timeline.getCommitTimeline().filterInflightsExcludingCompaction().empty());
assertEquals("", 12, timeline.countInstants());
HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants();
assertEquals("", 10, activeCommitTimeline.countInstants());

View File

@@ -32,15 +32,18 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.FSUtils;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
@@ -53,7 +56,7 @@ public class HoodieTableFileSystemViewTest {
private HoodieTableMetaClient metaClient;
private String basePath;
private TableFileSystemView fsView;
private HoodieTableFileSystemView fsView;
private TableFileSystemView.ReadOptimizedView roView;
private TableFileSystemView.RealtimeView rtView;
@@ -74,15 +77,419 @@ public class HoodieTableFileSystemViewTest {
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
if (statuses != null) {
fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), statuses);
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
statuses);
} else {
fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants());
}
roView = (TableFileSystemView.ReadOptimizedView) fsView;
rtView = (TableFileSystemView.RealtimeView) fsView;
}
/**
* Test case for view generation on a file group where
* the only file-slice does not have a data-file. This is the case where upserts go directly to log-files
*/
@Test
public void testViewForFileSlicesWithNoBaseFile() throws Exception {
String partitionPath = "2016/05/01";
new File(basePath + "/" + partitionPath).mkdirs();
String fileId = UUID.randomUUID().toString();
String instantTime1 = "1";
String deltaInstantTime1 = "2";
String deltaInstantTime2 = "3";
String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
HoodieInstant deltaInstant2 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1);
HoodieInstant deltaInstant3 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2);
commitTimeline.saveAsComplete(instant1, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant2, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant3, Optional.empty());
refreshFsView(null);
List<HoodieDataFile> dataFiles = roView.getLatestDataFiles().collect(Collectors.toList());
assertTrue("No data file expected", dataFiles.isEmpty());
List<FileSlice> fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
assertEquals(1, fileSliceList.size());
FileSlice fileSlice = fileSliceList.get(0);
assertEquals("File-Id must be set correctly", fileId, fileSlice.getFileId());
assertFalse("Data file for base instant must be present", fileSlice.getDataFile().isPresent());
assertEquals("Base Instant for file-group set correctly", instantTime1, fileSlice.getBaseInstantTime());
List<HoodieLogFile> logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Correct number of log-files shows up in file-slice", 2, logFiles.size());
assertEquals("Log File Order check", fileName2, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName1, logFiles.get(1).getFileName());
// Check Merged File Slices API
fileSliceList = rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime2)
.collect(Collectors.toList());
assertEquals(1, fileSliceList.size());
fileSlice = fileSliceList.get(0);
assertEquals("File-Id must be set correctly", fileId, fileSlice.getFileId());
assertFalse("Data file for base instant must be present", fileSlice.getDataFile().isPresent());
assertEquals("Base Instant for file-group set correctly", instantTime1, fileSlice.getBaseInstantTime());
logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Correct number of log-files shows up in file-slice", 2, logFiles.size());
assertEquals("Log File Order check", fileName2, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName1, logFiles.get(1).getFileName());
// Check UnCompacted File Slices API
fileSliceList = rtView.getLatestUnCompactedFileSlices(partitionPath).collect(Collectors.toList());
assertEquals(1, fileSliceList.size());
fileSlice = fileSliceList.get(0);
assertEquals("File-Id must be set correctly", fileId, fileSlice.getFileId());
assertFalse("Data file for base instant must be present", fileSlice.getDataFile().isPresent());
assertEquals("Base Instant for file-group set correctly", instantTime1, fileSlice.getBaseInstantTime());
logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Correct number of log-files shows up in file-slice", 2, logFiles.size());
assertEquals("Log File Order check", fileName2, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName1, logFiles.get(1).getFileName());
}
@Test
public void testViewForFileSlicesWithNoBaseFileAndRequestedCompaction() throws Exception {
testViewForFileSlicesWithAsyncCompaction(true, false);
}
@Test
public void testViewForFileSlicesWithBaseFileAndRequestedCompaction() throws Exception {
testViewForFileSlicesWithAsyncCompaction(false, false);
}
@Test
public void testViewForFileSlicesWithNoBaseFileAndInflightCompaction() throws Exception {
testViewForFileSlicesWithAsyncCompaction(true, true);
}
@Test
public void testViewForFileSlicesWithBaseFileAndInflightCompaction() throws Exception {
testViewForFileSlicesWithAsyncCompaction(false, true);
}
/**
* Returns all file-slices including uncommitted ones.
* @param partitionPath
* @return
*/
private Stream<FileSlice> getAllRawFileSlices(String partitionPath) {
return fsView.getAllFileGroups(partitionPath)
.map(group -> group.getAllFileSlicesIncludingInflight())
.flatMap(sliceList -> sliceList);
}
/**
* Returns latest raw file-slices including uncommitted ones.
* @param partitionPath
* @return
*/
public Stream<FileSlice> getLatestRawFileSlices(String partitionPath) {
return fsView.getAllFileGroups(partitionPath)
.map(fileGroup -> fileGroup.getLatestFileSlicesIncludingInflight())
.filter(fileSliceOpt -> fileSliceOpt.isPresent())
.map(Optional::get);
}
/**
* Helper method to test Views in the presence of concurrent compaction
* @param skipCreatingDataFile if set, first File Slice will not have data-file set. This would
* simulate inserts going directly to log files
* @param isCompactionInFlight if set, compaction was inflight (running) when view was tested first time,
* otherwise compaction was in requested state
* @throws Exception
*/
private void testViewForFileSlicesWithAsyncCompaction(boolean skipCreatingDataFile,
boolean isCompactionInFlight) throws Exception {
String partitionPath = "2016/05/01";
new File(basePath + "/" + partitionPath).mkdirs();
String fileId = UUID.randomUUID().toString();
// if skipCreatingDataFile, then instantTime1 below acts like delta-commit, otherwise it is base-commit
String instantTime1 = "1";
String deltaInstantTime1 = "2";
String deltaInstantTime2 = "3";
String dataFileName = null;
if (!skipCreatingDataFile) {
dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId);
new File(basePath + "/" + partitionPath + "/" + dataFileName).createNewFile();
}
String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
HoodieInstant deltaInstant2 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1);
HoodieInstant deltaInstant3 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2);
commitTimeline.saveAsComplete(instant1, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant2, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant3, Optional.empty());
// Fake delta-ingestion after compaction-requested
String compactionRequestedTime = "4";
String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
HoodieInstant compactionInstant = null;
if (isCompactionInFlight) {
// Create a Data-file but this should be skipped by view
new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile();
compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
commitTimeline.saveToInflight(compactionInstant, Optional.empty());
} else {
compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
commitTimeline.saveToRequested(compactionInstant, Optional.empty());
}
String deltaInstantTime4 = "5";
String deltaInstantTime5 = "6";
List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0);
String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1);
new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile();
HoodieInstant deltaInstant4 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime4);
HoodieInstant deltaInstant5 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime5);
commitTimeline.saveAsComplete(deltaInstant4, Optional.empty());
commitTimeline.saveAsComplete(deltaInstant5, Optional.empty());
refreshFsView(null);
fsView.addPendingCompactionFileId(fileId, compactionRequestedTime);
List<HoodieDataFile> dataFiles = roView.getAllDataFiles(partitionPath).collect(Collectors.toList());
if (skipCreatingDataFile) {
assertTrue("No data file expected", dataFiles.isEmpty());
} else {
assertEquals("One data-file is expected as there is only one file-group", 1, dataFiles.size());
assertEquals("Expect only valid data-file", dataFileName, dataFiles.get(0).getFileName());
}
/** Merge API Tests **/
List<FileSlice> fileSliceList = rtView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5)
.collect(Collectors.toList());
assertEquals("Expect file-slice to be merged", 1, fileSliceList.size());
FileSlice fileSlice = fileSliceList.get(0);
assertEquals(fileId, fileSlice.getFileId());
if (!skipCreatingDataFile) {
assertEquals("Data file must be present", dataFileName, fileSlice.getDataFile().get().getFileName());
} else {
assertFalse("No data-file expected as it was not created", fileSlice.getDataFile().isPresent());
}
assertEquals("Base Instant of penultimate file-slice must be base instant", instantTime1,
fileSlice.getBaseInstantTime());
List<HoodieLogFile> logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Log files must include those after compaction request", 4, logFiles.size());
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
assertEquals("Log File Order check", fileName2, logFiles.get(2).getFileName());
assertEquals("Log File Order check", fileName1, logFiles.get(3).getFileName());
fileSliceList = rtView.getLatestFileSlicesBeforeOrOn(partitionPath, deltaInstantTime5)
.collect(Collectors.toList());
assertEquals("Expect only one file-id", 1, fileSliceList.size());
fileSlice = fileSliceList.get(0);
assertEquals(fileId, fileSlice.getFileId());
assertFalse("No data-file expected in latest file-slice", fileSlice.getDataFile().isPresent());
assertEquals("Compaction requested instant must be base instant", compactionRequestedTime,
fileSlice.getBaseInstantTime());
logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Log files must include only those after compaction request", 2, logFiles.size());
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
/** Data Files API tests */
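// Until compaction completes, read-optimized queries must keep serving the data-file from instant 1 (or nothing, if it was never created)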
dataFiles = roView.getLatestDataFiles().collect(Collectors.toList());
if (skipCreatingDataFile) {
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
} else {
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
});
}
dataFiles = roView.getLatestDataFiles(partitionPath).collect(Collectors.toList());
if (skipCreatingDataFile) {
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
} else {
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
});
}
dataFiles = roView.getLatestDataFilesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());
if (skipCreatingDataFile) {
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
} else {
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
});
}
dataFiles = roView.getLatestDataFilesInRange(allInstantTimes).collect(Collectors.toList());
if (skipCreatingDataFile) {
assertEquals("Expect no data file to be returned", 0, dataFiles.size());
} else {
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file for instant 1 be returned", df.getCommitTime(), instantTime1);
});
}
/** Inflight/Orphan file-groups need to be in the view **/
// There is a data-file with this inflight file-id
final String inflightFileId1 = UUID.randomUUID().toString();
// There is a log-file with this inflight file-id
final String inflightFileId2 = UUID.randomUUID().toString();
// There is an orphan data file with this file-id
final String orphanFileId1 = UUID.randomUUID().toString();
// There is an orphan log data file with this file-id
final String orphanFileId2 = UUID.randomUUID().toString();
final String invalidInstantId = "INVALIDTIME";
String inflightDeltaInstantTime = "7";
String orphanDataFileName = FSUtils.makeDataFileName(invalidInstantId, 1, orphanFileId1);
new File(basePath + "/" + partitionPath + "/" + orphanDataFileName).createNewFile();
String orphanLogFileName =
FSUtils.makeLogFileName(orphanFileId2, HoodieLogFile.DELTA_EXTENSION, invalidInstantId, 0);
new File(basePath + "/" + partitionPath + "/" + orphanLogFileName).createNewFile();
String inflightDataFileName = FSUtils.makeDataFileName(inflightDeltaInstantTime, 1, inflightFileId1);
new File(basePath + "/" + partitionPath + "/" + inflightDataFileName).createNewFile();
String inflightLogFileName =
FSUtils.makeLogFileName(inflightFileId2, HoodieLogFile.DELTA_EXTENSION, inflightDeltaInstantTime, 0);
new File(basePath + "/" + partitionPath + "/" + inflightLogFileName).createNewFile();
// Mark instant as inflight
commitTimeline.saveToInflight(new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
inflightDeltaInstantTime), Optional.empty());
refreshFsView(null);
fsView.addPendingCompactionFileId(fileId, compactionRequestedTime);
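// Raw file-slice APIs skip timeline filtering, so inflight and orphan files must still show up in the view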
List<FileSlice> allRawFileSlices = getAllRawFileSlices(partitionPath).collect(Collectors.toList());
dataFiles = allRawFileSlices.stream().flatMap(slice -> {
if (slice.getDataFile().isPresent()) {
return Stream.of(slice.getDataFile().get());
}
return Stream.empty();
}).collect(Collectors.toList());
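// Expected data-files: the orphan and inflight ones, plus the compaction data-file when compaction is inflight, plus the committed one when it was created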
assertEquals("Inflight/Orphan data-file is also expected", 2
+ (isCompactionInFlight ? 1 : 0) + (skipCreatingDataFile ? 0 : 1), dataFiles.size());
Set<String> fileNames = dataFiles.stream().map(HoodieDataFile::getFileName).collect(Collectors.toSet());
assertTrue("Expect orphan data-file to be present", fileNames.contains(orphanDataFileName));
assertTrue("Expect inflight data-file to be present", fileNames.contains(inflightDataFileName));
if (!skipCreatingDataFile) {
assertTrue("Expect old committed data-file", fileNames.contains(dataFileName));
}
if (isCompactionInFlight) {
assertTrue("Expect inflight compacted data file to be present", fileNames.contains(compactDataFileName));
}
fileSliceList = getLatestRawFileSlices(partitionPath).collect(Collectors.toList());
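// Five file-slices expected: the original file-group plus the two orphan and two inflight file-ids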
assertEquals("Expect both inflight and orphan file-slice to be included",
5, fileSliceList.size());
Map<String, FileSlice> fileSliceMap =
fileSliceList.stream().collect(Collectors.toMap(FileSlice::getFileId, r -> r));
FileSlice orphanFileSliceWithDataFile = fileSliceMap.get(orphanFileId1);
FileSlice orphanFileSliceWithLogFile = fileSliceMap.get(orphanFileId2);
FileSlice inflightFileSliceWithDataFile = fileSliceMap.get(inflightFileId1);
FileSlice inflightFileSliceWithLogFile = fileSliceMap.get(inflightFileId2);
assertEquals("Orphan File Slice with data-file check base-commit", invalidInstantId,
orphanFileSliceWithDataFile.getBaseInstantTime());
assertEquals("Orphan File Slice with data-file check data-file", orphanDataFileName,
orphanFileSliceWithDataFile.getDataFile().get().getFileName());
assertEquals("Orphan File Slice with data-file check data-file", 0,
orphanFileSliceWithDataFile.getLogFiles().count());
assertEquals("Inflight File Slice with data-file check base-commit", inflightDeltaInstantTime,
inflightFileSliceWithDataFile.getBaseInstantTime());
assertEquals("Inflight File Slice with data-file check data-file", inflightDataFileName,
inflightFileSliceWithDataFile.getDataFile().get().getFileName());
assertEquals("Inflight File Slice with data-file check data-file", 0,
inflightFileSliceWithDataFile.getLogFiles().count());
assertEquals("Orphan File Slice with log-file check base-commit", invalidInstantId,
orphanFileSliceWithLogFile.getBaseInstantTime());
assertFalse("Orphan File Slice with log-file check data-file",
orphanFileSliceWithLogFile.getDataFile().isPresent());
logFiles = orphanFileSliceWithLogFile.getLogFiles().collect(Collectors.toList());
assertEquals("Orphan File Slice with log-file check data-file", 1, logFiles.size());
assertEquals("Orphan File Slice with log-file check data-file", orphanLogFileName,
logFiles.get(0).getFileName());
assertEquals("Inflight File Slice with log-file check base-commit", inflightDeltaInstantTime,
inflightFileSliceWithLogFile.getBaseInstantTime());
assertFalse("Inflight File Slice with log-file check data-file",
inflightFileSliceWithLogFile.getDataFile().isPresent());
logFiles = inflightFileSliceWithLogFile.getLogFiles().collect(Collectors.toList());
assertEquals("Inflight File Slice with log-file check data-file", 1, logFiles.size());
assertEquals("Inflight File Slice with log-file check data-file", inflightLogFileName,
logFiles.get(0).getFileName());
// Now simulate compaction completing and check the view
if (!isCompactionInFlight) {
// For an inflight compaction, the data-file was already created above to test the concurrent
// inflight case; if we skipped creating the data-file for the compaction commit, create it now
new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile();
}
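// Remove the pending-compaction marker and transition the compaction instant to COMPLETED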
if (isCompactionInFlight) {
commitTimeline.deleteInflight(compactionInstant);
} else {
commitTimeline.deleteCompactionRequested(compactionInstant);
}
compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
commitTimeline.saveAsComplete(compactionInstant, Optional.empty());
refreshFsView(null);
// populate the cache
roView.getAllDataFiles(partitionPath);
fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
dataFiles = fileSliceList.stream().map(FileSlice::getDataFile)
.filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
System.out.println("fileSliceList : " + fileSliceList);
assertEquals("Expect only one data-files in latest view as there is only one file-group", 1, dataFiles.size());
assertEquals("Data Filename must match", compactDataFileName, dataFiles.get(0).getFileName());
assertEquals("Only one latest file-slice in the partition", 1, fileSliceList.size());
fileSlice = fileSliceList.get(0);
assertEquals("Check file-Id is set correctly", fileId, fileSlice.getFileId());
assertEquals("Check data-filename is set correctly",
compactDataFileName, fileSlice.getDataFile().get().getFileName());
assertEquals("Ensure base-instant is now compaction request instant",
compactionRequestedTime, fileSlice.getBaseInstantTime());
logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
assertEquals("Only log-files after compaction request shows up", 2, logFiles.size());
assertEquals("Log File Order check", fileName4, logFiles.get(0).getFileName());
assertEquals("Log File Order check", fileName3, logFiles.get(1).getFileName());
/** Data Files API tests */
dataFiles = roView.getLatestDataFiles().collect(Collectors.toList());
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(),
compactionRequestedTime);
});
dataFiles = roView.getLatestDataFiles(partitionPath).collect(Collectors.toList());
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(),
compactionRequestedTime);
});
dataFiles = roView.getLatestDataFilesBeforeOrOn(partitionPath, deltaInstantTime5).collect(Collectors.toList());
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(),
compactionRequestedTime);
});
dataFiles = roView.getLatestDataFilesInRange(allInstantTimes).collect(Collectors.toList());
assertEquals("Expect only one data-file to be sent", 1, dataFiles.size());
dataFiles.stream().forEach(df -> {
assertEquals("Expect data-file created by compaction be returned", df.getCommitTime(),
compactionRequestedTime);
});
}
@Test
public void testGetLatestDataFilesForFileId() throws IOException {
String partitionPath = "2016/05/01";
@@ -328,15 +735,15 @@ public class HoodieTableFileSystemViewTest {
assertEquals(3, slices.size());
for (FileSlice slice : slices) {
if (slice.getFileId().equals(fileId1)) {
assertEquals(slice.getBaseCommitTime(), commitTime3);
assertEquals(slice.getBaseInstantTime(), commitTime3);
assertTrue(slice.getDataFile().isPresent());
assertEquals(slice.getLogFiles().count(), 0);
} else if (slice.getFileId().equals(fileId2)) {
assertEquals(slice.getBaseCommitTime(), commitTime4);
assertEquals(slice.getBaseInstantTime(), commitTime4);
assertFalse(slice.getDataFile().isPresent());
assertEquals(slice.getLogFiles().count(), 1);
} else if (slice.getFileId().equals(fileId3)) {
assertEquals(slice.getBaseCommitTime(), commitTime4);
assertEquals(slice.getBaseInstantTime(), commitTime4);
assertTrue(slice.getDataFile().isPresent());
assertEquals(slice.getLogFiles().count(), 0);
}
@@ -433,17 +840,17 @@ public class HoodieTableFileSystemViewTest {
List<FileSlice> slices = fileGroup.getAllFileSlices().collect(Collectors.toList());
if (fileGroup.getId().equals(fileId1)) {
assertEquals(2, slices.size());
assertEquals(commitTime4, slices.get(0).getBaseCommitTime());
assertEquals(commitTime1, slices.get(1).getBaseCommitTime());
assertEquals(commitTime4, slices.get(0).getBaseInstantTime());
assertEquals(commitTime1, slices.get(1).getBaseInstantTime());
} else if (fileGroup.getId().equals(fileId2)) {
assertEquals(3, slices.size());
assertEquals(commitTime3, slices.get(0).getBaseCommitTime());
assertEquals(commitTime2, slices.get(1).getBaseCommitTime());
assertEquals(commitTime1, slices.get(2).getBaseCommitTime());
assertEquals(commitTime3, slices.get(0).getBaseInstantTime());
assertEquals(commitTime2, slices.get(1).getBaseInstantTime());
assertEquals(commitTime1, slices.get(2).getBaseInstantTime());
} else if (fileGroup.getId().equals(fileId3)) {
assertEquals(2, slices.size());
assertEquals(commitTime4, slices.get(0).getBaseCommitTime());
assertEquals(commitTime3, slices.get(1).getBaseCommitTime());
assertEquals(commitTime4, slices.get(0).getBaseInstantTime());
assertEquals(commitTime3, slices.get(1).getBaseInstantTime());
}
}

View File

@@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException;
@@ -108,7 +109,13 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
try {
Stream<FileSlice> latestFileSlices = fsView.getLatestFileSlices(relPartitionPath);
// Both commit and delta-commits are included - pick the latest completed one
Optional<HoodieInstant> latestCompletedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
Stream<FileSlice> latestFileSlices = latestCompletedInstant.map(instant ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, instant.getTimestamp()))
.orElse(Stream.empty());
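// Capping merged slices at the last completed instant keeps realtime reads from picking up
// log files of commits or delta-commits that are still in flight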
// subgroup splits again by file id & match with log files.
Map<String, List<FileSplit>> groupedInputSplits = partitionsToParquetSplits