getAllRawFileSlices() {
+ return fileSlices.values().stream();
+ }
+
+ public HoodieTimeline getTimeline() {
+ return timeline;
+ }
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
index 88acc4ae6..0820b57e8 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
@@ -22,14 +22,14 @@ import com.uber.hoodie.common.util.FSUtils;
import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
-import java.util.Optional;
+import java.util.Objects;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
- * Abstracts a single log file. Contains methods to extract metadata like the fileId, version and
- * extension from the log file path.
+ * Abstracts a single log file. Contains methods to extract metadata like the fileId, version and extension from the log
+ * file path.
*
* Also contains logic to roll-over the log file
*/
@@ -38,54 +38,68 @@ public class HoodieLogFile implements Serializable {
public static final String DELTA_EXTENSION = ".log";
public static final Integer LOGFILE_BASE_VERSION = 1;
- private final Path path;
- private Optional fileStatus;
+ private transient FileStatus fileStatus;
+ private final String pathStr;
+ private long fileLen;
public HoodieLogFile(FileStatus fileStatus) {
- this(fileStatus.getPath());
- this.fileStatus = Optional.of(fileStatus);
+ this.fileStatus = fileStatus;
+ this.pathStr = fileStatus.getPath().toString();
+ this.fileLen = fileStatus.getLen();
}
public HoodieLogFile(Path logPath) {
- this.path = logPath;
- this.fileStatus = Optional.empty();
+ this.fileStatus = null;
+ this.pathStr = logPath.toString();
+ this.fileLen = 0;
+ }
+
+ public HoodieLogFile(String logPathStr) {
+ this.fileStatus = null;
+ this.pathStr = logPathStr;
+ this.fileLen = -1;
}
public String getFileId() {
- return FSUtils.getFileIdFromLogPath(path);
+ return FSUtils.getFileIdFromLogPath(getPath());
}
public String getBaseCommitTime() {
- return FSUtils.getBaseCommitTimeFromLogPath(path);
+ return FSUtils.getBaseCommitTimeFromLogPath(getPath());
}
public int getLogVersion() {
- return FSUtils.getFileVersionFromLog(path);
+ return FSUtils.getFileVersionFromLog(getPath());
}
public String getFileExtension() {
- return FSUtils.getFileExtensionFromLog(path);
+ return FSUtils.getFileExtensionFromLog(getPath());
}
public Path getPath() {
- return path;
+ return new Path(pathStr);
}
public String getFileName() {
- return path.getName();
+ return getPath().getName();
}
- public Optional getFileStatus() {
+ public void setFileLen(long fileLen) {
+ this.fileLen = fileLen;
+ }
+
+ public long getFileSize() {
+ return fileLen;
+ }
+
+ public FileStatus getFileStatus() {
return fileStatus;
}
- public Optional getFileSize() {
- return fileStatus.map(FileStatus::getLen);
- }
-
public HoodieLogFile rollOver(FileSystem fs) throws IOException {
String fileId = getFileId();
String baseCommitTime = getBaseCommitTime();
+ Path path = getPath();
String extension = "." + FSUtils.getFileExtensionFromLog(path);
int newVersion = FSUtils
.computeNextLogVersion(fs, path.getParent(), fileId,
@@ -95,7 +109,16 @@ public class HoodieLogFile implements Serializable {
}
public static Comparator getBaseInstantAndLogVersionComparator() {
- return (o1, o2) -> {
+ return new BaseInstantAndLogVersionComparator();
+ }
+
+ /**
+ * Comparator to order log-files
+ */
+ private static class BaseInstantAndLogVersionComparator implements Comparator, Serializable {
+
+ @Override
+ public int compare(HoodieLogFile o1, HoodieLogFile o2) {
String baseInstantTime1 = o1.getBaseCommitTime();
String baseInstantTime2 = o2.getBaseCommitTime();
if (baseInstantTime1.equals(baseInstantTime2)) {
@@ -104,7 +127,7 @@ public class HoodieLogFile implements Serializable {
}
// reverse the order by base-commits
return baseInstantTime2.compareTo(baseInstantTime1);
- };
+ }
}
@Override
@@ -116,16 +139,19 @@ public class HoodieLogFile implements Serializable {
return false;
}
HoodieLogFile that = (HoodieLogFile) o;
- return path != null ? path.equals(that.path) : that.path == null;
+ return Objects.equals(pathStr, that.pathStr);
}
@Override
public int hashCode() {
- return path != null ? path.hashCode() : 0;
+ return Objects.hash(pathStr);
}
@Override
public String toString() {
- return "HoodieLogFile {" + path + '}';
+ return "HoodieLogFile{"
+ + "pathStr='" + pathStr + '\''
+ + ", fileLen=" + fileLen
+ + '}';
}
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
index 752368481..e74606c88 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
@@ -133,6 +133,11 @@ public class HoodieWriteStat implements Serializable {
*/
private long totalRollbackBlocks;
+ /**
+ * File Size as of close
+ */
+ private long fileSizeInBytes;
+
@Nullable
@JsonIgnore
private RuntimeStats runtimeStats;
@@ -285,6 +290,14 @@ public class HoodieWriteStat implements Serializable {
this.totalRollbackBlocks = totalRollbackBlocks;
}
+ public long getFileSizeInBytes() {
+ return fileSizeInBytes;
+ }
+
+ public void setFileSizeInBytes(long fileSizeInBytes) {
+ this.fileSizeInBytes = fileSizeInBytes;
+ }
+
@Nullable
public RuntimeStats getRuntimeStats() {
return runtimeStats;
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
index 345f2906e..6779e6b4e 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableConfig.java
@@ -77,6 +77,10 @@ public class HoodieTableConfig implements Serializable {
this.props = props;
}
+ public HoodieTableConfig(Properties props) {
+ this.props = props;
+ }
+
/**
* For serailizing and de-serializing
*
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index a2c990621..757078633 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -194,6 +194,16 @@ public class HoodieTableMetaClient implements Serializable {
return activeTimeline;
}
+ /**
+ * Reload ActiveTimeline and cache
+ *
+ * @return Active instants timeline
+ */
+ public synchronized HoodieActiveTimeline reloadActiveTimeline() {
+ activeTimeline = new HoodieActiveTimeline(this);
+ return activeTimeline;
+ }
+
/**
* Get the archived commits as a timeline. This is costly operation, as all data from the archived
* files are read. This should not be used, unless for historical debugging purposes
@@ -406,4 +416,20 @@ public class HoodieTableMetaClient implements Serializable {
sb.append('}');
return sb.toString();
}
+
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
+ }
+
+ public void setMetaPath(String metaPath) {
+ this.metaPath = metaPath;
+ }
+
+ public void setActiveTimeline(HoodieActiveTimeline activeTimeline) {
+ this.activeTimeline = activeTimeline;
+ }
+
+ public void setTableConfig(HoodieTableConfig tableConfig) {
+ this.tableConfig = tableConfig;
+ }
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
index 6c8a20458..509de9c85 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTimeline.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.common.util.StringUtils;
import java.io.Serializable;
import java.util.Optional;
import java.util.function.BiPredicate;
+import java.util.function.Predicate;
import java.util.stream.Stream;
/**
@@ -70,6 +71,8 @@ public interface HoodieTimeline extends Serializable {
String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION;
String RESTORE_EXTENSION = "." + RESTORE_ACTION;
+ String INVALID_INSTANT_TS = "0";
+
/**
* Filter this timeline to just include the in-flights
*
@@ -116,6 +119,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsAfter(String commitTime, int numCommits);
+ /**
+ * Custom Filter of Instants
+ */
+ HoodieTimeline filter(Predicate filter);
+
/**
* If the timeline has any instants
*
@@ -143,6 +151,13 @@ public interface HoodieTimeline extends Serializable {
*/
Optional lastInstant();
+
+ /**
+ * Get hash of timeline
+ * @return hash of the instants (timestamp, action, state) making up this timeline
+ */
+ String getTimelineHash();
+
/**
* @return nth completed instant going back from the last completed instant
*/
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/SyncableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/SyncableFileSystemView.java
new file mode 100644
index 000000000..378a5da57
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/SyncableFileSystemView.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table;
+
+/**
+ * A consolidated file-system view interface exposing both realtime and read-optimized views along with
+ * update operations.
+ */
+public interface SyncableFileSystemView extends TableFileSystemView, TableFileSystemView.ReadOptimizedView,
+ TableFileSystemView.RealtimeView {
+
+
+
+ /**
+ * Allow View to release resources and close
+ */
+ void close();
+
+ /**
+ * Reset View so that they can be refreshed
+ */
+ void reset();
+
+ /**
+ * Read the latest timeline and refresh the file-system view to match the current state of the file-system.
+ * The refresh can either be done incrementally (from reading file-slices in metadata files) or from scratch by
+ * resetting view storage
+ */
+ void sync();
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
index d0f2d87e0..994d9d3f6 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java
@@ -16,9 +16,13 @@
package com.uber.hoodie.common.table;
+import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.Option;
+import com.uber.hoodie.common.util.collection.Pair;
import java.util.List;
import java.util.stream.Stream;
@@ -30,55 +34,69 @@ import java.util.stream.Stream;
public interface TableFileSystemView {
/**
- * ReadOptimizedView - methods to provide a view of columnar data files only.
+ * ReadOptimizedView with methods to only access latest version of file for the instant(s) passed.
*/
- interface ReadOptimizedView {
+ interface ReadOptimizedViewWithLatestSlice {
/**
* Stream all the latest data files in the given partition
*/
Stream getLatestDataFiles(String partitionPath);
+ /**
+ * Get Latest data file for a partition and file-Id
+ */
+ Option getLatestDataFile(String partitionPath, String fileId);
+
/**
* Stream all the latest data files, in the file system view
*/
Stream getLatestDataFiles();
/**
- * Stream all the latest version data files in the given partition with precondition that
- * commitTime(file) before maxCommitTime
+ * Stream all the latest version data files in the given partition with precondition that commitTime(file) before
+ * maxCommitTime
*/
Stream getLatestDataFilesBeforeOrOn(String partitionPath,
String maxCommitTime);
- /**
- * Stream all the latest version data files in the given partition with precondition that
- * instant time of file matches passed in instant time.
- */
- Stream getLatestDataFilesOn(String partitionPath, String instantTime);
-
/**
* Stream all the latest data files pass
*/
Stream getLatestDataFilesInRange(List commitsToReturn);
+ }
+ /**
+ * ReadOptimizedView - methods to provide a view of columnar data files only.
+ */
+ interface ReadOptimizedView extends ReadOptimizedViewWithLatestSlice {
/**
* Stream all the data file versions grouped by FileId for a given partition
*/
Stream getAllDataFiles(String partitionPath);
+
+ /**
+ * Get the version of data file matching the instant time in the given partition
+ */
+ Option getDataFileOn(String partitionPath, String instantTime, String fileId);
+
}
/**
- * RealtimeView - methods to access a combination of columnar data files + log files with real
- * time data.
+ * RealtimeView with methods to only access latest version of file-slice for the instant(s) passed.
*/
- interface RealtimeView {
+ interface RealtimeViewWithLatestSlice {
/**
* Stream all the latest file slices in the given partition
*/
Stream getLatestFileSlices(String partitionPath);
+ /**
+ * Get Latest File Slice for a given fileId in a given partition
+ */
+ Option getLatestFileSlice(String partitionPath, String fileId);
+
/**
* Stream all the latest uncompacted file slices in the given partition
*/
@@ -106,15 +124,39 @@ public interface TableFileSystemView {
* Stream all the latest file slices, in the given range
*/
Stream getLatestFileSliceInRange(List commitsToReturn);
+ }
+
+ /**
+ * RealtimeView - methods to access a combination of columnar data files + log files with real time data.
+ */
+ interface RealtimeView extends RealtimeViewWithLatestSlice {
/**
* Stream all the file slices for a given partition, latest or not.
*/
Stream getAllFileSlices(String partitionPath);
+
}
/**
* Stream all the file groups for a given partition
*/
Stream getAllFileGroups(String partitionPath);
+
+ /**
+ * Return Pending Compaction Operations
+ *
+ * @return Pair>
+ */
+ Stream> getPendingCompactionOperations();
+
+ /**
+ * Last Known Instant on which the view is built
+ */
+ Option getLastInstant();
+
+ /**
+ * Timeline corresponding to the view
+ */
+ HoodieTimeline getTimeline();
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
index f81f7d119..d34012e5f 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java
@@ -17,6 +17,7 @@
package com.uber.hoodie.common.table.timeline;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -58,7 +59,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION}));
private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class);
- private HoodieTableMetaClient metaClient;
+ protected HoodieTableMetaClient metaClient;
/**
* Returns next commit time in the {@link #COMMIT_FORMATTER} format.
@@ -71,9 +72,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
// Filter all the filter in the metapath and include only the extensions passed and
// convert them into HoodieInstant
try {
- this.instants = HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
- new Path(metaClient.getMetaPath()), includedExtensions);
- log.info("Loaded instants " + instants);
+ this.setInstants(HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
+ new Path(metaClient.getMetaPath()), includedExtensions));
+ log.info("Loaded instants " + getInstants());
} catch (IOException e) {
throw new HoodieIOException("Failed to scan metadata", e);
}
@@ -85,7 +86,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
- this(metaClient, VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
+ this(metaClient,
+ new ImmutableSet.Builder()
+ .addAll(VALID_EXTENSIONS_IN_ACTIVE_TIMELINE).build());
}
/**
@@ -158,7 +161,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
* @param actions actions allowed in the timeline
*/
public HoodieTimeline getTimelineOfActions(Set actions) {
- return new HoodieDefaultTimeline(instants.stream().filter(s -> actions.contains(s.getAction())),
+ return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())),
(Function> & Serializable) this::getInstantDetails);
}
@@ -195,9 +198,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
(Function> & Serializable) this::getInstantDetails);
}
-
protected Stream filterInstantsByAction(String action) {
- return instants.stream().filter(s -> s.getAction().equals(action));
+ return getInstants().filter(s -> s.getAction().equals(action));
}
public void createInflight(HoodieInstant instant) {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
index 1da29f669..ef24310fd 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java
@@ -61,9 +61,9 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
// This is okay because only tooling will load the archived commit timeline today
readCommits.put(key.toString(), Arrays.copyOf(val.getBytes(), val.getLength()));
}
- this.instants = readCommits.keySet().stream().map(
+ this.setInstants(readCommits.keySet().stream().map(
s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s)).collect(
- Collectors.toList());
+ Collectors.toList()));
} catch (IOException e) {
throw new HoodieIOException(
"Could not load archived commit timeline from path " + archiveLogPath, e);
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
index 87c664c60..0f89bbea3 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieDefaultTimeline.java
@@ -17,11 +17,17 @@
package com.uber.hoodie.common.table.timeline;
import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.util.StringUtils;
+import com.uber.hoodie.exception.HoodieException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.commons.codec.binary.Hex;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -36,13 +42,29 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
private static final transient Logger log = LogManager.getLogger(HoodieDefaultTimeline.class);
- protected Function> details;
- protected List instants;
+ private static final String HASHING_ALGORITHM = "SHA-256";
+
+ protected transient Function> details;
+ private List instants;
+ private String timelineHash;
public HoodieDefaultTimeline(Stream instants,
Function> details) {
- this.instants = instants.collect(Collectors.toList());
this.details = details;
+ setInstants(instants.collect(Collectors.toList()));
+ }
+
+ public void setInstants(List instants) {
+ this.instants = instants;
+ final MessageDigest md;
+ try {
+ md = MessageDigest.getInstance(HASHING_ALGORITHM);
+ this.instants.stream().forEach(i -> md.update(
+ StringUtils.joinUsingDelim("_", i.getTimestamp(), i.getAction(), i.getState().name()).getBytes()));
+ } catch (NoSuchAlgorithmException nse) {
+ throw new HoodieException(nse);
+ }
+ this.timelineHash = new String(Hex.encodeHex(md.digest()));
}
/**
@@ -101,6 +123,11 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
.limit(numCommits), details);
}
+ @Override
+ public HoodieTimeline filter(Predicate filter) {
+ return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
+ }
+
@Override
public boolean empty() {
return !instants.stream().findFirst().isPresent();
@@ -148,6 +175,11 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|| isBeforeTimelineStarts(instant);
}
+ @Override
+ public String getTimelineHash() {
+ return timelineHash;
+ }
+
@Override
public Stream getInstants() {
return instants.stream();
@@ -160,7 +192,6 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
&& HoodieTimeline.compareTimestamps(instant, firstCommit.get().getTimestamp(), LESSER);
}
-
@Override
public Optional getInstantDetails(HoodieInstant instant) {
return details.apply(instant);
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java
index f24eb9dce..4168471b5 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java
@@ -39,7 +39,9 @@ public class HoodieInstant implements Serializable {
// Inflight instant
INFLIGHT,
// Committed instant
- COMPLETED
+ COMPLETED,
+ // Invalid instant
+ INVALID
}
private State state = State.COMPLETED;
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/CompactionOpDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/CompactionOpDTO.java
new file mode 100644
index 000000000..d35876e40
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/CompactionOpDTO.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.common.model.CompactionOperation;
+import com.uber.hoodie.common.util.Option;
+import com.uber.hoodie.common.util.collection.Pair;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CompactionOpDTO {
+
+ @JsonProperty("baseInstant")
+ String baseInstantTime;
+
+ @JsonProperty("compactionInstant")
+ String compactionInstantTime;
+
+ @JsonProperty("dataFileInstant")
+ private String dataFileCommitTime;
+
+ @JsonProperty("deltaFiles")
+ private List deltaFilePaths;
+
+ @JsonProperty("baseFile")
+ private String dataFilePath;
+
+ @JsonProperty("id")
+ private String fileId;
+
+ @JsonProperty("partition")
+ private String partitionPath;
+
+ @JsonProperty("metrics")
+ private Map metrics;
+
+ public static CompactionOpDTO fromCompactionOperation(String compactionInstantTime,
+ CompactionOperation op) {
+ CompactionOpDTO dto = new CompactionOpDTO();
+ dto.fileId = op.getFileId();
+ dto.compactionInstantTime = compactionInstantTime;
+ dto.baseInstantTime = op.getBaseInstantTime();
+ dto.dataFileCommitTime = op.getDataFileCommitTime().orElse(null);
+ dto.dataFilePath = op.getDataFilePath().orElse(null);
+ dto.deltaFilePaths = new ArrayList<>(op.getDeltaFilePaths());
+ dto.partitionPath = op.getPartitionPath();
+ dto.metrics = op.getMetrics() == null ? new HashMap<>() : new HashMap<>(op.getMetrics());
+ return dto;
+ }
+
+ public static Pair toCompactionOperation(CompactionOpDTO dto) {
+ return Pair.of(dto.compactionInstantTime, new CompactionOperation(dto.fileId, dto.partitionPath,
+ dto.baseInstantTime, Option.ofNullable(dto.dataFileCommitTime), dto.deltaFilePaths,
+ Option.ofNullable(dto.dataFilePath), dto.metrics));
+ }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/DataFileDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/DataFileDTO.java
new file mode 100644
index 000000000..aa16e42b1
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/DataFileDTO.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.common.model.HoodieDataFile;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DataFileDTO {
+
+ @JsonProperty("fileStatus")
+ private FileStatusDTO fileStatus;
+ @JsonProperty("fullPath")
+ private String fullPath;
+ @JsonProperty("fileLen")
+ private long fileLen;
+
+ public static HoodieDataFile toHoodieDataFile(DataFileDTO dto) {
+ if (null == dto) {
+ return null;
+ }
+
+ HoodieDataFile dataFile = null;
+ if (null != dto.fileStatus) {
+ dataFile = new HoodieDataFile(FileStatusDTO.toFileStatus(dto.fileStatus));
+ } else {
+ dataFile = new HoodieDataFile(dto.fullPath);
+ dataFile.setFileLen(dto.fileLen);
+ }
+ return dataFile;
+ }
+
+ public static DataFileDTO fromHoodieDataFile(HoodieDataFile dataFile) {
+ if (null == dataFile) {
+ return null;
+ }
+
+ DataFileDTO dto = new DataFileDTO();
+ dto.fileStatus = FileStatusDTO.fromFileStatus(dataFile.getFileStatus());
+ dto.fullPath = dataFile.getPath();
+ dto.fileLen = dataFile.getFileLen();
+ return dto;
+ }
+
+}
\ No newline at end of file
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FSPermissionDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FSPermissionDTO.java
new file mode 100644
index 000000000..92be3055f
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FSPermissionDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A serializable FS Permission
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FSPermissionDTO implements Serializable {
+
+ @JsonProperty("useraction")
+ FsAction useraction;
+
+ @JsonProperty("groupaction")
+ FsAction groupaction;
+
+ @JsonProperty("otheraction")
+ FsAction otheraction;
+
+ @JsonProperty("stickyBit")
+ boolean stickyBit;
+
+ public static FSPermissionDTO fromFsPermission(FsPermission permission) {
+ if (null == permission) {
+ return null;
+ }
+ FSPermissionDTO dto = new FSPermissionDTO();
+ dto.useraction = permission.getUserAction();
+ dto.groupaction = permission.getGroupAction();
+ dto.otheraction = permission.getOtherAction();
+ dto.stickyBit = permission.getStickyBit();
+ return dto;
+ }
+
+ public static FsPermission fromFsPermissionDTO(FSPermissionDTO dto) {
+ if (null == dto) {
+ return null;
+ }
+ return new FsPermission(dto.useraction, dto.groupaction, dto.otheraction, dto.stickyBit);
+ }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileGroupDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileGroupDTO.java
new file mode 100644
index 000000000..9049485b6
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileGroupDTO.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FileGroupDTO {
+
+ @JsonProperty("partition")
+ String partition;
+
+ @JsonProperty("fileId")
+ String id;
+
+ @JsonProperty("slices")
+ List slices;
+
+ @JsonProperty("timeline")
+ TimelineDTO timeline;
+
+ public static FileGroupDTO fromFileGroup(HoodieFileGroup fileGroup) {
+ FileGroupDTO dto = new FileGroupDTO();
+ dto.partition = fileGroup.getPartitionPath();
+ dto.id = fileGroup.getFileGroupId().getFileId();
+ dto.slices = fileGroup.getAllRawFileSlices().map(FileSliceDTO::fromFileSlice).collect(Collectors.toList());
+ dto.timeline = TimelineDTO.fromTimeline(fileGroup.getTimeline());
+ return dto;
+ }
+
+ public static HoodieFileGroup toFileGroup(FileGroupDTO dto, HoodieTableMetaClient metaClient) {
+ HoodieFileGroup fileGroup = new HoodieFileGroup(dto.partition, dto.id,
+ TimelineDTO.toTimeline(dto.timeline, metaClient));
+ dto.slices.stream().map(FileSliceDTO::toFileSlice).forEach(fileSlice -> fileGroup.addFileSlice(fileSlice));
+ return fileGroup;
+ }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FilePathDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FilePathDTO.java
new file mode 100644
index 000000000..16c510de0
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FilePathDTO.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.hadoop.fs.Path;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FilePathDTO {
+
+  @JsonProperty("uri")
+  private String uri;
+
+  /**
+   * Captures the URI form of the given Hadoop path (null-safe).
+   */
+  public static FilePathDTO fromPath(Path path) {
+    if (path == null) {
+      return null;
+    }
+    FilePathDTO filePathDTO = new FilePathDTO();
+    filePathDTO.uri = path.toUri().toString();
+    return filePathDTO;
+  }
+
+  /**
+   * Reconstructs the Hadoop path from the stored URI (null-safe).
+   */
+  public static Path toPath(FilePathDTO dto) {
+    if (dto == null) {
+      return null;
+    }
+    try {
+      URI parsed = new URI(dto.uri);
+      return new Path(parsed);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileSliceDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileSliceDTO.java
new file mode 100644
index 000000000..34908a6f4
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileSliceDTO.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.common.model.FileSlice;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FileSliceDTO {
+
+  @JsonProperty("dataFile")
+  DataFileDTO dataFile;
+  @JsonProperty("logFiles")
+  List<LogFileDTO> logFiles;
+  @JsonProperty("partition")
+  private String partitionPath;
+  @JsonProperty("fileId")
+  private String fileId;
+  @JsonProperty("baseInstant")
+  private String baseInstantTime;
+
+  /**
+   * Serialize a FileSlice (optional base data-file plus log-files) into DTO form.
+   */
+  public static FileSliceDTO fromFileSlice(FileSlice slice) {
+    FileSliceDTO dto = new FileSliceDTO();
+    dto.partitionPath = slice.getPartitionPath();
+    dto.baseInstantTime = slice.getBaseInstantTime();
+    dto.fileId = slice.getFileId();
+    // A log-only slice has no data-file; serialize it as null
+    dto.dataFile = slice.getDataFile().map(DataFileDTO::fromHoodieDataFile).orElse(null);
+    dto.logFiles = slice.getLogFiles().map(LogFileDTO::fromHoodieLogFile).collect(Collectors.toList());
+    return dto;
+  }
+
+  /**
+   * Rebuild a FileSlice from its DTO form.
+   */
+  public static FileSlice toFileSlice(FileSliceDTO dto) {
+    FileSlice slice = new FileSlice(dto.partitionPath, dto.baseInstantTime, dto.fileId);
+    slice.setDataFile(DataFileDTO.toHoodieDataFile(dto.dataFile));
+    dto.logFiles.forEach(lf -> slice.addLogFile(LogFileDTO.toHoodieLogFile(lf)));
+    return slice;
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileStatusDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileStatusDTO.java
new file mode 100644
index 000000000..c09a5d2e2
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/FileStatusDTO.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.exception.HoodieException;
+import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FileStatusDTO {
+
+  @JsonProperty("path")
+  FilePathDTO path;
+  @JsonProperty("length")
+  long length;
+  @JsonProperty("isdir")
+  boolean isdir;
+  @JsonProperty("blockReplication")
+  short blockReplication;
+  @JsonProperty("blocksize")
+  long blocksize;
+  @JsonProperty("modificationTime")
+  long modificationTime;
+  @JsonProperty("accessTime")
+  long accessTime;
+  @JsonProperty("permission")
+  FSPermissionDTO permission;
+  @JsonProperty("owner")
+  String owner;
+  @JsonProperty("group")
+  String group;
+  @JsonProperty("symlink")
+  FilePathDTO symlink;
+
+  /**
+   * Serialize a Hadoop FileStatus into DTO form (null-safe).
+   */
+  public static FileStatusDTO fromFileStatus(FileStatus fileStatus) {
+    if (null == fileStatus) {
+      return null;
+    }
+
+    FileStatusDTO dto = new FileStatusDTO();
+    try {
+      dto.path = FilePathDTO.fromPath(fileStatus.getPath());
+      dto.length = fileStatus.getLen();
+      dto.isdir = fileStatus.isDirectory();
+      dto.blockReplication = fileStatus.getReplication();
+      dto.blocksize = fileStatus.getBlockSize();
+      dto.modificationTime = fileStatus.getModificationTime();
+      // Fix: previously copied the modification-time into accessTime
+      dto.accessTime = fileStatus.getAccessTime();
+      dto.symlink = fileStatus.isSymlink() ? FilePathDTO.fromPath(fileStatus.getSymlink()) : null;
+      safeReadAndSetMetadata(dto, fileStatus);
+    } catch (IOException ioe) {
+      throw new HoodieException(ioe);
+    }
+    return dto;
+  }
+
+  /**
+   * Used to safely handle FileStatus calls which might fail on some FileSystem implementation.
+   * (DeprecatedLocalFileSystem)
+   */
+  private static void safeReadAndSetMetadata(FileStatusDTO dto, FileStatus fileStatus) {
+    try {
+      dto.owner = fileStatus.getOwner();
+      dto.group = fileStatus.getGroup();
+      dto.permission = FSPermissionDTO.fromFsPermission(fileStatus.getPermission());
+    } catch (IllegalArgumentException ie) {
+      // Deprecated File System (testing) does not work well with this call
+      // skipping
+    }
+  }
+
+  /**
+   * Rebuild a Hadoop FileStatus from its DTO form (null-safe).
+   */
+  public static FileStatus toFileStatus(FileStatusDTO dto) {
+    if (null == dto) {
+      return null;
+    }
+
+    return new FileStatus(dto.length, dto.isdir, dto.blockReplication, dto.blocksize, dto.modificationTime,
+        dto.accessTime, FSPermissionDTO.fromFsPermissionDTO(dto.permission), dto.owner, dto.group,
+        FilePathDTO.toPath(dto.symlink), FilePathDTO.toPath(dto.path));
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/InstantDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/InstantDTO.java
new file mode 100644
index 000000000..ada5a6c75
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/InstantDTO.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class InstantDTO {
+
+  @JsonProperty("action")
+  String action;
+  @JsonProperty("ts")
+  String timestamp;
+  @JsonProperty("state")
+  String state;
+
+  /**
+   * Serialize a HoodieInstant (action, timestamp and state) into DTO form (null-safe).
+   */
+  public static InstantDTO fromInstant(HoodieInstant instant) {
+    if (instant == null) {
+      return null;
+    }
+    InstantDTO instantDTO = new InstantDTO();
+    instantDTO.timestamp = instant.getTimestamp();
+    instantDTO.action = instant.getAction();
+    instantDTO.state = instant.getState().toString();
+    return instantDTO;
+  }
+
+  /**
+   * Rebuild the HoodieInstant described by this DTO (null-safe).
+   */
+  public static HoodieInstant toInstant(InstantDTO dto) {
+    return (dto == null)
+        ? null
+        : new HoodieInstant(HoodieInstant.State.valueOf(dto.state), dto.action, dto.timestamp);
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/LogFileDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/LogFileDTO.java
new file mode 100644
index 000000000..72cd5e9bc
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/LogFileDTO.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import org.apache.hadoop.fs.FileStatus;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LogFileDTO {
+
+  @JsonProperty("fileStatus")
+  private FileStatusDTO fileStatus;
+  @JsonProperty("path")
+  private String pathStr;
+  @JsonProperty("len")
+  private long fileLen;
+
+  /**
+   * Rebuild a HoodieLogFile from its DTO form, preferring the full file-status when one was serialized.
+   */
+  public static HoodieLogFile toHoodieLogFile(LogFileDTO dto) {
+    FileStatus status = FileStatusDTO.toFileStatus(dto.fileStatus);
+    HoodieLogFile logFile;
+    if (status != null) {
+      logFile = new HoodieLogFile(status);
+    } else {
+      logFile = new HoodieLogFile(dto.pathStr);
+    }
+    logFile.setFileLen(dto.fileLen);
+    return logFile;
+  }
+
+  /**
+   * Serialize a HoodieLogFile (status, path and length) into DTO form.
+   */
+  public static LogFileDTO fromHoodieLogFile(HoodieLogFile logFile) {
+    LogFileDTO dto = new LogFileDTO();
+    dto.fileLen = logFile.getFileSize();
+    dto.pathStr = logFile.getPath().toString();
+    dto.fileStatus = FileStatusDTO.fromFileStatus(logFile.getFileStatus());
+    return dto;
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/TimelineDTO.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/TimelineDTO.java
new file mode 100644
index 000000000..d3c6403ea
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/dto/TimelineDTO.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TimelineDTO {
+
+  @JsonProperty("instants")
+  List<InstantDTO> instants;
+
+  /**
+   * Serialize a timeline as the ordered list of its instants.
+   */
+  public static TimelineDTO fromTimeline(HoodieTimeline timeline) {
+    TimelineDTO dto = new TimelineDTO();
+    dto.instants = timeline.getInstants().map(InstantDTO::fromInstant).collect(Collectors.toList());
+    return dto;
+  }
+
+  /**
+   * Rebuild a timeline from DTO form; instant details are served lazily by the meta-client's active timeline.
+   */
+  public static HoodieTimeline toTimeline(TimelineDTO dto, HoodieTableMetaClient metaClient) {
+    // TODO: For now, we will assume only the active timeline will be transferred.
+    return new HoodieDefaultTimeline(dto.instants.stream().map(InstantDTO::toInstant),
+        metaClient.getActiveTimeline()::getInstantDetails);
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
new file mode 100644
index 000000000..17f6b5687
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/AbstractTableFileSystemView.java
@@ -0,0 +1,823 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.table.view;
+
+import com.google.common.base.Preconditions;
+import com.uber.hoodie.common.model.CompactionOperation;
+import com.uber.hoodie.common.model.FileSlice;
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieFileGroup;
+import com.uber.hoodie.common.model.HoodieFileGroupId;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.SyncableFileSystemView;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.CompactionUtils;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieTimer;
+import com.uber.hoodie.common.util.Option;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieIOException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Common thread-safe implementation for multiple TableFileSystemView Implementations.
+ * Provides uniform handling of
+ * (a) Loading file-system views from underlying file-system
+ * (b) Pending compaction operations and changing file-system views based on that
+ * (c) Thread-safety in loading and managing file system views for this dataset.
+ * (d) resetting file-system views
+ * The actual mechanism of fetching file slices from different view storages is delegated to sub-classes.
+ */
+public abstract class AbstractTableFileSystemView implements SyncableFileSystemView, Serializable {
+
+  private static Logger log = LogManager.getLogger(AbstractTableFileSystemView.class);
+
+  protected HoodieTableMetaClient metaClient;
+
+  // This is the commits that will be visible for all views extending this view
+  protected HoodieTimeline visibleActiveTimeline;
+
+  // Used to concurrently load and populate partition views; the Boolean value is only a "loaded" marker
+  private final ConcurrentHashMap<String, Boolean> addedPartitions = new ConcurrentHashMap<>(4096);
+
+  // Locks to control concurrency. Sync operations use write-lock blocking all fetch operations.
+  // For the common-case, we allow concurrent read of single or multiple partitions
+  private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock();
+  private final ReadLock readLock = globalLock.readLock();
+  private final WriteLock writeLock = globalLock.writeLock();
+
+ /** Derives the partition path of a file relative to the table base-path from the file's full path. */
+ private String getPartitionPathFromFilePath(String fullPath) {
+ return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), new Path(fullPath).getParent());
+ }
+
+ /**
+ * Initialize the view: remembers the meta-client and visible timeline, then loads all pending compaction
+ * operations from the meta-client so file-slice handling can account for them.
+ */
+ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) {
+ this.metaClient = metaClient;
+ this.visibleActiveTimeline = visibleActiveTimeline;
+ // Load Pending Compaction Operations
+ resetPendingCompactionOperations(
+ CompactionUtils.getAllPendingCompactionOperations(metaClient).values()
+ .stream().map(e -> Pair.of(e.getKey(),
+ CompactionOperation.convertFromAvroRecordInstance(e.getValue()))));
+ }
+
+ /**
+ * Adds the provided statuses into the file system view, and also caches it inside this object.
+ *
+ * @return file-groups built from the statuses (also stored per-partition when not already present)
+ */
+ protected List addFilesToView(FileStatus[] statuses) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List fileGroups = buildFileGroups(statuses, visibleActiveTimeline, true);
+ long fgBuildTimeTakenMs = timer.endTimer();
+ timer.startTimer();
+ // Group by partition for efficient updates for both InMemory and DiskBased structures.
+ fileGroups.stream().collect(Collectors.groupingBy(HoodieFileGroup::getPartitionPath)).entrySet()
+ .forEach(entry -> {
+ String partition = entry.getKey();
+ // Only store partitions not yet present; already-stored partitions are left untouched
+ if (!isPartitionAvailableInStore(partition)) {
+ storePartitionView(partition, entry.getValue());
+ }
+ });
+ long storePartitionsTs = timer.endTimer();
+ log.info("addFilesToView: NumFiles=" + statuses.length + ", FileGroupsCreationTime=" + fgBuildTimeTakenMs
+ + ", StoreTimeTaken=" + storePartitionsTs);
+ return fileGroups;
+ }
+
+  /**
+   * Build FileGroups from passed in file-status.
+   *
+   * @param statuses                      files to group into file-groups
+   * @param timeline                      timeline used for file-slice visibility
+   * @param addPendingCompactionFileSlice whether to add an empty file-slice at a pending compaction instant
+   */
+  protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline,
+      boolean addPendingCompactionFileSlice) {
+    return buildFileGroups(convertFileStatusesToDataFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline,
+        addPendingCompactionFileSlice);
+  }
+
+  /**
+   * Build FileGroups from streams of data-files and log-files by grouping on (partition-path, file-id).
+   */
+  protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile> dataFileStream,
+      Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
+
+    // Group data-files by (partition, fileId)
+    Map<Pair<String, String>, List<HoodieDataFile>> dataFiles = dataFileStream
+        .collect(Collectors.groupingBy((dataFile) -> {
+          String partitionPathStr = getPartitionPathFromFilePath(dataFile.getPath());
+          return Pair.of(partitionPathStr, dataFile.getFileId());
+        }));
+
+    // Group log-files by (partition, fileId)
+    Map<Pair<String, String>, List<HoodieLogFile>> logFiles = logFileStream
+        .collect(Collectors.groupingBy((logFile) -> {
+          String partitionPathStr = FSUtils.getRelativePartitionPath(
+              new Path(metaClient.getBasePath()),
+              logFile.getPath().getParent());
+          return Pair.of(partitionPathStr, logFile.getFileId());
+        }));
+
+    // Union of keys: a file-group may exist with only data-files, only log-files, or both
+    Set<Pair<String, String>> fileIdSet = new HashSet<>(dataFiles.keySet());
+    fileIdSet.addAll(logFiles.keySet());
+
+    List<HoodieFileGroup> fileGroups = new ArrayList<>();
+    fileIdSet.forEach(pair -> {
+      String fileId = pair.getValue();
+      HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
+      if (dataFiles.containsKey(pair)) {
+        dataFiles.get(pair).forEach(group::addDataFile);
+      }
+      if (logFiles.containsKey(pair)) {
+        logFiles.get(pair).forEach(group::addLogFile);
+      }
+      if (addPendingCompactionFileSlice) {
+        Option<Pair<String, CompactionOperation>> pendingCompaction =
+            getPendingCompactionOperationWithInstant(group.getFileGroupId());
+        if (pendingCompaction.isPresent()) {
+          // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears
+          // so that any new ingestion uses the correct base-instant
+          group.addNewFileSliceAtInstant(pendingCompaction.get().getKey());
+        }
+      }
+      fileGroups.add(group);
+    });
+
+    return fileGroups;
+  }
+
+ /**
+ * Clears the partition Map and reset view states. Holds the write-lock so all concurrent fetches block until
+ * the view is re-initialized against the current timeline.
+ */
+ public final void reset() {
+ try {
+ writeLock.lock();
+
+ addedPartitions.clear();
+ resetViewState();
+
+ // Initialize with new Hoodie timeline.
+ init(metaClient, visibleActiveTimeline);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Allows all view metadata in file system view storage to be reset by subclasses
+ */
+ protected abstract void resetViewState();
+
+  /**
+   * Allows lazily loading the partitions if needed.
+   *
+   * @param partition partition to be loaded if not present
+   */
+  private void ensurePartitionLoadedCorrectly(String partition) {
+
+    Preconditions.checkArgument(!isClosed(), "View is already closed");
+
+    // ensure we list files only once even in the face of concurrency
+    addedPartitions.computeIfAbsent(partition, (partitionPathStr) -> {
+      long beginTs = System.currentTimeMillis();
+      if (!isPartitionAvailableInStore(partitionPathStr)) {
+        // Not loaded yet
+        try {
+          log.info("Building file system view for partition (" + partitionPathStr + ")");
+
+          // Create the path if it does not exist already
+          Path partitionPath = new Path(metaClient.getBasePath(), partitionPathStr);
+          FSUtils.createPathIfNotExists(metaClient.getFs(), partitionPath);
+          long beginLsTs = System.currentTimeMillis();
+          FileStatus[] statuses = metaClient.getFs().listStatus(partitionPath);
+          long endLsTs = System.currentTimeMillis();
+          log.info("#files found in partition (" + partitionPathStr + ") =" + statuses.length
+              + ", Time taken =" + (endLsTs - beginLsTs));
+          List<HoodieFileGroup> groups = addFilesToView(statuses);
+
+          // Remember empty partitions too, so they are not re-listed on every access
+          if (groups.isEmpty()) {
+            storePartitionView(partitionPathStr, new ArrayList<>());
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to list data files in partition " + partitionPathStr, e);
+        }
+      } else {
+        // Fix: message previously ended with a stray fragment (", FOUND is ")
+        log.debug("View already built for partition: " + partitionPathStr);
+      }
+      long endTs = System.currentTimeMillis();
+      log.info("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs));
+      return true;
+    });
+  }
+
+  /**
+   * Helper to convert file-status to data-files.
+   *
+   * @param statuses List of File-Status
+   */
+  private Stream<HoodieDataFile> convertFileStatusesToDataFiles(FileStatus[] statuses) {
+    // A file is a read-optimized (data) file if its name carries the RO file extension
+    Predicate<FileStatus> roFilePredicate = fileStatus ->
+        fileStatus.getPath().getName()
+            .contains(metaClient.getTableConfig().getROFileFormat().getFileExtension());
+    return Arrays.stream(statuses).filter(roFilePredicate).map(HoodieDataFile::new);
+  }
+
+  /**
+   * Helper to convert file-status to log-files.
+   *
+   * @param statuses List of File-Status
+   */
+  private Stream<HoodieLogFile> convertFileStatusesToLogFiles(FileStatus[] statuses) {
+    // A file is a real-time (log) file if its name carries the RT file extension
+    Predicate<FileStatus> rtFilePredicate = fileStatus ->
+        fileStatus.getPath().getName()
+            .contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension());
+    return Arrays.stream(statuses).filter(rtFilePredicate).map(HoodieLogFile::new);
+  }
+
+  /**
+   * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions; ignore
+   * those data-files.
+   *
+   * @param dataFile Data File
+   * @return true when the data-file's commit time matches a pending compaction instant for its file-group
+   */
+  protected boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) {
+    final String partitionPath = getPartitionPathFromFilePath(dataFile.getPath());
+
+    Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
+        getPendingCompactionOperationWithInstant(new HoodieFileGroupId(partitionPath, dataFile.getFileId()));
+    return (compactionWithInstantTime.isPresent()) && (null != compactionWithInstantTime.get().getKey())
+        && dataFile.getCommitTime().equals(compactionWithInstantTime.get().getKey());
+  }
+
+  /**
+   * Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches compaction
+   * Instant.
+   *
+   * @param fileSlice File Slice
+   */
+  protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
+    Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
+        getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
+    return (compactionWithInstantTime.isPresent())
+        && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey());
+  }
+
+ /**
+ * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions; ignore
+ * those data-files.
+ *
+ * @param fileSlice File Slice
+ * @return the slice unchanged, or a log-only copy when its base-instant matches a pending compaction
+ */
+ protected FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) {
+ if (isFileSliceAfterPendingCompaction(fileSlice)) {
+ // Data file is filtered out of the file-slice as the corresponding compaction
+ // instant not completed yet.
+ FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(),
+ fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+ fileSlice.getLogFiles().forEach(transformed::addLogFile);
+ return transformed;
+ }
+ return fileSlice;
+ }
+
+ /** Streams all pending compaction operations from the store, under the read-lock. */
+ @Override
+ public final Stream> getPendingCompactionOperations() {
+ try {
+ readLock.lock();
+ return fetchPendingCompactionOperations();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Streams the latest data-files in the given partition (lazily loading it first), under the read-lock. */
+ @Override
+ public final Stream getLatestDataFiles(String partitionStr) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchLatestDataFiles(partitionPath);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Streams the latest data-files across all stored partitions, under the read-lock. */
+ @Override
+ public final Stream getLatestDataFiles() {
+ try {
+ readLock.lock();
+ return fetchLatestDataFiles();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * For each file-group in the partition, streams the latest data-file committed before-or-on maxCommitTime,
+ * skipping data-files produced by pending compactions. Read-locked; lazily loads the partition.
+ */
+ @Override
+ public final Stream getLatestDataFilesBeforeOrOn(String partitionStr, String maxCommitTime) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchAllStoredFileGroups(partitionPath)
+ .map(fileGroup -> fileGroup.getAllDataFiles()
+ .filter(dataFile ->
+ HoodieTimeline.compareTimestamps(dataFile.getCommitTime(),
+ maxCommitTime,
+ HoodieTimeline.LESSER_OR_EQUAL))
+ .filter(df -> !isDataFileDueToPendingCompaction(df))
+ .findFirst())
+ .filter(Optional::isPresent)
+ .map(Optional::get);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the data-file of the given file-group committed exactly at instantTime (if any), skipping data-files
+ * produced by pending compactions. Read-locked; lazily loads the partition.
+ */
+ @Override
+ public final Option getDataFileOn(String partitionStr, String instantTime, String fileId) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchHoodieFileGroup(partitionPath, fileId)
+ .map(fileGroup -> fileGroup.getAllDataFiles()
+ .filter(dataFile ->
+ HoodieTimeline.compareTimestamps(dataFile.getCommitTime(),
+ instantTime, HoodieTimeline.EQUAL))
+ .filter(df -> !isDataFileDueToPendingCompaction(df))
+ .findFirst().orElse(null));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Get Latest data file for a partition and file-Id. Read-locked; lazily loads the partition.
+ */
+ public final Option getLatestDataFile(String partitionStr, String fileId) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchLatestDataFile(partitionPath, fileId);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * For each stored file-group, streams its latest data-file whose commit time is in commitsToReturn, skipping
+ * data-files produced by pending compactions. Read-locked.
+ */
+ @Override
+ public final Stream getLatestDataFilesInRange(List commitsToReturn) {
+ try {
+ readLock.lock();
+ return fetchAllStoredFileGroups().map(fileGroup -> {
+ return fileGroup.getAllDataFiles()
+ .filter(dataFile -> commitsToReturn.contains(dataFile.getCommitTime())
+ && !isDataFileDueToPendingCompaction(dataFile))
+ .findFirst();
+ }).filter(Optional::isPresent).map(Optional::get);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Streams every data-file in the partition whose commit is visible in (or precedes) the active timeline,
+ * skipping data-files produced by pending compactions. Read-locked; lazily loads the partition.
+ */
+ @Override
+ public final Stream getAllDataFiles(String partitionStr) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchAllDataFiles(partitionPath)
+ .filter(df -> visibleActiveTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
+ .filter(df -> !isDataFileDueToPendingCompaction(df));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Streams the latest file-slice per file-group in the partition, with data-files belonging to pending
+ * compactions filtered out of each slice. Read-locked; lazily loads the partition.
+ */
+ @Override
+ public final Stream getLatestFileSlices(String partitionStr) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchLatestFileSlices(partitionPath).map(fs -> filterDataFileAfterPendingCompaction(fs));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Get Latest File Slice for a given fileId in a given partition. Data-files belonging to a pending compaction
+ * are filtered out of the returned slice. Read-locked; lazily loads the partition.
+ */
+ public final Option getLatestFileSlice(String partitionStr, String fileId) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ Option fs = fetchLatestFileSlice(partitionPath, fileId);
+ return fs.map(f -> filterDataFileAfterPendingCompaction(f));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+  /**
+   * Streams, for each file-group in the partition, the latest file-slice not affected by a pending compaction:
+   * when the group has a compaction scheduled, the latest slice before the compaction-request instant is used.
+   * Read-locked; lazily loads the partition.
+   */
+  @Override
+  public final Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionStr) {
+    try {
+      readLock.lock();
+      String partitionPath = formatPartitionKey(partitionStr);
+      ensurePartitionLoadedCorrectly(partitionPath);
+      return fetchAllStoredFileGroups(partitionPath)
+          .map(fileGroup -> {
+            FileSlice fileSlice = fileGroup.getLatestFileSlice().get();
+            // if the file-group is under compaction, pick the latest before compaction instant time.
+            Option<Pair<String, CompactionOperation>> compactionWithInstantPair =
+                getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
+            if (compactionWithInstantPair.isPresent()) {
+              String compactionInstantTime = compactionWithInstantPair.get().getLeft();
+              return fileGroup.getLatestFileSliceBefore(compactionInstantTime);
+            }
+            return Optional.of(fileSlice);
+          })
+          // Fix: guard against an empty Optional (no slice before the compaction instant) before unwrapping
+          .filter(Optional::isPresent)
+          .map(Optional::get);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+ /**
+ * Streams the latest file-slice before-or-on maxCommitTime per file-group, with pending-compaction data-files
+ * filtered out of each slice. Read-locked; lazily loads the partition.
+ */
+ @Override
+ public final Stream getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime) {
+ try {
+ readLock.lock();
+ String partitionPath = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
+ .map(fs -> filterDataFileAfterPendingCompaction(fs));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Streams the latest file-slice before-or-on maxInstantTime per file-group, merged (via fetchMergedFileSlice)
+ * with any overlapping slice. Read-locked; lazily loads the partition.
+ */
+ @Override
+ public final Stream getLatestMergedFileSlicesBeforeOrOn(String partitionStr, String maxInstantTime) {
+ try {
+ readLock.lock();
+ String partition = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partition);
+ return fetchAllStoredFileGroups(partition)
+ .map(fileGroup -> {
+ Optional fileSlice = fileGroup.getLatestFileSliceBeforeOrOn(maxInstantTime);
+ // if the file-group is under construction, pick the latest before compaction instant time.
+ if (fileSlice.isPresent()) {
+ fileSlice = Optional.of(fetchMergedFileSlice(fileGroup, fileSlice.get()));
+ }
+ return fileSlice;
+ })
+ .filter(Optional::isPresent)
+ .map(Optional::get);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Streams the latest file-slice whose base-instant lies within commitsToReturn, under the read-lock. */
+ @Override
+ public final Stream getLatestFileSliceInRange(List commitsToReturn) {
+ try {
+ readLock.lock();
+ return fetchLatestFileSliceInRange(commitsToReturn);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Streams every file-slice in the partition. Read-locked; lazily loads the partition. */
+ @Override
+ public final Stream getAllFileSlices(String partitionStr) {
+ try {
+ readLock.lock();
+ String partition = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partition);
+ return fetchAllFileSlices(partition);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+  /**
+   * Ensure there is consistency in handling trailing slash in partition-path. Always trim it which is what is done in
+   * other places.
+   */
+  private String formatPartitionKey(String partitionStr) {
+    if (partitionStr.endsWith("/")) {
+      return partitionStr.substring(0, partitionStr.length() - 1);
+    }
+    return partitionStr;
+  }
+
+ /** Streams every stored file-group in the partition. Read-locked; lazily loads the partition. */
+ @Override
+ public final Stream getAllFileGroups(String partitionStr) {
+ try {
+ readLock.lock();
+ // Ensure there is consistency in handling trailing slash in partition-path. Always trim it which is what is done
+ // in other places.
+ String partition = formatPartitionKey(partitionStr);
+ ensurePartitionLoadedCorrectly(partition);
+ return fetchAllStoredFileGroups(partition);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ // Fetch APIs to be implemented by concrete sub-classes (view-storage specific)
+
+ /**
+ * Check if there is an outstanding compaction scheduled for this file
+ *
+ * @param fgId File-Group Id
+ * @return true if there is a pending compaction, false otherwise
+ */
+ protected abstract boolean isPendingCompactionScheduledForFileId(HoodieFileGroupId fgId);
+
+ /**
+ * Resets the pending compaction operations and overwrites them with the new list
+ *
+ * @param operations Pending Compaction Operations
+ */
+ abstract void resetPendingCompactionOperations(Stream> operations);
+
+ /**
+ * Add pending compaction operations to store
+ *
+ * @param operations Pending compaction operations to be added
+ */
+ abstract void addPendingCompactionOperations(Stream