diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index c475b1d36..1123b2d1f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -177,7 +177,7 @@ public class HoodieReadClient implements Serializable { + hoodieTable.getMetaClient().getBasePath()); } - TableFileSystemView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(), + TableFileSystemView.ReadOptimizedView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(), hoodieTable.getCompletedCommitTimeline(), fs.globStatus(new Path(path))); List latestFiles = fileSystemView.getLatestDataFiles().collect( Collectors.toList()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index fb7755f23..1dd3db1d5 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -479,7 +479,7 @@ public class HoodieWriteClient implements Seriali .mapToPair((PairFunction>) partitionPath -> { // Scan all partitions files with this commit time logger.info("Collecting latest files in partition path " + partitionPath); - TableFileSystemView view = table.getFileSystemView(); + TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView(); List latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime) .map(HoodieDataFile::getFileName).collect(Collectors.toList()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java index a6c292a7b..ef213810c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java @@ -184,7 +184,7 @@ public class HoodieBloomIndex extends HoodieIndex List> list = new ArrayList<>(); if (latestCommitTime.isPresent()) { List filteredFiles = - hoodieTable.getFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath, + hoodieTable.getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()).collect(Collectors.toList()); for (HoodieDataFile file : filteredFiles) { list.add(new Tuple2<>(partitionPath, file.getFileName())); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index bed5293e6..8ba4068ad 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -40,7 +40,7 @@ public abstract class HoodieIOHandle { protected final FileSystem fs; protected final HoodieTable hoodieTable; protected HoodieTimeline hoodieTimeline; - protected TableFileSystemView fileSystemView; + protected TableFileSystemView.ReadOptimizedView fileSystemView; protected final Schema schema; public HoodieIOHandle(HoodieWriteConfig config, String commitTime, @@ -50,7 +50,7 @@ public abstract class HoodieIOHandle { this.fs = FSUtils.getFs(); this.hoodieTable = hoodieTable; this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); - this.fileSystemView = hoodieTable.getFileSystemView(); + this.fileSystemView = hoodieTable.getROFileSystemView(); this.schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 698f2a84c..003f8e699 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ 
b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -84,7 +84,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) .flatMap((FlatMapFunction) partitionPath -> hoodieTable - .getFileSystemView() + .getRTFileSystemView() .getLatestFileSlices(partitionPath) .map(s -> new CompactionOperation(s.getDataFile().get(), partitionPath, s.getLogFiles().collect(Collectors.toList()), config)) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 0386889af..23fec955d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -311,7 +311,7 @@ public class HoodieCopyOnWriteTable extends Hoodi if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - List allFiles = getFileSystemView() + List allFiles = getROFileSystemView() .getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()) .collect(Collectors.toList()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 097ad65d2..260774074 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -108,6 +108,24 @@ public abstract class HoodieTable implements Seri return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline()); } + /** + * Get the read optimized view of the file system for this table + * + * @return + */ + public TableFileSystemView.ReadOptimizedView getROFileSystemView() { + return new HoodieTableFileSystemView(metaClient, 
getCompletedCommitTimeline()); + } + + /** + * Get the real time view of the file system for this table + * + * @return + */ + public TableFileSystemView.RealtimeView getRTFileSystemView() { + return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline()); + } + /** * Get the view of the file system for this table * diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index 0a24f6294..c70516305 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -44,6 +44,7 @@ import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; +import java.util.Collection; import java.util.Map; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; @@ -412,7 +413,7 @@ public class TestHoodieClient implements Serializable { List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); - final TableFileSystemView view = table.getFileSystemView(); + final TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView(); List dataFiles = partitionPaths.stream().flatMap(s -> { return view.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002")); }).collect(Collectors.toList()); @@ -431,7 +432,7 @@ public class TestHoodieClient implements Serializable { metaClient = new HoodieTableMetaClient(fs, basePath); table = HoodieTable.getHoodieTable(metaClient, getConfig()); - final TableFileSystemView view1 = table.getFileSystemView(); + final TableFileSystemView.ReadOptimizedView view1 = table.getROFileSystemView(); dataFiles = partitionPaths.stream().flatMap(s -> { 
return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002")); }).collect(Collectors.toList()); @@ -482,7 +483,7 @@ public class TestHoodieClient implements Serializable { List partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); - final TableFileSystemView view1 = table.getFileSystemView(); + final TableFileSystemView.ReadOptimizedView view1 = table.getROFileSystemView(); List dataFiles = partitionPaths.stream().flatMap(s -> { return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003")); @@ -501,7 +502,7 @@ public class TestHoodieClient implements Serializable { metaClient = new HoodieTableMetaClient(fs, basePath); table = HoodieTable.getHoodieTable(metaClient, getConfig()); - final TableFileSystemView view2 = table.getFileSystemView(); + final TableFileSystemView.ReadOptimizedView view2 = table.getROFileSystemView(); dataFiles = partitionPaths.stream().flatMap(s -> { return view2.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004")); @@ -524,7 +525,7 @@ public class TestHoodieClient implements Serializable { metaClient = new HoodieTableMetaClient(fs, basePath); table = HoodieTable.getHoodieTable(metaClient, getConfig()); - final TableFileSystemView view3 = table.getFileSystemView(); + final TableFileSystemView.ReadOptimizedView view3 = table.getROFileSystemView(); dataFiles = partitionPaths.stream().flatMap(s -> { return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002")); }).collect(Collectors.toList()); @@ -961,7 +962,7 @@ public class TestHoodieClient implements Serializable { assertEquals("2 files needs to be committed.", 2, statuses.size()); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metadata, config); - 
TableFileSystemView fileSystemView = table.getFileSystemView(); + TableFileSystemView.ReadOptimizedView fileSystemView = table.getROFileSystemView(); List files = fileSystemView.getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3).collect( Collectors.toList()); int numTotalInsertsInCommit3 = 0; @@ -1057,7 +1058,7 @@ public class TestHoodieClient implements Serializable { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); List files = - table.getFileSystemView().getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3) + table.getROFileSystemView().getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3) .collect(Collectors.toList()); assertEquals("Total of 2 valid data files", 2, files.size()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java index a55857c04..adf5c3a7e 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java @@ -128,13 +128,13 @@ public class TestMergeOnReadTable { assertFalse(commit.isPresent()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); - TableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); - Stream dataFilesToRead = fsView.getLatestDataFiles(); + Stream dataFilesToRead = roView.getLatestDataFiles(); assertTrue(!dataFilesToRead.findAny().isPresent()); - fsView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); - dataFilesToRead = fsView.getLatestDataFiles(); + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), 
allFiles); + dataFilesToRead = roView.getLatestDataFiles(); assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", dataFilesToRead.findAny().isPresent()); @@ -169,8 +169,8 @@ public class TestMergeOnReadTable { compactor.compact(jsc, getConfig(), table); allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); - fsView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); - dataFilesToRead = fsView.getLatestDataFiles(); + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); + dataFilesToRead = roView.getLatestDataFiles(); assertTrue(dataFilesToRead.findAny().isPresent()); // verify that there is a commit diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index e2a1f1550..1150e2e07 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -167,7 +167,7 @@ public class TestHoodieCompactor { table = HoodieTable.getHoodieTable(metaClient, config); for (String partitionPath : dataGen.getPartitionPaths()) { List groupedLogFiles = - table.getFileSystemView().getLatestFileSlices(partitionPath) + table.getRTFileSystemView().getLatestFileSlices(partitionPath) .collect(Collectors.toList()); for (FileSlice fileSlice : groupedLogFiles) { assertEquals("There should be 1 log file written for every data file", 1, @@ -192,7 +192,7 @@ public class TestHoodieCompactor { HoodieTimeline.GREATER)); for (String partitionPath : dataGen.getPartitionPaths()) { - List groupedLogFiles = table.getFileSystemView() + List groupedLogFiles = table.getRTFileSystemView() .getLatestFileSlices(partitionPath) .collect(Collectors.toList()); for (FileSlice slice: groupedLogFiles) { diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java index d5f64f11b..be8b6c1c4 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java @@ -82,7 +82,7 @@ public class FileSlice implements Serializable { } public Optional getDataFile() { - return Optional.of(dataFile); + return Optional.ofNullable(dataFile); } @Override diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java index 50daa5e5c..ee4e373b5 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/TableFileSystemView.java @@ -30,91 +30,69 @@ import java.util.stream.Stream; /** * Interface for viewing the table file system. - * Dependening on the Hoodie Table Type - The view of the filesystem changes. - *

- * ReadOptimizedView - Lets queries run only on organized columnar data files at the expense of latency * WriteOptimizedView - Lets queries run on columnar data as well as delta files (sequential) at the expense of query execution time * * @since 0.3.0 */ public interface TableFileSystemView { /** - * Stream all the latest data files in the given partition - * - * @param partitionPath - * @return + * ReadOptimizedView - methods to provide a view of columnar data files only. */ - Stream getLatestDataFiles(String partitionPath); + interface ReadOptimizedView { + /** + * Stream all the latest data files in the given partition + */ + Stream getLatestDataFiles(String partitionPath); + + /** + * Stream all the latest data files, in the file system view + */ + Stream getLatestDataFiles(); + + /** + * Stream all the latest version data files in the given partition with precondition that + * commitTime(file) before maxCommitTime + */ + Stream getLatestDataFilesBeforeOrOn(String partitionPath, + String maxCommitTime); + + /** + * Stream all the latest data files whose commit time is in the given list of commits + */ + Stream getLatestDataFilesInRange(List commitsToReturn); + + /** + * Stream all the data file versions grouped by FileId for a given partition + */ + Stream getAllDataFiles(String partitionPath); + } /** - * Stream all the latest data files, in the file system view - * - * @return + * RealtimeView - methods to access a combination of columnar data files + log files with real time data. 
*/ - Stream getLatestDataFiles(); + interface RealtimeView { + /** + * Stream all the latest file slices in the given partition + */ + Stream getLatestFileSlices(String partitionPath); - /** - * Stream all the latest version data files in the given partition - * with precondition that commitTime(file) before maxCommitTime - * - * @param partitionPath - * @param maxCommitTime - * @return - */ - Stream getLatestDataFilesBeforeOrOn(String partitionPath, + /** + * Stream all the latest file slices in the given partition with precondition that + * commitTime(file) before maxCommitTime + */ + Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime); - /** - * Stream all the latest data files pass - * - * @param commitsToReturn - * @return - */ - Stream getLatestDataFilesInRange(List commitsToReturn); + /** + * Stream all the latest file slices, in the given range + */ + Stream getLatestFileSliceInRange(List commitsToReturn); - /** - * Stream all the data file versions grouped by FileId for a given partition - * - * @param partitionPath - * @return - */ - Stream getAllDataFiles(String partitionPath); - - /** - * Stream all the latest file slices in the given partition - * - * @param partitionPath - * @return - */ - Stream getLatestFileSlices(String partitionPath); - - /** - * Stream all the latest file slices in the given partition - * with precondition that commitTime(file) before maxCommitTime - * - * @param partitionPath - * @param maxCommitTime - * @return - */ - Stream getLatestFileSlicesBeforeOrOn(String partitionPath, - String maxCommitTime); - - /** - * Stream all the latest file slices, in the given range - * - * @param commitsToReturn - * @return - */ - Stream getLatestFileSliceInRange(List commitsToReturn); - - /** - * Stream all the file slices for a given partition, latest or not. 
- * - * @param partitionPath - * @return - */ - Stream getAllFileSlices(String partitionPath); + /** + * Stream all the file slices for a given partition, latest or not. + */ + Stream getAllFileSlices(String partitionPath); + } /** * Stream all the file groups for a given partition @@ -123,12 +101,4 @@ public interface TableFileSystemView { * @return */ Stream getAllFileGroups(String partitionPath); - - /** - * Get the file Status for the path specified - * - * @param path - * @return - */ - FileStatus getFileStatus(String path); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index b345ad61f..6f1d63c44 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -16,8 +16,6 @@ package com.uber.hoodie.common.table.view; -import static java.util.stream.Collectors.toList; - import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieFileGroup; @@ -44,7 +42,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -57,7 +54,8 @@ import java.util.stream.Stream; * @see TableFileSystemView * @since 0.3.0 */ -public class HoodieTableFileSystemView implements TableFileSystemView, Serializable { +public class HoodieTableFileSystemView implements TableFileSystemView, TableFileSystemView.ReadOptimizedView, + TableFileSystemView.RealtimeView, Serializable { protected HoodieTableMetaClient metaClient; protected transient FileSystem fs; @@ -187,7 +185,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa 
@Override public Stream getLatestDataFiles() { - return fileGroupMap.values().stream() + return fileGroupMap.values().stream() .map(fileGroup -> fileGroup.getLatestDataFile()) .filter(dataFileOpt -> dataFileOpt.isPresent()) .map(Optional::get); @@ -271,13 +269,4 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa "Failed to list data files in partition " + partitionPathStr, e); } } - - @Override - public FileStatus getFileStatus(String path) { - try { - return fs.getFileStatus(new Path(path)); - } catch (IOException e) { - throw new HoodieIOException("Could not get FileStatus on path " + path); - } - } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java index d56e33470..1e273fa1e 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java @@ -18,8 +18,11 @@ package com.uber.hoodie.common.table.view; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieFileGroup; +import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -35,11 +38,15 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.*; 
@@ -48,6 +55,8 @@ public class HoodieTableFileSystemViewTest { private HoodieTableMetaClient metaClient; private String basePath; private TableFileSystemView fsView; + private TableFileSystemView.ReadOptimizedView roView; + private TableFileSystemView.RealtimeView rtView; @Before public void init() throws IOException { @@ -57,6 +66,8 @@ public class HoodieTableFileSystemViewTest { metaClient = HoodieTestUtils.init(basePath); fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); + roView = (TableFileSystemView.ReadOptimizedView) fsView; + rtView = (TableFileSystemView.RealtimeView) fsView; } private void refreshFsView(FileStatus[] statuses) { @@ -69,6 +80,8 @@ public class HoodieTableFileSystemViewTest { fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); } + roView = (TableFileSystemView.ReadOptimizedView) fsView; + rtView = (TableFileSystemView.RealtimeView) fsView; } @Test @@ -78,7 +91,7 @@ public class HoodieTableFileSystemViewTest { String fileId = UUID.randomUUID().toString(); assertFalse("No commit, should not find any data file", - fsView.getLatestDataFiles(partitionPath) + roView.getLatestDataFiles(partitionPath) .filter(dfile -> dfile.getFileId().equals(fileId)).findFirst().isPresent()); // Only one commit, but is not safe @@ -87,7 +100,7 @@ public class HoodieTableFileSystemViewTest { new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile(); refreshFsView(null); assertFalse("No commit, should not find any data file", - fsView.getLatestDataFiles(partitionPath) + roView.getLatestDataFiles(partitionPath) .filter(dfile -> dfile.getFileId().equals(fileId)) .findFirst().isPresent()); @@ -97,7 +110,7 @@ public class HoodieTableFileSystemViewTest { new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); commitTimeline.saveAsComplete(instant1, Optional.empty()); refreshFsView(null); 
- assertEquals("", fileName1, fsView + assertEquals("", fileName1, roView .getLatestDataFiles(partitionPath) .filter(dfile -> dfile.getFileId().equals(fileId)) .findFirst().get() @@ -108,7 +121,7 @@ public class HoodieTableFileSystemViewTest { String fileName2 = FSUtils.makeDataFileName(commitTime2, 1, fileId); new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile(); refreshFsView(null); - assertEquals("", fileName1, fsView + assertEquals("", fileName1, roView .getLatestDataFiles(partitionPath) .filter(dfile -> dfile.getFileId().equals(fileId)) .findFirst().get() @@ -119,7 +132,7 @@ public class HoodieTableFileSystemViewTest { new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2); commitTimeline.saveAsComplete(instant2, Optional.empty()); refreshFsView(null); - assertEquals("", fileName2, fsView + assertEquals("", fileName2, roView .getLatestDataFiles(partitionPath) .filter(dfile -> dfile.getFileId().equals(fileId)) .findFirst().get() @@ -138,21 +151,31 @@ public class HoodieTableFileSystemViewTest { String fileId1 = UUID.randomUUID().toString(); String fileId2 = UUID.randomUUID().toString(); String fileId3 = UUID.randomUUID().toString(); + String fileId4 = UUID.randomUUID().toString(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)) .createNewFile(); + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)) + .createNewFile(); + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1)) + .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)) .createNewFile(); + new 
File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0)) + .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)) .createNewFile(); + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)) + .createNewFile(); + new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); @@ -161,13 +184,24 @@ public class HoodieTableFileSystemViewTest { // Now we list the entire partition FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); - assertEquals(statuses.length, 7); - + assertEquals(11, statuses.length); refreshFsView(null); + + // Check files as of latest commit. + List allSlices = rtView.getAllFileSlices("2016/05/01").collect(Collectors.toList()); + assertEquals(8, allSlices.size()); + Map fileSliceMap = allSlices.stream().collect(Collectors.groupingBy( + slice -> slice.getFileId(), Collectors.counting())); + assertEquals(2, fileSliceMap.get(fileId1).longValue()); + assertEquals(3, fileSliceMap.get(fileId2).longValue()); + assertEquals(2, fileSliceMap.get(fileId3).longValue()); + assertEquals(1, fileSliceMap.get(fileId4).longValue()); + + List dataFileList = - fsView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime4) + roView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime4) .collect(Collectors.toList()); - assertEquals(dataFileList.size(), 3); + assertEquals(3, dataFileList.size()); Set filenames = Sets.newHashSet(); for (HoodieDataFile status : dataFileList) { filenames.add(status.getFileName()); @@ -176,18 +210,40 @@ public class HoodieTableFileSystemViewTest { assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2))); 
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3))); - // Reset the max commit time - List statuses2 = - fsView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime3) - .collect(Collectors.toList()); - assertEquals(statuses2.size(), 3); filenames = Sets.newHashSet(); - for (HoodieDataFile status : statuses2) { + List logFilesList = + rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4) + .map(slice -> slice.getLogFiles()) + .flatMap(logFileList -> logFileList) + .collect(Collectors.toList()); + assertEquals(logFilesList.size(), 4); + for (HoodieLogFile logFile: logFilesList) { + filenames.add(logFile.getFileName()); + } + assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))); + assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1))); + assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0))); + assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))); + + // Reset the max commit time + List dataFiles = + roView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime3) + .collect(Collectors.toList()); + assertEquals(dataFiles.size(), 3); + filenames = Sets.newHashSet(); + for (HoodieDataFile status : dataFiles) { filenames.add(status.getFileName()); } assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3))); + + logFilesList = + rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3) + .map(slice -> slice.getLogFiles()) + .flatMap(logFileList -> logFileList).collect(Collectors.toList()); + assertEquals(logFilesList.size(), 1); + assertTrue(logFilesList.get(0).getFileName().equals(FSUtils.makeLogFileName(fileId2, 
HoodieLogFile.DELTA_EXTENSION, commitTime3, 0))); } @Test @@ -225,12 +281,12 @@ public class HoodieTableFileSystemViewTest { // Now we list the entire partition FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); - assertEquals(statuses.length, 7); + assertEquals(7, statuses.length); refreshFsView(null); List fileGroups = fsView.getAllFileGroups("2016/05/01").collect(Collectors.toList()); - assertEquals(fileGroups.size(), 3); + assertEquals(3, fileGroups.size()); for (HoodieFileGroup fileGroup : fileGroups) { String fileId = fileGroup.getId(); @@ -271,19 +327,28 @@ public class HoodieTableFileSystemViewTest { new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)) .createNewFile(); - new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)) + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0)) + .createNewFile(); + new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId1)) .createNewFile(); + + + new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)) .createNewFile(); + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)) + .createNewFile(); + new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)) .createNewFile(); + new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile(); @@ -291,19 +356,40 @@ public class HoodieTableFileSystemViewTest { // Now we 
list the entire partition FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); - assertEquals(statuses.length, 7); + assertEquals(9, statuses.length); refreshFsView(statuses); - List statuses1 = fsView + List dataFiles = roView .getLatestDataFilesInRange(Lists.newArrayList(commitTime2, commitTime3)) .collect(Collectors.toList()); - assertEquals(statuses1.size(), 2); + assertEquals(3, dataFiles.size()); Set filenames = Sets.newHashSet(); - for (HoodieDataFile status : statuses1) { + for (HoodieDataFile status : dataFiles) { filenames.add(status.getFileName()); } assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3))); + + + List slices = rtView + .getLatestFileSliceInRange(Lists.newArrayList(commitTime3, commitTime4)) + .collect(Collectors.toList()); + assertEquals(3, slices.size()); + for (FileSlice slice: slices) { + if (slice.getFileId().equals(fileId1)) { + assertEquals(slice.getBaseCommitTime(), commitTime3); + assertTrue(slice.getDataFile().isPresent()); + assertEquals(slice.getLogFiles().count(), 0); + } else if (slice.getFileId().equals(fileId2)) { + assertEquals(slice.getBaseCommitTime(), commitTime4); + assertFalse(slice.getDataFile().isPresent()); + assertEquals(slice.getLogFiles().count(), 1); + } else if (slice.getFileId().equals(fileId3)) { + assertEquals(slice.getBaseCommitTime(), commitTime4); + assertTrue(slice.getDataFile().isPresent()); + assertEquals(slice.getLogFiles().count(), 0); + } + } } @Test @@ -342,20 +428,19 @@ public class HoodieTableFileSystemViewTest { // Now we list the entire partition FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); - assertEquals(statuses.length, 7); + assertEquals(7, statuses.length); refreshFsView(null); - List statuses1 = - fsView.getLatestDataFilesBeforeOrOn(partitionPath, commitTime2) + List dataFiles = + 
roView.getLatestDataFilesBeforeOrOn(partitionPath, commitTime2) .collect(Collectors.toList()); - assertEquals(statuses1.size(), 2); + assertEquals(2, dataFiles.size()); Set filenames = Sets.newHashSet(); - for (HoodieDataFile status : statuses1) { + for (HoodieDataFile status : dataFiles) { filenames.add(status.getFileName()); } assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime2, 1, fileId2))); - } @Test @@ -374,19 +459,28 @@ public class HoodieTableFileSystemViewTest { new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)) .createNewFile(); + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0)) + .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)) .createNewFile(); + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)) + .createNewFile(); + new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)) .createNewFile(); + new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0)) + .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)) .createNewFile(); + new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)) .createNewFile(); new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)) .createNewFile(); + new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile(); @@ -394,12 +488,35 @@ public class HoodieTableFileSystemViewTest { // Now we list the entire partition 
FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); - assertEquals(statuses.length, 7); + assertEquals(10, statuses.length); refreshFsView(statuses); + + List fileGroups = fsView + .getAllFileGroups(partitionPath) + .collect(Collectors.toList()); + assertEquals(3, fileGroups.size()); + for (HoodieFileGroup fileGroup: fileGroups) { + List slices = fileGroup.getAllFileSlices().collect(Collectors.toList()); + if (fileGroup.getId().equals(fileId1)) { + assertEquals(2, slices.size()); + assertEquals(commitTime4, slices.get(0).getBaseCommitTime()); + assertEquals(commitTime1, slices.get(1).getBaseCommitTime()); + } else if (fileGroup.getId().equals(fileId2)) { + assertEquals(3, slices.size()); + assertEquals(commitTime3, slices.get(0).getBaseCommitTime()); + assertEquals(commitTime2, slices.get(1).getBaseCommitTime()); + assertEquals(commitTime1, slices.get(2).getBaseCommitTime()); + } else if (fileGroup.getId().equals(fileId3)) { + assertEquals(2, slices.size()); + assertEquals(commitTime4, slices.get(0).getBaseCommitTime()); + assertEquals(commitTime3, slices.get(1).getBaseCommitTime()); + } + } + List statuses1 = - fsView.getLatestDataFiles().collect(Collectors.toList()); - assertEquals(statuses1.size(), 3); + roView.getLatestDataFiles().collect(Collectors.toList()); + assertEquals(3, statuses1.size()); Set filenames = Sets.newHashSet(); for (HoodieDataFile status : statuses1) { filenames.add(status.getFileName()); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index 5b35ca9da..2b30a33b3 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -24,6 +24,7 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import 
com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.InvalidDatasetException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,7 +102,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName); // Get all commits, delta commits, compactions, as all of them produce a base parquet file today HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); - TableFileSystemView fsView = new HoodieTableFileSystemView(metadata, timeline, statuses); + TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metadata, timeline, statuses); if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) { // this is of the form commitTs_partition_sequenceNumber @@ -112,25 +113,25 @@ public class HoodieInputFormat extends MapredParquetInputFormat List commitsToReturn = timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - List filteredFiles = fsView + List filteredFiles = roView .getLatestDataFilesInRange(commitsToReturn) .collect(Collectors.toList()); for (HoodieDataFile filteredFile : filteredFiles) { LOG.info("Processing incremental hoodie file - " + filteredFile.getPath()); - filteredFile = checkFileStatus(fsView, filteredFile); + filteredFile = checkFileStatus(filteredFile); returns.add(filteredFile.getFileStatus()); } LOG.info( "Total paths to process after hoodie incremental filter " + filteredFiles.size()); } else { // filter files on the latest commit found - List filteredFiles = fsView.getLatestDataFiles().collect(Collectors.toList()); + List filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList()); LOG.info("Total paths to process 
after hoodie filter " + filteredFiles.size()); for (HoodieDataFile filteredFile : filteredFiles) { if (LOG.isDebugEnabled()) { LOG.debug("Processing latest hoodie file - " + filteredFile.getPath()); } - filteredFile = checkFileStatus(fsView, filteredFile); + filteredFile = checkFileStatus(filteredFile); returns.add(filteredFile.getFileStatus()); } } @@ -140,21 +141,23 @@ public class HoodieInputFormat extends MapredParquetInputFormat } /** - * Checks the file status for a race condition which can set the file size to 0. - * 1. HiveInputFormat does super.listStatus() and gets back a FileStatus[] - * 2. Then it creates the HoodieTableMetaClient for the paths listed. - * 3. Generation of splits looks at FileStatus size to create splits, which skips this file - * - * @param fsView - * @param dataFile - * @return + * Checks the file status for a race condition which can set the file size to 0. 1. + * HiveInputFormat does super.listStatus() and gets back a FileStatus[] 2. Then it creates the + * HoodieTableMetaClient for the paths listed. 3. 
Generation of splits looks at FileStatus size + * to create splits, which skips this file */ - private HoodieDataFile checkFileStatus(TableFileSystemView fsView, HoodieDataFile dataFile) { - if(dataFile.getFileSize() == 0) { - LOG.info("Refreshing file status " + dataFile.getPath()); - return new HoodieDataFile(fsView.getFileStatus(dataFile.getPath())); + private HoodieDataFile checkFileStatus(HoodieDataFile dataFile) throws IOException { + Path dataPath = dataFile.getFileStatus().getPath(); + try { + if (dataFile.getFileSize() == 0) { + FileSystem fs = dataPath.getFileSystem(conf); + LOG.info("Refreshing file status " + dataFile.getPath()); + return new HoodieDataFile(fs.getFileStatus(dataPath)); + } + return dataFile; + } catch (IOException e) { + throw new HoodieIOException("Could not get FileStatus on path " + dataPath); } - return dataFile; } private Map> groupFileStatus(FileStatus[] fileStatuses) diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index 4b3bdb601..9bb7869bd 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -69,7 +69,7 @@ public class HoodieSnapshotCopier implements Serializable { public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir, final boolean shouldAssumeDatePartitioning) throws IOException { FileSystem fs = FSUtils.getFs(); final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir); - final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata, + final TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView(tableMetadata, tableMetadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants()); // Get the latest commit Optional latestCommit = 
tableMetadata.getActiveTimeline()