[MINOR] Reorder HoodieTimeline#compareTimestamp arguments for better readability (#1575)
- reads nicely as (instantTime1, GREATER_THAN_OR_EQUALS, instantTime2) etc
This commit is contained in:
@@ -357,7 +357,7 @@ public class CommitsCommand implements CommandMarker {
|
||||
sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
|
||||
|
||||
if (sourceLatestCommit != null
|
||||
&& HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
|
||||
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
|
||||
// source is behind the target
|
||||
List<String> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
|
||||
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
|
||||
|
||||
@@ -260,9 +260,9 @@ public class FileSystemViewCommand implements CommandMarker {
|
||||
if (!maxInstant.isEmpty()) {
|
||||
final BiPredicate<String, String> predicate;
|
||||
if (includeMaxInstant) {
|
||||
predicate = HoodieTimeline.GREATER_OR_EQUAL;
|
||||
predicate = HoodieTimeline.GREATER_THAN_OR_EQUALS;
|
||||
} else {
|
||||
predicate = HoodieTimeline.GREATER;
|
||||
predicate = HoodieTimeline.GREATER_THAN;
|
||||
}
|
||||
instantsStream = instantsStream.filter(is -> predicate.test(maxInstant, is.getTimestamp()));
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ public class HoodieSyncCommand implements CommandMarker {
|
||||
sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
|
||||
|
||||
if (sourceLatestCommit != null
|
||||
&& HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
|
||||
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
|
||||
// source is behind the target
|
||||
return getString(target, targetTimeline, source, sourceCount, targetCount, sourceLatestCommit);
|
||||
} else {
|
||||
|
||||
@@ -426,7 +426,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
||||
try {
|
||||
HoodieTable<T> table = HoodieTable.create(config, jsc);
|
||||
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitInstantTime))
|
||||
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
|
||||
.findFirst());
|
||||
if (commitInstantOpt.isPresent()) {
|
||||
HoodieRollbackMetadata rollbackMetadata = table.rollback(jsc, rollbackInstantTime, commitInstantOpt.get(), true);
|
||||
@@ -537,7 +537,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
|
||||
// if there are pending compactions, their instantTime must not be greater than that of this instant time
|
||||
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
|
||||
ValidationUtils.checkArgument(
|
||||
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
|
||||
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
|
||||
"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
|
||||
+ latestPending + ", Ingesting at " + instantTime));
|
||||
HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
|
||||
|
||||
@@ -182,8 +182,8 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
// 2) is less than the first commit ts in the timeline
|
||||
return !commitTimeline.empty()
|
||||
&& (commitTimeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTs))
|
||||
|| HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), commitTs,
|
||||
HoodieTimeline.GREATER));
|
||||
|| HoodieTimeline.compareTimestamps(commitTimeline.firstInstant().get().getTimestamp(), HoodieTimeline.GREATER_THAN, commitTs
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -169,10 +169,10 @@ public class HoodieTimelineArchiveLog {
|
||||
instants = Stream.concat(instants, commitTimeline.getInstants().filter(s -> {
|
||||
// if no savepoint present, then dont filter
|
||||
return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(),
|
||||
s.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
|
||||
HoodieTimeline.LESSER_THAN_OR_EQUALS, s.getTimestamp()));
|
||||
}).filter(s -> {
|
||||
// Ensure commits >= oldest pending compaction commit is retained
|
||||
return oldestPendingCompactionInstant.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), s.getTimestamp(), HoodieTimeline.GREATER)).orElse(true);
|
||||
return oldestPendingCompactionInstant.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, s.getTimestamp())).orElse(true);
|
||||
}).limit(commitTimeline.countInstants() - minCommitsToKeep));
|
||||
}
|
||||
|
||||
@@ -243,7 +243,7 @@ public class HoodieTimelineArchiveLog {
|
||||
|
||||
List<HoodieInstant> instantsToBeDeleted =
|
||||
instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
|
||||
thresholdInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL)).collect(Collectors.toList());
|
||||
HoodieTimeline.LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
|
||||
|
||||
for (HoodieInstant deleteInstant : instantsToBeDeleted) {
|
||||
LOG.info("Deleting instant " + deleteInstant + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath());
|
||||
|
||||
@@ -123,9 +123,11 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
|
||||
LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
|
||||
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
|
||||
+ ". New Instant to retain : " + newInstantToRetain);
|
||||
return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
|
||||
HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
|
||||
newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> {
|
||||
return hoodieTable.getCompletedCommitsTimeline().getInstants()
|
||||
.filter(instant ->
|
||||
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, cleanMetadata.getEarliestCommitToRetain())
|
||||
&& HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
|
||||
).flatMap(instant -> {
|
||||
try {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||
return commitMetadata.getPartitionToWriteStats().keySet().stream();
|
||||
@@ -252,7 +254,7 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
|
||||
|
||||
// Always keep the last commit
|
||||
if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
|
||||
.compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
|
||||
.compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
|
||||
// this is a commit, that should be cleaned.
|
||||
aFile.ifPresent(hoodieDataFile -> deletePaths.add(hoodieDataFile.getFileName()));
|
||||
if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
|
||||
@@ -273,7 +275,7 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
|
||||
private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, HoodieInstant instantTime) {
|
||||
for (FileSlice file : fileSliceList) {
|
||||
String fileCommitTime = file.getBaseInstantTime();
|
||||
if (HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), fileCommitTime, HoodieTimeline.GREATER)) {
|
||||
if (HoodieTimeline.compareTimestamps(instantTime.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
|
||||
// fileList is sorted on the reverse, so the first commit we find <= instantTime is the
|
||||
// one we want
|
||||
return fileCommitTime;
|
||||
@@ -324,8 +326,8 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> implements Serializa
|
||||
CompactionOperation op = fgIdToPendingCompactionOperations.get(fileSlice.getFileGroupId());
|
||||
if (null != op) {
|
||||
// If file slice's instant time is newer or same as that of operation, do not clean
|
||||
return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), op.getBaseInstantTime(),
|
||||
HoodieTimeline.GREATER_OR_EQUAL);
|
||||
return HoodieTimeline.compareTimestamps(fileSlice.getBaseInstantTime(), HoodieTimeline.GREATER_THAN_OR_EQUALS, op.getBaseInstantTime()
|
||||
);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -89,7 +89,7 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<
|
||||
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
|
||||
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
|
||||
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
|
||||
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
|
||||
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
|
||||
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
|
||||
+ ", Compaction scheduled at " + instantTime));
|
||||
|
||||
@@ -97,7 +97,7 @@ public class ScheduleCompactionActionExecutor extends BaseActionExecutor<Option<
|
||||
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
|
||||
.getCommitsAndCompactionTimeline().getInstants()
|
||||
.filter(instant -> HoodieTimeline.compareTimestamps(
|
||||
instant.getTimestamp(), instantTime, HoodieTimeline.GREATER_OR_EQUAL))
|
||||
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
|
||||
.collect(Collectors.toList());
|
||||
ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
|
||||
"Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
|
||||
|
||||
@@ -64,7 +64,7 @@ public abstract class BaseRestoreActionExecutor extends BaseActionExecutor<Hoodi
|
||||
// Get all the commits on the timeline after the provided commit time
|
||||
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getCommitsAndCompactionTimeline()
|
||||
.getReverseOrderedInstants()
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), restoreInstantTime))
|
||||
.filter(instant -> HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), restoreInstantTime))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
Map<String, List<HoodieRollbackMetadata>> instantToMetadata = new HashMap<>();
|
||||
|
||||
@@ -225,13 +225,13 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
|
||||
// For sanity, log instant time can never be less than base-commit on which we are rolling back
|
||||
ValidationUtils
|
||||
.checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()),
|
||||
rollbackInstant.getTimestamp(), HoodieTimeline.LESSER_OR_EQUAL));
|
||||
HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()));
|
||||
}
|
||||
|
||||
return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(
|
||||
// Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option
|
||||
// to delete and we should not step on it
|
||||
wStat.getFileId()), rollbackInstant.getTimestamp(), HoodieTimeline.LESSER);
|
||||
wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
|
||||
}).map(wStat -> {
|
||||
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
|
||||
return RollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(),
|
||||
|
||||
@@ -84,7 +84,7 @@ public class SavepointActionExecutor extends BaseActionExecutor<HoodieSavepointM
|
||||
}
|
||||
|
||||
// Cannot allow savepoint time on a commit that could have been cleaned
|
||||
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
|
||||
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
|
||||
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
|
||||
|
||||
Map<String, List<String>> latestFilesMap = jsc.parallelize(FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
|
||||
|
||||
@@ -1103,7 +1103,7 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
|
||||
assertTrue(HoodieTimeline.compareTimestamps(
|
||||
fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
|
||||
fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER),
|
||||
HoodieTimeline.GREATER_THAN, fileIdWithCommitTime.getValue()),
|
||||
"Deleted instant time must be less than pending compaction");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -164,7 +164,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
|
||||
assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(),
|
||||
"Expecting a single commit.");
|
||||
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
|
||||
assertTrue(HoodieTimeline.compareTimestamps("000", latestCompactionCommitTime, HoodieTimeline.LESSER));
|
||||
assertTrue(HoodieTimeline.compareTimestamps("000", HoodieTimeline.LESSER_THAN, latestCompactionCommitTime));
|
||||
|
||||
assertEquals(200, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count(),
|
||||
"Must contain 200 records");
|
||||
@@ -877,7 +877,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
|
||||
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
|
||||
|
||||
assertTrue(HoodieTimeline
|
||||
.compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER),
|
||||
.compareTimestamps(timeline.lastInstant().get().getTimestamp(), HoodieTimeline.GREATER_THAN, newCommitTime),
|
||||
"Compaction commit should be > than last insert");
|
||||
|
||||
for (String partitionPath : dataGen.getPartitionPaths()) {
|
||||
|
||||
@@ -114,7 +114,7 @@ public class HoodieFileGroup implements Serializable {
|
||||
private boolean isFileSliceCommitted(FileSlice slice) {
|
||||
String maxCommitTime = lastInstant.get().getTimestamp();
|
||||
return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime())
|
||||
&& HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), maxCommitTime, HoodieTimeline.LESSER_OR_EQUAL);
|
||||
&& HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime);
|
||||
|
||||
}
|
||||
|
||||
@@ -164,7 +164,7 @@ public class HoodieFileGroup implements Serializable {
|
||||
*/
|
||||
public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
|
||||
return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline
|
||||
.compareTimestamps(slice.getBaseInstantTime(), maxInstantTime, HoodieTimeline.LESSER_OR_EQUAL)).findFirst());
|
||||
.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -175,7 +175,7 @@ public class HoodieFileGroup implements Serializable {
|
||||
*/
|
||||
public Option<FileSlice> getLatestFileSliceBefore(String maxInstantTime) {
|
||||
return Option.fromJavaOptional(getAllFileSlices().filter(
|
||||
slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), maxInstantTime, HoodieTimeline.LESSER))
|
||||
slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, maxInstantTime))
|
||||
.findFirst());
|
||||
}
|
||||
|
||||
|
||||
@@ -138,8 +138,8 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
HoodieLogBlock r = logFormatReaderWrapper.next();
|
||||
totalLogBlocks.incrementAndGet();
|
||||
if (r.getBlockType() != CORRUPT_BLOCK
|
||||
&& !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), this.latestInstantTime,
|
||||
HoodieTimeline.LESSER_OR_EQUAL)) {
|
||||
&& !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
|
||||
)) {
|
||||
// hit a block with instant time greater than should be processed, stop processing further
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
String newCommitTime;
|
||||
do {
|
||||
newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
|
||||
} while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL));
|
||||
} while (HoodieTimeline.compareTimestamps(newCommitTime, LESSER_THAN_OR_EQUALS, oldVal));
|
||||
return newCommitTime;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -128,14 +128,14 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
@Override
|
||||
public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
|
||||
return new HoodieDefaultTimeline(instants.stream()
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), instantTime, GREATER)).limit(numCommits),
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits),
|
||||
details);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
|
||||
return new HoodieDefaultTimeline(instants.stream()
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), instantTime, LESSER)),
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)),
|
||||
details);
|
||||
}
|
||||
|
||||
@@ -288,7 +288,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
public boolean isBeforeTimelineStarts(String instant) {
|
||||
Option<HoodieInstant> firstCommit = firstInstant();
|
||||
return firstCommit.isPresent()
|
||||
&& HoodieTimeline.compareTimestamps(instant, firstCommit.get().getTimestamp(), LESSER);
|
||||
&& HoodieTimeline.compareTimestamps(instant, LESSER_THAN, firstCommit.get().getTimestamp());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -228,13 +228,13 @@ public interface HoodieTimeline extends Serializable {
|
||||
/**
|
||||
* Helper methods to compare instants.
|
||||
**/
|
||||
BiPredicate<String, String> EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) == 0;
|
||||
BiPredicate<String, String> GREATER_OR_EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) >= 0;
|
||||
BiPredicate<String, String> GREATER = (commit1, commit2) -> commit1.compareTo(commit2) > 0;
|
||||
BiPredicate<String, String> LESSER_OR_EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) <= 0;
|
||||
BiPredicate<String, String> LESSER = (commit1, commit2) -> commit1.compareTo(commit2) < 0;
|
||||
BiPredicate<String, String> EQUALS = (commit1, commit2) -> commit1.compareTo(commit2) == 0;
|
||||
BiPredicate<String, String> GREATER_THAN_OR_EQUALS = (commit1, commit2) -> commit1.compareTo(commit2) >= 0;
|
||||
BiPredicate<String, String> GREATER_THAN = (commit1, commit2) -> commit1.compareTo(commit2) > 0;
|
||||
BiPredicate<String, String> LESSER_THAN_OR_EQUALS = (commit1, commit2) -> commit1.compareTo(commit2) <= 0;
|
||||
BiPredicate<String, String> LESSER_THAN = (commit1, commit2) -> commit1.compareTo(commit2) < 0;
|
||||
|
||||
static boolean compareTimestamps(String commit1, String commit2, BiPredicate<String, String> predicateToApply) {
|
||||
static boolean compareTimestamps(String commit1, BiPredicate<String, String> predicateToApply, String commit2) {
|
||||
return predicateToApply.test(commit1, commit2);
|
||||
}
|
||||
|
||||
@@ -242,8 +242,8 @@ public interface HoodieTimeline extends Serializable {
|
||||
* Return true if specified timestamp is in range (startTs, endTs].
|
||||
*/
|
||||
static boolean isInRange(String timestamp, String startTs, String endTs) {
|
||||
return HoodieTimeline.compareTimestamps(timestamp, startTs, GREATER)
|
||||
&& HoodieTimeline.compareTimestamps(timestamp, endTs, LESSER_OR_EQUAL);
|
||||
return HoodieTimeline.compareTimestamps(timestamp, GREATER_THAN, startTs)
|
||||
&& HoodieTimeline.compareTimestamps(timestamp, LESSER_THAN_OR_EQUALS, endTs);
|
||||
}
|
||||
|
||||
static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
|
||||
|
||||
@@ -48,7 +48,7 @@ public class TimelineDiffHelper {
|
||||
|
||||
if (lastSeenInstant.isPresent() && firstInstantInNewTimeline.isPresent()) {
|
||||
if (HoodieTimeline.compareTimestamps(lastSeenInstant.get().getTimestamp(),
|
||||
firstInstantInNewTimeline.get().getTimestamp(), HoodieTimeline.LESSER)) {
|
||||
HoodieTimeline.LESSER_THAN, firstInstantInNewTimeline.get().getTimestamp())) {
|
||||
// The last seen instant is no longer in the timeline. Do not incrementally Sync.
|
||||
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
|
||||
}
|
||||
|
||||
@@ -352,8 +352,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
ensurePartitionLoadedCorrectly(partitionPath);
|
||||
return fetchAllStoredFileGroups(partitionPath)
|
||||
.map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
|
||||
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), maxCommitTime,
|
||||
HoodieTimeline.LESSER_OR_EQUAL))
|
||||
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
|
||||
))
|
||||
.filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()))
|
||||
.filter(Option::isPresent).map(Option::get);
|
||||
} finally {
|
||||
@@ -369,7 +369,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
ensurePartitionLoadedCorrectly(partitionPath);
|
||||
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
|
||||
.filter(
|
||||
baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), instantTime, HoodieTimeline.EQUAL))
|
||||
baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS, instantTime))
|
||||
.filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null));
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
|
||||
@@ -289,8 +289,8 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
|
||||
.map(Pair::getValue).reduce(null,
|
||||
(x, y) -> ((x == null) ? y
|
||||
: (y == null) ? null
|
||||
: HoodieTimeline.compareTimestamps(x.getBaseInstantTime(), y.getBaseInstantTime(),
|
||||
HoodieTimeline.GREATER) ? x : y)));
|
||||
: HoodieTimeline.compareTimestamps(x.getBaseInstantTime(), HoodieTimeline.GREATER_THAN, y.getBaseInstantTime()
|
||||
) ? x : y)));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -303,7 +303,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
|
||||
.map(Pair::getValue).reduce(null,
|
||||
(x, y) -> ((x == null) ? y
|
||||
: (y == null) ? null
|
||||
: HoodieTimeline.compareTimestamps(x.getCommitTime(), y.getCommitTime(), HoodieTimeline.GREATER)
|
||||
: HoodieTimeline.compareTimestamps(x.getCommitTime(), HoodieTimeline.GREATER_THAN, y.getCommitTime())
|
||||
? x
|
||||
: y)));
|
||||
}
|
||||
|
||||
@@ -381,8 +381,8 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
}
|
||||
Assert.assertEquals(State.COMPLETED, view.getLastInstant().get().getState());
|
||||
|
||||
if (HoodieTimeline.compareTimestamps(newRestoreInstants.get(idx), emptyRestoreInstant,
|
||||
HoodieTimeline.GREATER_OR_EQUAL)) {
|
||||
if (HoodieTimeline.compareTimestamps(newRestoreInstants.get(idx), HoodieTimeline.GREATER_THAN_OR_EQUALS, emptyRestoreInstant
|
||||
)) {
|
||||
partitions.forEach(p -> Assert.assertEquals(0, view.getLatestFileSlices(p).count()));
|
||||
} else {
|
||||
partitions.forEach(p -> Assert.assertEquals(fileIdsPerPartition.size(), view.getLatestFileSlices(p).count()));
|
||||
@@ -510,7 +510,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
});
|
||||
view.getLatestMergedFileSlicesBeforeOrOn(p, instantTime).forEach(fs -> {
|
||||
Assert
|
||||
.assertTrue(HoodieTimeline.compareTimestamps(instantTime, fs.getBaseInstantTime(), HoodieTimeline.GREATER));
|
||||
.assertTrue(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN, fs.getBaseInstantTime()));
|
||||
Assert.assertEquals(p, fs.getPartitionPath());
|
||||
});
|
||||
});
|
||||
|
||||
@@ -133,8 +133,8 @@ public class HoodieSnapshotCopier implements Serializable {
|
||||
return true;
|
||||
} else {
|
||||
String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||
return HoodieTimeline.compareTimestamps(instantTime, latestCommitTimestamp,
|
||||
HoodieTimeline.LESSER_OR_EQUAL);
|
||||
return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCommitTimestamp
|
||||
);
|
||||
}
|
||||
});
|
||||
for (FileStatus commitStatus : commitFilesToCopy) {
|
||||
|
||||
@@ -229,8 +229,8 @@ public class HoodieSnapshotExporter {
|
||||
return true;
|
||||
} else {
|
||||
String instantTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||
return HoodieTimeline.compareTimestamps(instantTime, latestCommitTimestamp,
|
||||
HoodieTimeline.LESSER_OR_EQUAL);
|
||||
return HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCommitTimestamp
|
||||
);
|
||||
}
|
||||
});
|
||||
for (FileStatus commitStatus : commitFilesToCopy) {
|
||||
|
||||
@@ -88,11 +88,11 @@ public class IncrSourceHelper {
|
||||
*/
|
||||
public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
|
||||
Objects.requireNonNull(instantTime);
|
||||
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, sinceInstant, HoodieTimeline.GREATER),
|
||||
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN, sinceInstant),
|
||||
"Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + "but expected to be between "
|
||||
+ sinceInstant + "(excl) - " + endInstant + "(incl)");
|
||||
ValidationUtils.checkArgument(
|
||||
HoodieTimeline.compareTimestamps(instantTime, endInstant, HoodieTimeline.LESSER_OR_EQUAL),
|
||||
HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, endInstant),
|
||||
"Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + "but expected to be between "
|
||||
+ sinceInstant + "(excl) - " + endInstant + "(incl)");
|
||||
}
|
||||
|
||||
@@ -704,8 +704,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
ds2.sync();
|
||||
mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
|
||||
HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
|
||||
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(),
|
||||
HoodieTimeline.GREATER));
|
||||
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()
|
||||
));
|
||||
|
||||
// Ensure it is empty
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
|
||||
Reference in New Issue
Block a user