diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
index f1f277bad..9690e4446 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -84,7 +84,9 @@ import java.util.stream.Collectors;
import scala.Tuple2;
/**
- * Implementation of a very heavily read-optimized Hoodie Table where.
+ * Implementation of a very heavily read-optimized Hoodie Table, where all data is stored in base files, with
+ * zero read amplification.
+ *
*
* INSERTS - Produce new files, block aligned to desired size (or) Merge with the smallest existing file, to expand it
*
@@ -181,7 +183,7 @@ public class HoodieCopyOnWriteTable extends Hoodi
}
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId,
- Map<String, HoodieRecord<T>> keyToNewRecords, HoodieDataFile oldDataFile) throws IOException {
+ Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, commitTime, fileId);
@@ -223,7 +225,7 @@ public class HoodieCopyOnWriteTable extends Hoodi
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId,
- Map<String, HoodieRecord<T>> keyToNewRecords, HoodieDataFile dataFileToBeMerged) {
+ Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
}
@@ -685,10 +687,10 @@ public class HoodieCopyOnWriteTable extends Hoodi
if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
- List<HoodieDataFile> allFiles = getROFileSystemView()
- .getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
+ List<HoodieBaseFile> allFiles = getBaseFileOnlyView()
+ .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
- for (HoodieDataFile file : allFiles) {
+ for (HoodieBaseFile file : allFiles) {
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
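Editor's note - illustrative sketch, not part of the patch: how the small-file probe reads after this rename. Only names that appear in the hunks above are used; the enclosing method and the in-scope variables (table, config, partitionPath, latestCommitTime) are assumptions.

  // Sketch: list the latest base files as of the last commit and keep the
  // ones below the configured small-file threshold, via the renamed view.
  List<HoodieBaseFile> smallBaseFiles = table.getBaseFileOnlyView()
      .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp())
      .filter(f -> f.getFileSize() < config.getParquetSmallFileLimit())
      .collect(Collectors.toList());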
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
index a654fcbf2..754b0ac8a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java
@@ -40,7 +40,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.func.MergeOnReadLazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
-import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
+import org.apache.hudi.io.compact.HoodieMergeOnReadTableCompactor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -61,7 +61,8 @@ import java.util.Objects;
import java.util.stream.Collectors;
/**
- * Implementation of a more real-time read-optimized Hoodie Table where
+ * Implementation of a more real-time Hoodie Table that provides tradeoffs on read and write cost/amplification.
+ *
*
* INSERTS - Same as HoodieCopyOnWriteTable - Produce new files, block aligned to desired size (or) Merge with the
* smallest existing file, to expand it
@@ -142,10 +143,10 @@ public class HoodieMergeOnReadTable extends Hoodi
}
LOG.info("Compacting merge on read table " + config.getBasePath());
- HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
+ HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.generateCompactionPlan(jsc, this, config, instantTime,
- ((SyncableFileSystemView) getRTFileSystemView()).getPendingCompactionOperations()
+ ((SyncableFileSystemView) getSliceView()).getPendingCompactionOperations()
.map(instantTimeCompactionopPair -> instantTimeCompactionopPair.getValue().getFileGroupId())
.collect(Collectors.toSet()));
@@ -157,7 +158,7 @@ public class HoodieMergeOnReadTable extends Hoodi
@Override
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime,
HoodieCompactionPlan compactionPlan) {
- HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
+ HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
try {
return compactor.compact(jsc, compactionPlan, this, config, compactionInstantTime);
} catch (IOException e) {
@@ -344,10 +345,10 @@ public class HoodieMergeOnReadTable extends Hoodi
if (!index.canIndexLogFiles()) {
// TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch
- Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getRTFileSystemView()
+ Option<FileSlice> smallFileSlice = Option.fromJavaOptional(getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
- .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit())
- .min((FileSlice left, FileSlice right) -> left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() ? -1 : 1));
+ .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
+ .min((FileSlice left, FileSlice right) -> left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get());
}
@@ -355,7 +356,7 @@ public class HoodieMergeOnReadTable extends Hoodi
// If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction.
List<FileSlice> allFileSlices =
- getRTFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+ getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
@@ -366,9 +367,9 @@ public class HoodieMergeOnReadTable extends Hoodi
// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : allSmallFileSlices) {
SmallFile sf = new SmallFile();
- if (smallFileSlice.getDataFile().isPresent()) {
+ if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
- String filename = smallFileSlice.getDataFile().get().getFileName();
+ String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
@@ -394,10 +395,10 @@ public class HoodieMergeOnReadTable extends Hoodi
}
private long getTotalFileSize(FileSlice fileSlice) {
- if (!fileSlice.getDataFile().isPresent()) {
+ if (!fileSlice.getBaseFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else {
- return fileSlice.getDataFile().get().getFileSize()
+ return fileSlice.getBaseFile().get().getFileSize()
+ convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
}
}
@@ -428,7 +429,7 @@ public class HoodieMergeOnReadTable extends Hoodi
// used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
// But the index (global) might store the baseCommit of the parquet and not the requested, hence get the
// baseCommit always by listing the file slice
- Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getRTFileSystemView().getLatestFileSlices(partitionPath)
+ Map<String, String> fileIdToBaseCommitTimeForLogMap = this.getSliceView().getLatestFileSlices(partitionPath)
.collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime));
return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> {
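Editor's note - illustrative sketch, not part of the patch: the compactor rename is purely nominal, so call sites change only in the class name. The variable names (jsc, table, config, instantTime, fgIdsInPendingCompaction, compactionInstantTime) are assumptions; the method calls are taken from the hunks above.

  // Sketch: scheduling and running compaction with the renamed compactor.
  HoodieMergeOnReadTableCompactor compactor = new HoodieMergeOnReadTableCompactor();
  HoodieCompactionPlan plan =
      compactor.generateCompactionPlan(jsc, table, config, instantTime, fgIdsInPendingCompaction);
  JavaRDD<WriteStatus> statuses =
      compactor.compact(jsc, plan, table, config, compactionInstantTime);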
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index d2f571564..2762048eb 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -33,6 +33,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.TableFileSystemView;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.table.TableFileSystemView.SliceView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewManager;
@@ -145,16 +147,16 @@ public abstract class HoodieTable implements Seri
}
/**
- * Get the read optimized view of the file system for this table.
+ * Get the base file only view of the file system for this table.
*/
- public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
+ public BaseFileOnlyView getBaseFileOnlyView() {
return getViewManager().getFileSystemView(metaClient.getBasePath());
}
/**
- * Get the real time view of the file system for this table.
+ * Get the full view of the file system for this table.
*/
- public TableFileSystemView.RealtimeView getRTFileSystemView() {
+ public SliceView getSliceView() {
return getViewManager().getFileSystemView(metaClient.getBasePath());
}
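Editor's note - illustrative sketch, not part of the patch: the caller migration implied by the two renames above. Both accessors still resolve the same underlying file-system view; only the names and interface types change.

  // Before: table.getROFileSystemView()  ->  After: table.getBaseFileOnlyView()
  // Before: table.getRTFileSystemView()  ->  After: table.getSliceView()
  BaseFileOnlyView roView = table.getBaseFileOnlyView(); // base files only (no log files)
  SliceView sliceView = table.getSliceView();            // full file slices: base file + log files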
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
index d3d79545e..4f8fbb1f4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestAsyncCompaction.java
@@ -23,7 +23,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -402,7 +402,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
assertEquals("Expect baseInstant to match compaction Instant", fileSlice.getBaseInstantTime(), opPair.getKey());
assertTrue("Expect atleast one log file to be present where the latest delta commit was written",
fileSlice.getLogFiles().count() > 0);
- assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent());
+ assertFalse("Expect no data-file to be present", fileSlice.getBaseFile().isPresent());
} else {
assertTrue("Expect baseInstant to be less than or equal to latestDeltaCommit",
fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0);
@@ -439,8 +439,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
assertNoWriteErrors(statusList);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
- List<HoodieDataFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
- assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
+ List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
+ assertTrue("Should list the parquet files we wrote in the delta commit",
dataFilesToRead.stream().findAny().isPresent());
validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
}
@@ -487,7 +487,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
assertFalse("Verify all file-slices have base-instant same as compaction instant", fileSliceList.stream()
.anyMatch(fs -> !fs.getBaseInstantTime().equals(compactionInstantTime)));
assertFalse("Verify all file-slices have data-files",
- fileSliceList.stream().anyMatch(fs -> !fs.getDataFile().isPresent()));
+ fileSliceList.stream().anyMatch(fs -> !fs.getBaseFile().isPresent()));
if (hasDeltaCommitAfterPendingCompaction) {
assertFalse("Verify all file-slices have atleast one log-file",
@@ -533,11 +533,11 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
return statusList;
}
- private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
+ private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
HoodieTableFileSystemView view =
new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
- return view.getLatestDataFiles().collect(Collectors.toList());
+ return view.getLatestBaseFiles().collect(Collectors.toList());
}
private List getCurrentLatestFileSlices(HoodieTable table) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
index baec87508..24aa9cd1b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
@@ -267,8 +267,8 @@ public class TestCleaner extends TestHoodieClientBase {
for (HoodieFileGroup fileGroup : fileGroups) {
if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
// Ensure latest file-slice selected for compaction is retained
- Option<HoodieDataFile> dataFileForCompactionPresent =
- Option.fromJavaOptional(fileGroup.getAllDataFiles().filter(df -> {
+ Option<HoodieBaseFile> dataFileForCompactionPresent =
+ Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> {
return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime()
.equals(df.getCommitTime());
}).findAny());
@@ -277,7 +277,7 @@ public class TestCleaner extends TestHoodieClientBase {
} else {
// file has no more than max versions
String fileId = fileGroup.getFileGroupId().getFileId();
- List<HoodieDataFile> dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList());
+ List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());
assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions",
dataFiles.size() <= maxVersions);
@@ -391,7 +391,7 @@ public class TestCleaner extends TestHoodieClientBase {
List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
Set<String> commitTimes = new HashSet<>();
- fileGroup.getAllDataFiles().forEach(value -> {
+ fileGroup.getAllBaseFiles().forEach(value -> {
LOG.debug("Data File - " + value);
commitTimes.add(value.getCommitTime());
});
@@ -1025,7 +1025,7 @@ public class TestCleaner extends TestHoodieClientBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
FileSlice slice =
- table.getRTFileSystemView().getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ table.getSliceView().getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
List<FileSlice> slices = new ArrayList<>();
if (compactionInstantsToFileSlices.containsKey(compactionInstants[j])) {
@@ -1069,12 +1069,12 @@ public class TestCleaner extends TestHoodieClientBase {
expFileIdToPendingCompaction.forEach((fileId, value) -> {
String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId);
- Option<FileSlice> fileSliceForCompaction = Option.fromJavaOptional(hoodieTable.getRTFileSystemView()
+ Option<FileSlice> fileSliceForCompaction = Option.fromJavaOptional(hoodieTable.getSliceView()
.getLatestFileSlicesBeforeOrOn(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, baseInstantForCompaction,
true)
.filter(fs -> fs.getFileId().equals(fileId)).findFirst());
Assert.assertTrue("Base Instant for Compaction must be preserved", fileSliceForCompaction.isPresent());
- Assert.assertTrue("FileSlice has data-file", fileSliceForCompaction.get().getDataFile().isPresent());
+ Assert.assertTrue("FileSlice has data-file", fileSliceForCompaction.get().getBaseFile().isPresent());
Assert.assertEquals("FileSlice has log-files", 2, fileSliceForCompaction.get().getLogFiles().count());
});
@@ -1135,9 +1135,9 @@ public class TestCleaner extends TestHoodieClientBase {
private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient,
List<String> paths) {
Predicate<String> roFilePredicate =
- path -> path.contains(metaClient.getTableConfig().getROFileFormat().getFileExtension());
+ path -> path.contains(metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
Predicate<String> rtFilePredicate =
- path -> path.contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension());
+ path -> path.contains(metaClient.getTableConfig().getLogFileFormat().getFileExtension());
Stream<Pair<String, String>> stream1 = paths.stream().filter(roFilePredicate).map(fullPath -> {
String fileName = Paths.get(fullPath).getFileName().toString();
return Pair.of(FSUtils.getFileId(fileName), FSUtils.getCommitTime(fileName));
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
index abefe86e5..0da3959a8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
@@ -20,11 +20,11 @@ package org.apache.hudi;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableFileSystemView.ReadOptimizedView;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -100,15 +100,15 @@ public class TestClientRollback extends TestHoodieClientBase {
FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
- final ReadOptimizedView view1 = table.getROFileSystemView();
+ final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
- List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> {
- return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003"));
+ List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
+ return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"));
}).collect(Collectors.toList());
assertEquals("The data files for commit 003 should be present", 3, dataFiles.size());
dataFiles = partitionPaths.stream().flatMap(s -> {
- return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
+ return view1.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
assertEquals("The data files for commit 002 should be present", 3, dataFiles.size());
@@ -125,9 +125,9 @@ public class TestClientRollback extends TestHoodieClientBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
- final ReadOptimizedView view2 = table.getROFileSystemView();
+ final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
- dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
+ dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
assertEquals("The data files for commit 004 should be present", 3, dataFiles.size());
// rolling back to a non existent savepoint must not succeed
@@ -144,19 +144,19 @@ public class TestClientRollback extends TestHoodieClientBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
- final ReadOptimizedView view3 = table.getROFileSystemView();
+ final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
dataFiles = partitionPaths.stream().flatMap(s -> {
- return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
+ return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList());
assertEquals("The data files for commit 002 be available", 3, dataFiles.size());
dataFiles = partitionPaths.stream().flatMap(s -> {
- return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003"));
+ return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("003"));
}).collect(Collectors.toList());
assertEquals("The data files for commit 003 should be rolled back", 0, dataFiles.size());
dataFiles = partitionPaths.stream().flatMap(s -> {
- return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004"));
+ return view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"));
}).collect(Collectors.toList());
assertEquals("The data files for commit 004 should be rolled back", 0, dataFiles.size());
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java
index 2ce452ad3..0ed435ce2 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestCompactionAdminClient.java
@@ -275,7 +275,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
- Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent());
+ Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
});
@@ -336,7 +336,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant))
.filter(fs -> fs.getFileId().equals(op.getFileId())).forEach(fs -> {
- Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent());
+ Assert.assertFalse("No Data file must be present", fs.getBaseFile().isPresent());
Assert.assertEquals("No Log Files", 0, fs.getLogFiles().count());
});
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java
index 2608aea99..91ca10b13 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java
@@ -157,7 +157,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
- ((SyncableFileSystemView) (table.getRTFileSystemView())).reset();
+ ((SyncableFileSystemView) (table.getSliceView())).reset();
return table;
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
index 4a8e518c0..25bd14b5b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
@@ -21,7 +21,7 @@ package org.apache.hudi;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStat;
@@ -30,7 +30,7 @@ import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.TimelineLayoutVersion;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
-import org.apache.hudi.common.table.TableFileSystemView.ReadOptimizedView;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
@@ -510,12 +510,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = getHoodieTable(metadata, config);
- ReadOptimizedView fileSystemView = table.getROFileSystemView();
- List<HoodieDataFile> files =
- fileSystemView.getLatestDataFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
+ BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
+ List<HoodieBaseFile> files =
+ fileSystemView.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
int numTotalInsertsInCommit3 = 0;
int numTotalUpdatesInCommit3 = 0;
- for (HoodieDataFile file : files) {
+ for (HoodieBaseFile file : files) {
if (file.getFileName().contains(file1)) {
assertEquals("Existing file should be expanded", commitTime3, file.getCommitTime());
records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath()));
@@ -616,12 +616,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = getHoodieTable(metaClient, config);
- List<HoodieDataFile> files = table.getROFileSystemView()
- .getLatestDataFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
+ List<HoodieBaseFile> files = table.getBaseFileOnlyView()
+ .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
assertEquals("Total of 2 valid data files", 2, files.size());
int totalInserts = 0;
- for (HoodieDataFile file : files) {
+ for (HoodieBaseFile file : files) {
assertEquals("All files must be at commit 3", commitTime3, file.getCommitTime());
records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new Path(file.getPath()));
totalInserts += records.size();
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
index 06986533f..d13010649 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
@@ -25,13 +25,13 @@ import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
-import org.apache.hudi.common.table.TableFileSystemView.ReadOptimizedView;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.FSUtils;
@@ -203,10 +203,10 @@ public class HoodieClientTestUtils {
try {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
for (String path : paths) {
- ReadOptimizedView fileSystemView = new HoodieTableFileSystemView(metaClient,
+ BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
- List<HoodieDataFile> latestFiles = fileSystemView.getLatestDataFiles().collect(Collectors.toList());
- for (HoodieDataFile file : latestFiles) {
+ List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
+ for (HoodieBaseFile file : latestFiles) {
filteredPaths.add(file.getPath());
}
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
index b986e8a17..01dc5425c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
@@ -157,7 +157,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
- table.getRTFileSystemView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
for (FileSlice fileSlice : groupedLogFiles) {
assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
}
@@ -185,7 +185,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
return HoodieTableType.MERGE_ON_READ;
}
- // TODO - after modifying HoodieReadClient to support realtime tables - add more tests to make
+ // TODO - after modifying HoodieReadClient to support MOR tables - add more tests to make
// sure the data read is the updated data (compaction correctness)
// TODO - add more test cases for compactions after a failed commit/compaction
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieDataFile.java b/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieBaseFile.java
similarity index 83%
rename from hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieDataFile.java
rename to hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieBaseFile.java
index 8d7851362..c23cfe223 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieDataFile.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieBaseFile.java
@@ -18,21 +18,21 @@
package org.apache.hudi.io.strategy;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import java.util.UUID;
-public class TestHoodieDataFile extends HoodieDataFile {
+public class TestHoodieBaseFile extends HoodieBaseFile {
private final long size;
- public TestHoodieDataFile(long size) {
+ public TestHoodieBaseFile(long size) {
super("/tmp/XYXYXYXYXYYX_11_20180918020003.parquet");
this.size = size;
}
- public static HoodieDataFile newDataFile(long size) {
- return new TestHoodieDataFile(size);
+ public static HoodieBaseFile newDataFile(long size) {
+ return new TestHoodieBaseFile(size);
}
@Override
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieCompactionStrategy.java b/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieCompactionStrategy.java
index 5eda0b2c9..95be5a953 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieCompactionStrategy.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/strategy/TestHoodieCompactionStrategy.java
@@ -19,7 +19,7 @@
package org.apache.hudi.io.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -239,7 +239,7 @@ public class TestHoodieCompactionStrategy {
List<HoodieCompactionOperation> operations = new ArrayList<>(sizesMap.size());
sizesMap.forEach((k, v) -> {
- HoodieDataFile df = TestHoodieDataFile.newDataFile(k);
+ HoodieBaseFile df = TestHoodieBaseFile.newDataFile(k);
String partitionPath = keyToPartitionMap.get(k);
List<HoodieLogFile> logFiles = v.stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList());
operations.add(new HoodieCompactionOperation(df.getCommitTime(),
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 840dbed31..c68413e60 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -39,8 +39,8 @@ import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
-import org.apache.hudi.common.table.TableFileSystemView.ReadOptimizedView;
-import org.apache.hudi.common.table.TableFileSystemView.RealtimeView;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.table.TableFileSystemView.SliceView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -130,14 +130,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- ReadOptimizedView roView =
+ BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
+ Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
- assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
+ dataFilesToRead = roView.getLatestBaseFiles();
+ assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
/**
@@ -170,7 +170,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
+ dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
@@ -238,14 +238,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- ReadOptimizedView roView =
+ BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
+ Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
- assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
+ dataFilesToRead = roView.getLatestBaseFiles();
+ assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
/**
@@ -281,10 +281,10 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
+ dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
- List<String> dataFiles = roView.getLatestDataFiles().map(HoodieDataFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals("Must contain 0 records", 0, recordsRead.size());
@@ -343,7 +343,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
final String absentCommit = newCommitTime;
- assertFalse(roView.getLatestDataFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
+ assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
}
}
@@ -379,14 +379,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- ReadOptimizedView roView =
+ BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
+ Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
- assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
+ dataFilesToRead = roView.getLatestBaseFiles();
+ assertTrue("should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
/**
@@ -401,7 +401,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
- List<String> dataFiles = roView.getLatestDataFiles().map(HoodieDataFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
@@ -415,7 +415,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// After rollback, there should be no parquet file with the failed commit time
Assert.assertEquals(Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime1)).count(), 0);
- dataFiles = roView.getLatestDataFiles().map(HoodieDataFile::getPath).collect(Collectors.toList());
+ dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
}
@@ -431,7 +431,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
- List<String> dataFiles = roView.getLatestDataFiles().map(HoodieDataFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
@@ -452,7 +452,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFiles = roView.getLatestDataFiles().map(HoodieDataFile::getPath).collect(Collectors.toList());
+ dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// check that the number of records read is still correct after rollback operation
assertEquals(recordsRead.size(), 200);
@@ -483,7 +483,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
- assertTrue(roView.getLatestDataFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
thirdClient.rollback(compactedCommitTime);
@@ -491,7 +491,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
- assertFalse(roView.getLatestDataFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
}
}
}
@@ -526,14 +526,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- ReadOptimizedView roView =
+ BaseFileOnlyView roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
+ Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
- assertTrue("ReadOptimizedTableView should list the parquet files we wrote in the delta commit",
+ dataFilesToRead = roView.getLatestBaseFiles();
+ assertTrue("Should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent());
/**
@@ -548,7 +548,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
- List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+ List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
@@ -611,7 +611,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
- assertTrue(roView.getLatestDataFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
/**
* Write 5 (updates)
@@ -635,9 +635,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
roView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
+ dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- RealtimeView rtView =
+ SliceView rtView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
List<HoodieFileGroup> fileGroups =
((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
@@ -689,16 +689,16 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
+ BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
+ Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
Map<String, Long> parquetFileIdToSize =
- dataFilesToRead.collect(Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
+ dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
- List<HoodieDataFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
- assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
+ dataFilesToRead = roView.getLatestBaseFiles();
+ List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
+ assertTrue("Should list the parquet files we wrote in the delta commit",
dataFilesList.size() > 0);
/**
@@ -725,14 +725,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
- dataFilesToRead = roView.getLatestDataFiles();
- List<HoodieDataFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
+ dataFilesToRead = roView.getLatestBaseFiles();
+ List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
Map<String, Long> parquetFileIdToNewSize =
- newDataFilesList.stream().collect(Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
+ newDataFilesList.stream().collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()));
- List<String> dataFiles = roView.getLatestDataFiles().map(HoodieDataFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// Wrote 20 records in 2 batches
assertEquals("Must contain 40 records", 40, recordsRead.size());
@@ -770,11 +770,11 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
- ((SyncableFileSystemView) (table.getRTFileSystemView())).reset();
+ ((SyncableFileSystemView) (table.getSliceView())).reset();
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
- table.getRTFileSystemView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
for (FileSlice fileSlice : groupedLogFiles) {
assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
}
@@ -800,9 +800,9 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
- table.getRTFileSystemView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
for (FileSlice slice : groupedLogFiles) {
- assertEquals("After compaction there should be no log files visiable on a Realtime view", 0, slice.getLogFiles().count());
+ assertEquals("After compaction there should be no log files visible on a full view", 0, slice.getLogFiles().count());
}
List<WriteStatus> writeStatuses = result.collect();
assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
@@ -827,12 +827,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
HoodieTable table =
HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
- RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
+ SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
assertEquals(0, tableRTFileSystemView.getLatestFileSlices(partitionPath)
- .filter(fileSlice -> fileSlice.getDataFile().isPresent()).count());
+ .filter(fileSlice -> fileSlice.getBaseFile().isPresent()).count());
Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
@@ -903,11 +903,11 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
- RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
+ SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
- Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getDataFile().isPresent()));
+ Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
@@ -940,11 +940,11 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
HoodieTable table =
HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
- RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
+ SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
- Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getDataFile().isPresent()));
+ Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
numLogFiles += tableRTFileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice -> fileSlice.getLogFiles().count() > 0).count();
@@ -961,12 +961,12 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Trigger a rollback of compaction
writeClient.rollback(newCommitTime);
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
- tableRTFileSystemView = table.getRTFileSystemView();
+ tableRTFileSystemView = table.getSliceView();
((SyncableFileSystemView) tableRTFileSystemView).reset();
Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
System.out.println("Last Instant =" + lastInstant);
for (String partitionPath : dataGen.getPartitionPaths()) {
- Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getDataFile().isPresent()));
+ Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).noneMatch(fileSlice -> fileSlice.getBaseFile().isPresent()));
Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).anyMatch(fileSlice -> fileSlice.getLogFiles().count() > 0));
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
index ea178a832..dd4aaf40a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java
@@ -60,7 +60,7 @@ public class CompactionOperation implements Serializable {
this.metrics = metrics;
}
- public CompactionOperation(Option<HoodieDataFile> dataFile, String partitionPath, List<HoodieLogFile> logFiles,
+ public CompactionOperation(Option<HoodieBaseFile> dataFile, String partitionPath, List<HoodieLogFile> logFiles,
Map<String, Double> metrics) {
if (dataFile.isPresent()) {
this.baseInstantTime = dataFile.get().getCommitTime();
@@ -111,9 +111,9 @@ public class CompactionOperation implements Serializable {
return id;
}
- public Option<HoodieDataFile> getBaseFile(String basePath, String partitionPath) {
+ public Option<HoodieBaseFile> getBaseFile(String basePath, String partitionPath) {
Path dirPath = FSUtils.getPartitionPath(basePath, partitionPath);
- return dataFileName.map(df -> new HoodieDataFile(new Path(dirPath, df).toString()));
+ return dataFileName.map(df -> new HoodieBaseFile(new Path(dirPath, df).toString()));
}
/**
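Editor's note - illustrative sketch, not part of the patch: resolving the renamed base-file handle for a compaction operation; op, basePath and partitionPath are assumed to be in scope.

  // Sketch: a compaction operation may or may not carry a base file to merge into.
  Option<HoodieBaseFile> baseFile = op.getBaseFile(basePath, partitionPath);
  if (baseFile.isPresent()) {
    String baseFilePath = baseFile.get().getPath(); // merge log files into this base file
  }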
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
index 7a6521464..19e62f999 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
@@ -44,7 +44,7 @@ public class FileSlice implements Serializable {
/**
* data file, with the compacted data, for this slice.
*/
- private HoodieDataFile dataFile;
+ private HoodieBaseFile baseFile;
/**
* List of appendable log files with real time data - Sorted with greater log version first - Always empty for
@@ -59,12 +59,12 @@ public class FileSlice implements Serializable {
public FileSlice(HoodieFileGroupId fileGroupId, String baseInstantTime) {
this.fileGroupId = fileGroupId;
this.baseInstantTime = baseInstantTime;
- this.dataFile = null;
+ this.baseFile = null;
this.logFiles = new TreeSet<>(HoodieLogFile.getReverseLogFileComparator());
}
- public void setDataFile(HoodieDataFile dataFile) {
- this.dataFile = dataFile;
+ public void setBaseFile(HoodieBaseFile baseFile) {
+ this.baseFile = baseFile;
}
public void addLogFile(HoodieLogFile logFile) {
@@ -91,8 +91,8 @@ public class FileSlice implements Serializable {
return fileGroupId;
}
- public Option<HoodieDataFile> getDataFile() {
- return Option.ofNullable(dataFile);
+ public Option<HoodieBaseFile> getBaseFile() {
+ return Option.ofNullable(baseFile);
}
public Option<HoodieLogFile> getLatestLogFile() {
@@ -105,7 +105,7 @@ public class FileSlice implements Serializable {
* @return
*/
public boolean isEmpty() {
- return (dataFile == null) && (logFiles.isEmpty());
+ return (baseFile == null) && (logFiles.isEmpty());
}
@Override
@@ -113,7 +113,7 @@ public class FileSlice implements Serializable {
final StringBuilder sb = new StringBuilder("FileSlice {");
sb.append("fileGroupId=").append(fileGroupId);
sb.append(", baseCommitTime=").append(baseInstantTime);
- sb.append(", dataFile='").append(dataFile).append('\'');
+ sb.append(", baseFile='").append(baseFile).append('\'');
sb.append(", logFiles='").append(logFiles).append('\'');
sb.append('}');
return sb.toString();
@@ -129,7 +129,7 @@ public class FileSlice implements Serializable {
}
FileSlice slice = (FileSlice) o;
return Objects.equals(fileGroupId, slice.fileGroupId) && Objects.equals(baseInstantTime, slice.baseInstantTime)
- && Objects.equals(dataFile, slice.dataFile) && Objects.equals(logFiles, slice.logFiles);
+ && Objects.equals(baseFile, slice.baseFile) && Objects.equals(logFiles, slice.logFiles);
}
@Override
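A file slice now pairs one optional base file with a set of log files sorted by version. A short sketch of the renamed accessors (the file group id, instant time and path are made up for illustration):
    import org.apache.hudi.common.model.FileSlice;
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.model.HoodieFileGroupId;

    static void buildSlice() {
      FileSlice slice = new FileSlice(new HoodieFileGroupId("2020/01/01", "file-1"), "20200101000000");
      slice.setBaseFile(new HoodieBaseFile("/tables/trips/2020/01/01/file-1_0_20200101000000.parquet"));
      // A slice with neither a base file nor log files is considered empty.
      assert slice.getBaseFile().isPresent() && !slice.isEmpty();
    }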
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDataFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
similarity index 91%
rename from hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDataFile.java
rename to hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
index 4983b74d8..90e429b0d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDataFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java
@@ -27,21 +27,21 @@ import java.io.Serializable;
import java.util.Objects;
/**
- * Hoodie data file.
+ * Hoodie base file.
*/
-public class HoodieDataFile implements Serializable {
+public class HoodieBaseFile implements Serializable {
private transient FileStatus fileStatus;
private final String fullPath;
private long fileLen;
- public HoodieDataFile(FileStatus fileStatus) {
+ public HoodieBaseFile(FileStatus fileStatus) {
this.fileStatus = fileStatus;
this.fullPath = fileStatus.getPath().toString();
this.fileLen = fileStatus.getLen();
}
- public HoodieDataFile(String filePath) {
+ public HoodieBaseFile(String filePath) {
this.fileStatus = null;
this.fullPath = filePath;
this.fileLen = -1;
@@ -87,7 +87,7 @@ public class HoodieDataFile implements Serializable {
if (o == null || getClass() != o.getClass()) {
return false;
}
- HoodieDataFile dataFile = (HoodieDataFile) o;
+ HoodieBaseFile dataFile = (HoodieBaseFile) o;
return Objects.equals(fullPath, dataFile.fullPath);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index 3313586eb..83e38d4c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -82,11 +82,11 @@ public class HoodieFileGroup implements Serializable {
/**
* Add a new datafile into the file group.
*/
- public void addDataFile(HoodieDataFile dataFile) {
+ public void addBaseFile(HoodieBaseFile dataFile) {
if (!fileSlices.containsKey(dataFile.getCommitTime())) {
fileSlices.put(dataFile.getCommitTime(), new FileSlice(fileGroupId, dataFile.getCommitTime()));
}
- fileSlices.get(dataFile.getCommitTime()).setDataFile(dataFile);
+ fileSlices.get(dataFile.getCommitTime()).setBaseFile(dataFile);
}
/**
@@ -155,8 +155,8 @@ public class HoodieFileGroup implements Serializable {
/**
* Gets the latest data file.
*/
- public Option<HoodieDataFile> getLatestDataFile() {
- return Option.fromJavaOptional(getAllDataFiles().findFirst());
+ public Option<HoodieBaseFile> getLatestDataFile() {
+ return Option.fromJavaOptional(getAllBaseFiles().findFirst());
}
/**
@@ -187,8 +187,8 @@ public class HoodieFileGroup implements Serializable {
/**
* Stream of committed data files, sorted reverse commit time.
*/
- public Stream<HoodieDataFile> getAllDataFiles() {
- return getAllFileSlices().filter(slice -> slice.getDataFile().isPresent()).map(slice -> slice.getDataFile().get());
+ public Stream<HoodieBaseFile> getAllBaseFiles() {
+ return getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).map(slice -> slice.getBaseFile().get());
}
@Override
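The grouping above keys each slice by its base file's commit time, so getAllBaseFiles() streams committed base files newest-first. A hedged usage sketch (the file group is assumed to be already constructed for a partition/fileId):
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.model.HoodieFileGroup;

    static void addAndInspect(HoodieFileGroup fileGroup, String path) {
      // Lands in (or creates) the slice whose baseInstantTime matches the file's commit time.
      fileGroup.addBaseFile(new HoodieBaseFile(path));
      fileGroup.getAllBaseFiles().findFirst()
          .ifPresent(latest -> System.out.println("latest commit: " + latest.getCommitTime()));
    }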
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableType.java
index 6c24e391b..851771c08 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableType.java
@@ -27,9 +27,6 @@ package org.apache.hudi.common.model;
*
* MERGE_ON_READ - Speeds up upserts, by delaying merge until enough work piles up.
*
- * In the future, following might be added.
- *
- * SIMPLE_LSM - A simple 2 level LSM tree.
*/
public enum HoodieTableType {
COPY_ON_WRITE, MERGE_ON_READ
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index b19456536..49326b806 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -53,15 +53,19 @@ public class HoodieTableConfig implements Serializable {
public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name";
public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
+ @Deprecated
public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = "hoodie.table.ro.file.format";
+ @Deprecated
public static final String HOODIE_RT_FILE_FORMAT_PROP_NAME = "hoodie.table.rt.file.format";
+ public static final String HOODIE_BASE_FILE_FORMAT_PROP_NAME = "hoodie.table.base.file.format";
+ public static final String HOODIE_LOG_FILE_FORMAT_PROP_NAME = "hoodie.table.log.file.format";
public static final String HOODIE_TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
public static final String HOODIE_PAYLOAD_CLASS_PROP_NAME = "hoodie.compaction.payload.class";
public static final String HOODIE_ARCHIVELOG_FOLDER_PROP_NAME = "hoodie.archivelog.folder";
public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE;
- public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET;
- public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG;
+ public static final HoodieFileFormat DEFAULT_BASE_FILE_FORMAT = HoodieFileFormat.PARQUET;
+ public static final HoodieFileFormat DEFAULT_LOG_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG;
public static final String DEFAULT_PAYLOAD_CLASS = OverwriteWithLatestAvroPayload.class.getName();
public static final Integer DEFAULT_TIMELINE_LAYOUT_VERSION = TimelineLayoutVersion.VERSION_0;
public static final String DEFAULT_ARCHIVELOG_FOLDER = "";
@@ -164,27 +168,33 @@ public class HoodieTableConfig implements Serializable {
}
/**
- * Get the Read Optimized Storage Format.
+ * Get the base file storage format.
*
- * @return HoodieFileFormat for the Read Optimized Storage format
+ * @return HoodieFileFormat for the base file storage format
*/
- public HoodieFileFormat getROFileFormat() {
+ public HoodieFileFormat getBaseFileFormat() {
+ if (props.containsKey(HOODIE_BASE_FILE_FORMAT_PROP_NAME)) {
+ return HoodieFileFormat.valueOf(props.getProperty(HOODIE_BASE_FILE_FORMAT_PROP_NAME));
+ }
if (props.containsKey(HOODIE_RO_FILE_FORMAT_PROP_NAME)) {
return HoodieFileFormat.valueOf(props.getProperty(HOODIE_RO_FILE_FORMAT_PROP_NAME));
}
- return DEFAULT_RO_FILE_FORMAT;
+ return DEFAULT_BASE_FILE_FORMAT;
}
/**
- * Get the Read Optimized Storage Format.
+ * Get the log file storage format.
*
- * @return HoodieFileFormat for the Read Optimized Storage format
+ * @return HoodieFileFormat for the log file storage format
*/
- public HoodieFileFormat getRTFileFormat() {
+ public HoodieFileFormat getLogFileFormat() {
+ if (props.containsKey(HOODIE_LOG_FILE_FORMAT_PROP_NAME)) {
+ return HoodieFileFormat.valueOf(props.getProperty(HOODIE_LOG_FILE_FORMAT_PROP_NAME));
+ }
if (props.containsKey(HOODIE_RT_FILE_FORMAT_PROP_NAME)) {
return HoodieFileFormat.valueOf(props.getProperty(HOODIE_RT_FILE_FORMAT_PROP_NAME));
}
- return DEFAULT_RT_FILE_FORMAT;
+ return DEFAULT_LOG_FILE_FORMAT;
}
/**
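The lookup order above (new key, then the deprecated RO/RT key, then the default) keeps existing tables readable without rewriting hoodie.properties. A standalone sketch of the same resolution, with the property names inlined as plain strings:
    import java.util.Properties;

    static String resolveBaseFileFormat(Properties props) {
      if (props.containsKey("hoodie.table.base.file.format")) { // new key wins
        return props.getProperty("hoodie.table.base.file.format");
      }
      if (props.containsKey("hoodie.table.ro.file.format")) { // deprecated key still honored
        return props.getProperty("hoodie.table.ro.file.format");
      }
      return "PARQUET"; // DEFAULT_BASE_FILE_FORMAT
    }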
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1d27d81b7..40583ddc5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -299,7 +299,7 @@ public class HoodieTableMetaClient implements Serializable {
}
/**
- * Helper method to initialize a given path, as a given storage type and table name.
+ * Helper method to initialize a given path, as a given table type and table name.
*/
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
@@ -437,7 +437,7 @@ public class HoodieTableMetaClient implements Serializable {
case MERGE_ON_READ:
return HoodieActiveTimeline.DELTA_COMMIT_ACTION;
default:
- throw new HoodieException("Could not commit on unknown storage type " + this.getTableType());
+ throw new HoodieException("Could not commit on unknown table type " + this.getTableType());
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/SyncableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/SyncableFileSystemView.java
index d8023a6a4..7a03f0ff6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/SyncableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/SyncableFileSystemView.java
@@ -18,12 +18,15 @@
package org.apache.hudi.common.table;
+import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView;
+import org.apache.hudi.common.table.TableFileSystemView.SliceView;
+
/**
- * A consolidated file-system view interface exposing both realtime and read-optimized views along with
+ * A consolidated file-system view interface exposing both complete file-slice and base-file-only views, along with
* update operations.
*/
public interface SyncableFileSystemView
- extends TableFileSystemView, TableFileSystemView.ReadOptimizedView, TableFileSystemView.RealtimeView {
+ extends TableFileSystemView, BaseFileOnlyView, SliceView {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableFileSystemView.java
index f77d1e1c0..0a5bb3231 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableFileSystemView.java
@@ -20,7 +20,7 @@ package org.apache.hudi.common.table;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
@@ -37,57 +37,57 @@ import java.util.stream.Stream;
public interface TableFileSystemView {
/**
- * ReadOptimizedView with methods to only access latest version of file for the instant(s) passed.
+ * Methods to access only the latest version of files for the instant(s) passed.
*/
- interface ReadOptimizedViewWithLatestSlice {
+ interface BaseFileOnlyViewWithLatestSlice {
/**
* Stream all the latest data files in the given partition.
*/
- Stream<HoodieDataFile> getLatestDataFiles(String partitionPath);
+ Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath);
/**
* Get Latest data file for a partition and file-Id.
*/
- Option<HoodieDataFile> getLatestDataFile(String partitionPath, String fileId);
+ Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId);
/**
* Stream all the latest data files, in the file system view.
*/
- Stream<HoodieDataFile> getLatestDataFiles();
+ Stream<HoodieBaseFile> getLatestBaseFiles();
/**
* Stream all the latest version data files in the given partition with precondition that commitTime(file) before
* maxCommitTime.
*/
- Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath, String maxCommitTime);
+ Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionPath, String maxCommitTime);
/**
* Stream all the latest data files with commit times in the given list.
*/
- Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn);
+ Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToReturn);
}
/**
- * ReadOptimizedView - methods to provide a view of columnar data files only.
+ * Methods to provide a view of base files only.
*/
- interface ReadOptimizedView extends ReadOptimizedViewWithLatestSlice {
+ interface BaseFileOnlyView extends BaseFileOnlyViewWithLatestSlice {
/**
* Stream all the data file versions grouped by FileId for a given partition.
*/
- Stream<HoodieDataFile> getAllDataFiles(String partitionPath);
+ Stream<HoodieBaseFile> getAllBaseFiles(String partitionPath);
/**
* Get the version of data file matching the instant time in the given partition.
*/
- Option<HoodieDataFile> getDataFileOn(String partitionPath, String instantTime, String fileId);
+ Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String instantTime, String fileId);
}
/**
- * RealtimeView with methods to only access latest version of file-slice for the instant(s) passed.
+ * Methods to access only the latest version of file-slices for the instant(s) passed.
*/
- interface RealtimeViewWithLatestSlice {
+ interface SliceViewWithLatestSlice {
/**
* Stream all the latest file slices in the given partition.
@@ -131,9 +131,9 @@ public interface TableFileSystemView {
}
/**
- * RealtimeView - methods to access a combination of columnar data files + log files with real time data.
+ * Methods to access a combination of base files + log file slices.
*/
- interface RealtimeView extends RealtimeViewWithLatestSlice {
+ interface SliceView extends SliceViewWithLatestSlice {
/**
* Stream all the file slices for a given partition, latest or not.
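In practice the two renamed sub-views split along table type; a hedged sketch (the HoodieTable accessors are assumed from the renamed client API):
    import org.apache.hudi.common.model.FileSlice;
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.table.HoodieTable;
    import java.util.stream.Stream;

    static void listLatest(HoodieTable table, String partition) {
      // COPY_ON_WRITE reads only ever need base files.
      Stream<HoodieBaseFile> baseFiles = table.getBaseFileOnlyView().getLatestBaseFiles(partition);
      // MERGE_ON_READ snapshot reads need full slices so log files get merged in.
      Stream<FileSlice> slices = table.getSliceView().getLatestFileSlices(partition);
      System.out.println(baseFiles.count() + " base files, " + slices.count() + " slices");
    }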
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DataFileDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/BaseFileDTO.java
similarity index 77%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DataFileDTO.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/BaseFileDTO.java
index fab7ad1df..408aafda9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DataFileDTO.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/BaseFileDTO.java
@@ -18,7 +18,7 @@
package org.apache.hudi.common.table.timeline.dto;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
* The data transfer object of data file.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
-public class DataFileDTO {
+public class BaseFileDTO {
@JsonProperty("fileStatus")
private FileStatusDTO fileStatus;
@@ -36,27 +36,27 @@ public class DataFileDTO {
@JsonProperty("fileLen")
private long fileLen;
- public static HoodieDataFile toHoodieDataFile(DataFileDTO dto) {
+ public static HoodieBaseFile toHoodieBaseFile(BaseFileDTO dto) {
if (null == dto) {
return null;
}
- HoodieDataFile dataFile = null;
+ HoodieBaseFile baseFile;
if (null != dto.fileStatus) {
- dataFile = new HoodieDataFile(FileStatusDTO.toFileStatus(dto.fileStatus));
+ baseFile = new HoodieBaseFile(FileStatusDTO.toFileStatus(dto.fileStatus));
} else {
- dataFile = new HoodieDataFile(dto.fullPath);
- dataFile.setFileLen(dto.fileLen);
+ baseFile = new HoodieBaseFile(dto.fullPath);
+ baseFile.setFileLen(dto.fileLen);
}
- return dataFile;
+ return baseFile;
}
- public static DataFileDTO fromHoodieDataFile(HoodieDataFile dataFile) {
+ public static BaseFileDTO fromHoodieBaseFile(HoodieBaseFile dataFile) {
if (null == dataFile) {
return null;
}
- DataFileDTO dto = new DataFileDTO();
+ BaseFileDTO dto = new BaseFileDTO();
dto.fileStatus = FileStatusDTO.fromFileStatus(dataFile.getFileStatus());
dto.fullPath = dataFile.getPath();
dto.fileLen = dataFile.getFileLen();
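These converters back the embedded timeline server, which ships file listings over REST. A quick round-trip sketch (the path is illustrative):
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;

    static boolean roundTrips(String path) {
      HoodieBaseFile original = new HoodieBaseFile(path);
      BaseFileDTO dto = BaseFileDTO.fromHoodieBaseFile(original);
      // HoodieBaseFile#equals compares full paths, so the round trip preserves identity.
      return original.equals(BaseFileDTO.toHoodieBaseFile(dto));
    }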
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileSliceDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileSliceDTO.java
index e10386971..b3f7d24b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileSliceDTO.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileSliceDTO.java
@@ -32,8 +32,8 @@ import java.util.stream.Collectors;
@JsonIgnoreProperties(ignoreUnknown = true)
public class FileSliceDTO {
- @JsonProperty("dataFile")
- DataFileDTO dataFile;
+ @JsonProperty("baseFile")
+ BaseFileDTO baseFile;
@JsonProperty("logFiles")
List<LogFileDTO> logFiles;
@JsonProperty("partition")
@@ -48,14 +48,14 @@ public class FileSliceDTO {
dto.partitionPath = slice.getPartitionPath();
dto.baseInstantTime = slice.getBaseInstantTime();
dto.fileId = slice.getFileId();
- dto.dataFile = slice.getDataFile().map(DataFileDTO::fromHoodieDataFile).orElse(null);
+ dto.baseFile = slice.getBaseFile().map(BaseFileDTO::fromHoodieBaseFile).orElse(null);
dto.logFiles = slice.getLogFiles().map(LogFileDTO::fromHoodieLogFile).collect(Collectors.toList());
return dto;
}
public static FileSlice toFileSlice(FileSliceDTO dto) {
FileSlice slice = new FileSlice(dto.partitionPath, dto.baseInstantTime, dto.fileId);
- slice.setDataFile(DataFileDTO.toHoodieDataFile(dto.dataFile));
+ slice.setBaseFile(BaseFileDTO.toHoodieBaseFile(dto.baseFile));
dto.logFiles.stream().forEach(lf -> slice.addLogFile(LogFileDTO.toHoodieLogFile(lf)));
return slice;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 3ffa9fbd8..ee28cdc49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -20,7 +20,7 @@ package org.apache.hudi.common.table.view;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -133,16 +133,16 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
*/
protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline,
boolean addPendingCompactionFileSlice) {
- return buildFileGroups(convertFileStatusesToDataFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline,
+ return buildFileGroups(convertFileStatusesToBaseFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline,
addPendingCompactionFileSlice);
}
- protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile> dataFileStream,
+ protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileStream,
Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
- Map<Pair<String, String>, List<HoodieDataFile>> dataFiles =
- dataFileStream.collect(Collectors.groupingBy((dataFile) -> {
- String partitionPathStr = getPartitionPathFromFilePath(dataFile.getPath());
- return Pair.of(partitionPathStr, dataFile.getFileId());
+ Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
+ baseFileStream.collect(Collectors.groupingBy((baseFile) -> {
+ String partitionPathStr = getPartitionPathFromFilePath(baseFile.getPath());
+ return Pair.of(partitionPathStr, baseFile.getFileId());
}));
Map<Pair<String, String>, List<HoodieLogFile>> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> {
@@ -151,15 +151,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
return Pair.of(partitionPathStr, logFile.getFileId());
}));
- Set<Pair<String, String>> fileIdSet = new HashSet<>(dataFiles.keySet());
+ Set<Pair<String, String>> fileIdSet = new HashSet<>(baseFiles.keySet());
fileIdSet.addAll(logFiles.keySet());
List<HoodieFileGroup> fileGroups = new ArrayList<>();
fileIdSet.forEach(pair -> {
String fileId = pair.getValue();
HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline);
- if (dataFiles.containsKey(pair)) {
- dataFiles.get(pair).forEach(group::addDataFile);
+ if (baseFiles.containsKey(pair)) {
+ baseFiles.get(pair).forEach(group::addBaseFile);
}
if (logFiles.containsKey(pair)) {
logFiles.get(pair).forEach(group::addLogFile);
@@ -233,7 +233,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
storePartitionView(partitionPathStr, new ArrayList<>());
}
} catch (IOException e) {
- throw new HoodieIOException("Failed to list data files in partition " + partitionPathStr, e);
+ throw new HoodieIOException("Failed to list base files in partition " + partitionPathStr, e);
}
} else {
LOG.debug("View already built for Partition :" + partitionPathStr + ", FOUND is ");
@@ -245,14 +245,14 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
/**
- * Helper to convert file-status to data-files.
+ * Helper to convert file-status to base-files.
*
* @param statuses List of File-Status
*/
- private Stream<HoodieDataFile> convertFileStatusesToDataFiles(FileStatus[] statuses) {
+ private Stream<HoodieBaseFile> convertFileStatusesToBaseFiles(FileStatus[] statuses) {
Predicate<FileStatus> roFilePredicate = fileStatus -> fileStatus.getPath().getName()
- .contains(metaClient.getTableConfig().getROFileFormat().getFileExtension());
- return Arrays.stream(statuses).filter(roFilePredicate).map(HoodieDataFile::new);
+ .contains(metaClient.getTableConfig().getBaseFileFormat().getFileExtension());
+ return Arrays.stream(statuses).filter(roFilePredicate).map(HoodieBaseFile::new);
}
/**
@@ -262,23 +262,23 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
*/
private Stream<HoodieLogFile> convertFileStatusesToLogFiles(FileStatus[] statuses) {
Predicate<FileStatus> rtFilePredicate = fileStatus -> fileStatus.getPath().getName()
- .contains(metaClient.getTableConfig().getRTFileFormat().getFileExtension());
+ .contains(metaClient.getTableConfig().getLogFileFormat().getFileExtension());
return Arrays.stream(statuses).filter(rtFilePredicate).map(HoodieLogFile::new);
}
/**
- * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, Ignore those
- * data-files.
+ * With async compaction, it is possible to see partial/complete base-files due to inflight compactions. Ignore those
+ * base-files.
*
- * @param dataFile Data File
+ * @param baseFile Base file
*/
- protected boolean isDataFileDueToPendingCompaction(HoodieDataFile dataFile) {
- final String partitionPath = getPartitionPathFromFilePath(dataFile.getPath());
+ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) {
+ final String partitionPath = getPartitionPathFromFilePath(baseFile.getPath());
Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
- getPendingCompactionOperationWithInstant(new HoodieFileGroupId(partitionPath, dataFile.getFileId()));
+ getPendingCompactionOperationWithInstant(new HoodieFileGroupId(partitionPath, baseFile.getFileId()));
return (compactionWithInstantTime.isPresent()) && (null != compactionWithInstantTime.get().getKey())
- && dataFile.getCommitTime().equals(compactionWithInstantTime.get().getKey());
+ && baseFile.getCommitTime().equals(compactionWithInstantTime.get().getKey());
}
/**
@@ -296,15 +296,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
/**
- * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, Ignore those
- * data-files.
+ * With async compaction, it is possible to see partial/complete base-files due to inflight compactions. Ignore those
+ * base-files.
*
* @param fileSlice File Slice
*/
- protected FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) {
+ protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
if (isFileSliceAfterPendingCompaction(fileSlice)) {
LOG.info("File Slice (" + fileSlice + ") is in pending compaction");
- // Data file is filtered out of the file-slice as the corresponding compaction
+ // Base file is filtered out of the file-slice as the corresponding compaction
// instant has not completed yet.
FileSlice transformed =
new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
@@ -325,38 +325,38 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
@Override
- public final Stream<HoodieDataFile> getLatestDataFiles(String partitionStr) {
+ public final Stream<HoodieBaseFile> getLatestBaseFiles(String partitionStr) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchLatestDataFiles(partitionPath);
+ return fetchLatestBaseFiles(partitionPath);
} finally {
readLock.unlock();
}
}
@Override
- public final Stream<HoodieDataFile> getLatestDataFiles() {
+ public final Stream<HoodieBaseFile> getLatestBaseFiles() {
try {
readLock.lock();
- return fetchLatestDataFiles();
+ return fetchLatestBaseFiles();
} finally {
readLock.unlock();
}
}
@Override
- public final Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionStr, String maxCommitTime) {
+ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionStr, String maxCommitTime) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
return fetchAllStoredFileGroups(partitionPath)
- .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllDataFiles()
- .filter(dataFile -> HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), maxCommitTime,
+ .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
+ .filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), maxCommitTime,
HoodieTimeline.LESSER_OR_EQUAL))
- .filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst()))
+ .filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()))
.filter(Option::isPresent).map(Option::get);
} finally {
readLock.unlock();
@@ -364,43 +364,43 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
@Override
- public final Option<HoodieDataFile> getDataFileOn(String partitionStr, String instantTime, String fileId) {
+ public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, String instantTime, String fileId) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllDataFiles()
+ return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
.filter(
- dataFile -> HoodieTimeline.compareTimestamps(dataFile.getCommitTime(), instantTime, HoodieTimeline.EQUAL))
- .filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst().orElse(null));
+ baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), instantTime, HoodieTimeline.EQUAL))
+ .filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null));
} finally {
readLock.unlock();
}
}
/**
- * Get Latest data file for a partition and file-Id.
+ * Get the latest base file for a partition and file-Id.
*/
@Override
- public final Option<HoodieDataFile> getLatestDataFile(String partitionStr, String fileId) {
+ public final Option<HoodieBaseFile> getLatestBaseFile(String partitionStr, String fileId) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchLatestDataFile(partitionPath, fileId);
+ return fetchLatestBaseFile(partitionPath, fileId);
} finally {
readLock.unlock();
}
}
@Override
- public final Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn) {
+ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToReturn) {
try {
readLock.lock();
return fetchAllStoredFileGroups().map(fileGroup -> {
return Option.fromJavaOptional(
- fileGroup.getAllDataFiles().filter(dataFile -> commitsToReturn.contains(dataFile.getCommitTime())
- && !isDataFileDueToPendingCompaction(dataFile)).findFirst());
+ fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
+ && !isBaseFileDueToPendingCompaction(baseFile)).findFirst());
}).filter(Option::isPresent).map(Option::get);
} finally {
readLock.unlock();
@@ -408,14 +408,14 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
@Override
- public final Stream<HoodieDataFile> getAllDataFiles(String partitionStr) {
+ public final Stream<HoodieBaseFile> getAllBaseFiles(String partitionStr) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchAllDataFiles(partitionPath)
+ return fetchAllBaseFiles(partitionPath)
.filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
- .filter(df -> !isDataFileDueToPendingCompaction(df));
+ .filter(df -> !isBaseFileDueToPendingCompaction(df));
} finally {
readLock.unlock();
}
@@ -427,7 +427,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
- return fetchLatestFileSlices(partitionPath).map(fs -> filterDataFileAfterPendingCompaction(fs));
+ return fetchLatestFileSlices(partitionPath).map(this::filterBaseFileAfterPendingCompaction);
} finally {
readLock.unlock();
}
@@ -443,7 +443,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
Option<FileSlice> fs = fetchLatestFileSlice(partitionPath, fileId);
- return fs.map(f -> filterDataFileAfterPendingCompaction(f));
+ return fs.map(f -> filterBaseFileAfterPendingCompaction(f));
} finally {
readLock.unlock();
}
@@ -480,7 +480,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
ensurePartitionLoadedCorrectly(partitionPath);
Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime);
if (includeFileSlicesInPendingCompaction) {
- return fileSliceStream.map(fs -> filterDataFileAfterPendingCompaction(fs));
+ return fileSliceStream.map(fs -> filterBaseFileAfterPendingCompaction(fs));
} else {
return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()));
}
@@ -653,33 +653,33 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
/**
- * Default implementation for fetching latest data-files for the partition-path.
+ * Default implementation for fetching latest base-files for the partition-path.
*/
- Stream<HoodieDataFile> fetchLatestDataFiles(final String partitionPath) {
- return fetchAllStoredFileGroups(partitionPath).map(this::getLatestDataFile).filter(Option::isPresent)
+ Stream<HoodieBaseFile> fetchLatestBaseFiles(final String partitionPath) {
+ return fetchAllStoredFileGroups(partitionPath).map(this::getLatestBaseFile).filter(Option::isPresent)
.map(Option::get);
}
- protected Option<HoodieDataFile> getLatestDataFile(HoodieFileGroup fileGroup) {
+ protected Option<HoodieBaseFile> getLatestBaseFile(HoodieFileGroup fileGroup) {
return Option
- .fromJavaOptional(fileGroup.getAllDataFiles().filter(df -> !isDataFileDueToPendingCompaction(df)).findFirst());
+ .fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst());
}
/**
- * Default implementation for fetching latest data-files across all partitions.
+ * Default implementation for fetching latest base-files across all partitions.
*/
- Stream<HoodieDataFile> fetchLatestDataFiles() {
- return fetchAllStoredFileGroups().map(this::getLatestDataFile).filter(Option::isPresent).map(Option::get);
+ Stream<HoodieBaseFile> fetchLatestBaseFiles() {
+ return fetchAllStoredFileGroups().map(this::getLatestBaseFile).filter(Option::isPresent).map(Option::get);
}
/**
- * Default implementation for fetching all data-files for a partition.
+ * Default implementation for fetching all base-files for a partition.
*
* @param partitionPath partition-path
*/
- Stream<HoodieDataFile> fetchAllDataFiles(String partitionPath) {
- return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getAllDataFiles)
- .flatMap(dataFileList -> dataFileList);
+ Stream<HoodieBaseFile> fetchAllBaseFiles(String partitionPath) {
+ return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getAllBaseFiles)
+ .flatMap(baseFileList -> baseFileList);
}
/**
@@ -719,8 +719,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) {
FileSlice merged = new FileSlice(penultimateSlice.getPartitionPath(), penultimateSlice.getBaseInstantTime(),
penultimateSlice.getFileId());
- if (penultimateSlice.getDataFile().isPresent()) {
- merged.setDataFile(penultimateSlice.getDataFile().get());
+ if (penultimateSlice.getBaseFile().isPresent()) {
+ merged.setBaseFile(penultimateSlice.getBaseFile().get());
}
// Add Log files from penultimate and last slices
penultimateSlice.getLogFiles().forEach(merged::addLogFile);
@@ -752,15 +752,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
/**
- * Default implementation for fetching latest data-file.
+ * Default implementation for fetching latest base-file.
*
* @param partitionPath Partition path
* @param fileId File Id
- * @return Data File if present
+ * @return Base file if present
*/
- protected Option<HoodieDataFile> fetchLatestDataFile(String partitionPath, String fileId) {
- return Option
- .fromJavaOptional(fetchLatestDataFiles(partitionPath).filter(fs -> fs.getFileId().equals(fileId)).findFirst());
+ protected Option<HoodieBaseFile> fetchLatestBaseFile(String partitionPath, String fileId) {
+ return Option.fromJavaOptional(fetchLatestBaseFiles(partitionPath)
+ .filter(fs -> fs.getFileId().equals(fileId)).findFirst());
}
/**
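The pending-compaction checks above hide base files whose commit time equals a still-inflight compaction instant for the same file group. A simplified standalone sketch of that predicate (types reduced to strings for illustration):
    import java.util.Map;
    import java.util.Optional;

    // pendingCompactions: fileGroupId -> instant time of the scheduled compaction.
    static boolean dueToPendingCompaction(Map<String, String> pendingCompactions,
        String fileGroupId, String baseFileCommitTime) {
      return Optional.ofNullable(pendingCompactions.get(fileGroupId))
          .map(instant -> instant.equals(baseFileCommitTime))
          .orElse(false);
    }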
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
index 19209acf7..07b262d89 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTimeline;
@@ -318,13 +318,13 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
* Note that while finding the new data/log files added/removed, the path stored in metadata will be missing the
* base-path, scheme and authority. Ensure the matching process takes care of this discrepancy.
*/
- Map<String, HoodieDataFile> viewDataFiles = fileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices)
- .map(FileSlice::getDataFile).filter(Option::isPresent).map(Option::get)
+ Map<String, HoodieBaseFile> viewDataFiles = fileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices)
+ .map(FileSlice::getBaseFile).filter(Option::isPresent).map(Option::get)
.map(df -> Pair.of(Path.getPathWithoutSchemeAndAuthority(new Path(df.getPath())).toString(), df))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Note: Delta Log Files and Data Files can be empty when adding/removing pending compactions
- Map<String, HoodieDataFile> deltaDataFiles = deltaFileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices)
- .map(FileSlice::getDataFile).filter(Option::isPresent).map(Option::get)
+ Map<String, HoodieBaseFile> deltaDataFiles = deltaFileGroups.stream().flatMap(HoodieFileGroup::getAllRawFileSlices)
+ .map(FileSlice::getBaseFile).filter(Option::isPresent).map(Option::get)
.map(df -> Pair.of(Path.getPathWithoutSchemeAndAuthority(new Path(df.getPath())).toString(), df))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
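The matching caveat noted above (metadata paths lack base-path, scheme and authority) is handled by normalizing before comparison; a tiny sketch using the same Hadoop helper:
    import org.apache.hadoop.fs.Path;

    // e.g. "hdfs://nn:8020/tbl/p/f1.parquet" -> "/tbl/p/f1.parquet"
    static String comparablePath(String fullPath) {
      return Path.getPathWithoutSchemeAndAuthority(new Path(fullPath)).toString();
    }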
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index e9087db95..5eb1173c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -20,7 +20,7 @@ package org.apache.hudi.common.table.view;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
@@ -120,39 +120,39 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
}
@Override
- public Stream<HoodieDataFile> getLatestDataFiles(String partitionPath) {
- return execute(partitionPath, preferredView::getLatestDataFiles, secondaryView::getLatestDataFiles);
+ public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) {
+ return execute(partitionPath, preferredView::getLatestBaseFiles, secondaryView::getLatestBaseFiles);
}
@Override
- public Stream<HoodieDataFile> getLatestDataFiles() {
- return execute(preferredView::getLatestDataFiles, secondaryView::getLatestDataFiles);
+ public Stream<HoodieBaseFile> getLatestBaseFiles() {
+ return execute(preferredView::getLatestBaseFiles, secondaryView::getLatestBaseFiles);
}
@Override
- public Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath, String maxCommitTime) {
- return execute(partitionPath, maxCommitTime, preferredView::getLatestDataFilesBeforeOrOn,
- secondaryView::getLatestDataFilesBeforeOrOn);
+ public Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionPath, String maxCommitTime) {
+ return execute(partitionPath, maxCommitTime, preferredView::getLatestBaseFilesBeforeOrOn,
+ secondaryView::getLatestBaseFilesBeforeOrOn);
}
@Override
- public Option<HoodieDataFile> getLatestDataFile(String partitionPath, String fileId) {
- return execute(partitionPath, fileId, preferredView::getLatestDataFile, secondaryView::getLatestDataFile);
+ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) {
+ return execute(partitionPath, fileId, preferredView::getLatestBaseFile, secondaryView::getLatestBaseFile);
}
@Override
- public Option<HoodieDataFile> getDataFileOn(String partitionPath, String instantTime, String fileId) {
- return execute(partitionPath, instantTime, fileId, preferredView::getDataFileOn, secondaryView::getDataFileOn);
+ public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String instantTime, String fileId) {
+ return execute(partitionPath, instantTime, fileId, preferredView::getBaseFileOn, secondaryView::getBaseFileOn);
}
@Override
- public Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn) {
- return execute(commitsToReturn, preferredView::getLatestDataFilesInRange, secondaryView::getLatestDataFilesInRange);
+ public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToReturn) {
+ return execute(commitsToReturn, preferredView::getLatestBaseFilesInRange, secondaryView::getLatestBaseFilesInRange);
}
@Override
- public Stream<HoodieDataFile> getAllDataFiles(String partitionPath) {
- return execute(partitionPath, preferredView::getAllDataFiles, secondaryView::getAllDataFiles);
+ public Stream<HoodieBaseFile> getAllBaseFiles(String partitionPath) {
+ return execute(partitionPath, preferredView::getAllBaseFiles, secondaryView::getAllBaseFiles);
}
@Override
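Every override above funnels through an execute(...) helper that asks the preferred (typically remote) view first and falls back to the secondary on failure; the helper body below is a reconstruction under that assumption, not the exact implementation:
    import java.util.function.Function;

    static <T, R> R execute(T arg, Function<T, R> preferred, Function<T, R> secondary) {
      try {
        return preferred.apply(arg); // usually the remote timeline-server view
      } catch (RuntimeException e) {
        return secondary.apply(arg); // local fallback
      }
    }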
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 62ef3e6a2..791417e52 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -20,14 +20,14 @@ package org.apache.hudi.common.table.view;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
-import org.apache.hudi.common.table.timeline.dto.DataFileDTO;
+import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
@@ -205,74 +205,74 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
}
@Override
- public Stream<HoodieDataFile> getLatestDataFiles(String partitionPath) {
+ public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
try {
- List<DataFileDTO> dataFiles = executeRequest(LATEST_PARTITION_DATA_FILES_URL, paramsMap,
- new TypeReference<List<DataFileDTO>>() {}, RequestMethod.GET);
- return dataFiles.stream().map(DataFileDTO::toHoodieDataFile);
+ List<BaseFileDTO> dataFiles = executeRequest(LATEST_PARTITION_DATA_FILES_URL, paramsMap,
+ new TypeReference