Cleanup calls to HoodieTimeline.compareTimeStamps
This commit is contained in:
@@ -228,8 +228,8 @@ public class CommitsCommand implements CommandMarker {
|
||||
String sourceLatestCommit =
|
||||
sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
|
||||
|
||||
if (sourceLatestCommit != null && sourceTimeline
|
||||
.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
|
||||
if (sourceLatestCommit != null &&
|
||||
HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
|
||||
// source is behind the target
|
||||
List<String> commitsToCatchup =
|
||||
targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
|
||||
|
||||
@@ -79,7 +79,7 @@ public class HoodieSyncCommand implements CommandMarker {
|
||||
String sourceLatestCommit =
|
||||
sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
|
||||
|
||||
if (sourceLatestCommit != null && sourceTimeline
|
||||
if (sourceLatestCommit != null && HoodieTimeline
|
||||
.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
|
||||
// source is behind the target
|
||||
List<HoodieInstant> commitsToCatchup =
|
||||
|
||||
@@ -470,7 +470,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
}
|
||||
|
||||
// Cannot allow savepoint time on a commit that could have been cleaned
|
||||
Preconditions.checkArgument(table.getActiveTimeline()
|
||||
Preconditions.checkArgument(HoodieTimeline
|
||||
.compareTimestamps(commitTime, lastCommitRetained, HoodieTimeline.GREATER_OR_EQUAL),
|
||||
"Could not savepoint commit " + commitTime + " as this is beyond the lookup window "
|
||||
+ lastCommitRetained);
|
||||
|
||||
@@ -187,8 +187,9 @@ public class HoodieCleaner<T extends HoodieRecordPayload<T>> {
|
||||
}
|
||||
|
||||
// Always keep the last commit
|
||||
if (commitTimeline
|
||||
.compareTimestamps(earliestCommitToRetain.getTimestamp(), fileCommitTime,
|
||||
if (HoodieTimeline.compareTimestamps(
|
||||
earliestCommitToRetain.getTimestamp(),
|
||||
fileCommitTime,
|
||||
HoodieTimeline.GREATER)) {
|
||||
// this is a commit, that should be cleaned.
|
||||
deletePaths.add(String
|
||||
@@ -217,7 +218,7 @@ public class HoodieCleaner<T extends HoodieRecordPayload<T>> {
|
||||
HoodieInstant commitTime) {
|
||||
for (HoodieDataFile file : fileList) {
|
||||
String fileCommitTime = FSUtils.getCommitTime(file.getFileName());
|
||||
if (commitTimeline.compareTimestamps(commitTime.getTimestamp(), fileCommitTime,
|
||||
if (HoodieTimeline.compareTimestamps(commitTime.getTimestamp(), fileCommitTime,
|
||||
HoodieTimeline.GREATER)) {
|
||||
// fileList is sorted on the reverse, so the first commit we find <= commitTime is the one we want
|
||||
return fileCommitTime;
|
||||
|
||||
@@ -35,9 +35,7 @@ import org.apache.log4j.Logger;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Driver program that uses the Hoodie client with synthetic workload, and performs basic
|
||||
|
||||
@@ -175,7 +175,7 @@ public class TestMergeOnReadTable {
|
||||
HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext);
|
||||
assertEquals("Expecting a single commit.", 1, readClient.listCommitsSince("000").size());
|
||||
String latestCompactionCommitTime = readClient.latestCommit();
|
||||
assertTrue(metaClient.getActiveTimeline()
|
||||
assertTrue(HoodieTimeline
|
||||
.compareTimestamps("000", latestCompactionCommitTime, HoodieTimeline.LESSER));
|
||||
assertEquals("Must contain 200 records", 200, readClient.readSince("000").count());
|
||||
}
|
||||
|
||||
@@ -183,8 +183,8 @@ public class TestHoodieCompactor {
|
||||
table = HoodieTable.getHoodieTable(metaClient, config);
|
||||
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
|
||||
|
||||
assertTrue("Compaction commit should be > than last insert", timeline
|
||||
.compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime,
|
||||
assertTrue("Compaction commit should be > than last insert",
|
||||
HoodieTimeline.compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime,
|
||||
HoodieTimeline.GREATER));
|
||||
|
||||
for (String partitionPath : dataGen.getPartitionPaths()) {
|
||||
|
||||
@@ -186,9 +186,8 @@ public class TestCopyOnWriteTable {
|
||||
for (File file : new File(basePath + "/2016/01/31").listFiles()) {
|
||||
if (file.getName().endsWith(".parquet")) {
|
||||
if (FSUtils.getFileId(file.getName())
|
||||
.equals(FSUtils.getFileId(parquetFile.getName())) && metadata
|
||||
.getActiveTimeline().getCommitTimeline()
|
||||
.compareTimestamps(FSUtils.getCommitTime(file.getName()),
|
||||
.equals(FSUtils.getFileId(parquetFile.getName())) &&
|
||||
HoodieTimeline.compareTimestamps(FSUtils.getCommitTime(file.getName()),
|
||||
FSUtils.getCommitTime(parquetFile.getName()), HoodieTimeline.GREATER)) {
|
||||
updatedParquetFile = file;
|
||||
break;
|
||||
|
||||
@@ -23,7 +23,6 @@ import org.apache.parquet.avro.AvroWriteSupport;
|
||||
import org.apache.parquet.hadoop.api.WriteSupport;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
|
||||
@@ -164,7 +164,7 @@ public interface HoodieTimeline extends Serializable {
|
||||
(commit1, commit2) -> commit1.compareTo(commit2) <= 0;
|
||||
BiPredicate<String, String> LESSER = (commit1, commit2) -> commit1.compareTo(commit2) < 0;
|
||||
|
||||
default boolean compareTimestamps(String commit1, String commit2,
|
||||
static boolean compareTimestamps(String commit1, String commit2,
|
||||
BiPredicate<String, String> predicateToApply) {
|
||||
return predicateToApply.test(commit1, commit2);
|
||||
}
|
||||
|
||||
@@ -65,14 +65,15 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
@Override
|
||||
public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
|
||||
return new HoodieDefaultTimeline(instants.stream().filter(
|
||||
s -> compareTimestamps(s.getTimestamp(), startTs, GREATER) && compareTimestamps(
|
||||
s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), startTs, GREATER) &&
|
||||
HoodieTimeline.compareTimestamps(
|
||||
s.getTimestamp(), endTs, LESSER_OR_EQUAL)), details);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieDefaultTimeline findInstantsAfter(String commitTime, int numCommits) {
|
||||
return new HoodieDefaultTimeline(
|
||||
instants.stream().filter(s -> compareTimestamps(s.getTimestamp(), commitTime, GREATER))
|
||||
instants.stream().filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), commitTime, GREATER))
|
||||
.limit(numCommits), details);
|
||||
}
|
||||
|
||||
@@ -131,8 +132,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
@Override
|
||||
public boolean isBeforeTimelineStarts(String instant) {
|
||||
Optional<HoodieInstant> firstCommit = firstInstant();
|
||||
return firstCommit.isPresent() && compareTimestamps(instant,
|
||||
firstCommit.get().getTimestamp(), LESSER);
|
||||
return firstCommit.isPresent() &&
|
||||
HoodieTimeline.compareTimestamps(instant, firstCommit.get().getTimestamp(), LESSER);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -165,7 +165,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
|
||||
visibleActiveCommitTimeline.lastInstant().get().getTimestamp())
|
||||
.map((Function<List<HoodieDataFile>, Optional<HoodieDataFile>>) fss -> {
|
||||
for (HoodieDataFile fs1 : fss) {
|
||||
if (visibleActiveCommitTimeline
|
||||
if (HoodieTimeline
|
||||
.compareTimestamps(fs1.getCommitTime(), maxCommitToReturn,
|
||||
HoodieTimeline.LESSER_OR_EQUAL)) {
|
||||
return Optional.of(fs1);
|
||||
@@ -248,7 +248,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
|
||||
.flatMap(fileStatus -> {
|
||||
HoodieDataFile dataFile = new HoodieDataFile(fileStatus);
|
||||
if (visibleActiveCommitTimeline.containsOrBeforeTimelineStarts(dataFile.getCommitTime())
|
||||
&& visibleActiveCommitTimeline
|
||||
&& HoodieTimeline
|
||||
.compareTimestamps(dataFile.getCommitTime(), maxCommitTime,
|
||||
HoodieTimeline.LESSER_OR_EQUAL)) {
|
||||
return Stream.of(Pair.of(dataFile.getFileId(), dataFile));
|
||||
|
||||
@@ -106,9 +106,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
|
||||
// TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before
|
||||
Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields);
|
||||
|
||||
// FIXME(vc): This is ugly.. Need to fix the usage everywhere
|
||||
HoodieTimeline defTimeline = new HoodieDefaultTimeline();
|
||||
|
||||
LOG.info(String.format("About to read logs %s for base split %s, projecting cols %s",
|
||||
split.getDeltaFilePaths(), split.getPath(), projectionFields));
|
||||
for (String logFilePath: split.getDeltaFilePaths()) {
|
||||
@@ -119,7 +116,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
|
||||
GenericRecord rec = reader.next();
|
||||
String key = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
String commitTime = rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
|
||||
if (defTimeline.compareTimestamps(commitTime, split.getMaxCommitTime(), HoodieTimeline.GREATER)) {
|
||||
if (HoodieTimeline.compareTimestamps(commitTime, split.getMaxCommitTime(), HoodieTimeline.GREATER)) {
|
||||
// stop reading this log file. we hit a record later than max known commit time.
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -133,8 +133,7 @@ public class HoodieSnapshotCopier implements Serializable {
|
||||
} else {
|
||||
String commitTime =
|
||||
FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||
return tableMetadata.getActiveTimeline().getCommitTimeline()
|
||||
.compareTimestamps(commitTime, latestCommitTimestamp, HoodieTimeline.LESSER_OR_EQUAL);
|
||||
return HoodieTimeline.compareTimestamps(commitTime, latestCommitTimestamp, HoodieTimeline.LESSER_OR_EQUAL);
|
||||
}
|
||||
});
|
||||
for (FileStatus commitStatus : commitFilesToCopy) {
|
||||
|
||||
Reference in New Issue
Block a user