1
0

Introduce ReadOptimizedView & RealtimeView out of TableFileSystemView

- Usage now marks code as clearly using either RO or RT views, for future evolution
  - Tests on all of FileGroups and FileSlices
This commit is contained in:
Vinoth Chandar
2017-06-19 17:16:45 -07:00
committed by prazanna
parent c00f1a9ed9
commit 754ab88a2d
16 changed files with 264 additions and 166 deletions

View File

@@ -177,7 +177,7 @@ public class HoodieReadClient implements Serializable {
+ hoodieTable.getMetaClient().getBasePath()); + hoodieTable.getMetaClient().getBasePath());
} }
TableFileSystemView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(), TableFileSystemView.ReadOptimizedView fileSystemView = new HoodieTableFileSystemView(hoodieTable.getMetaClient(),
hoodieTable.getCompletedCommitTimeline(), fs.globStatus(new Path(path))); hoodieTable.getCompletedCommitTimeline(), fs.globStatus(new Path(path)));
List<HoodieDataFile> latestFiles = fileSystemView.getLatestDataFiles().collect( List<HoodieDataFile> latestFiles = fileSystemView.getLatestDataFiles().collect(
Collectors.toList()); Collectors.toList());

View File

@@ -479,7 +479,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
.mapToPair((PairFunction<String, String, List<String>>) partitionPath -> { .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
// Scan all partitions files with this commit time // Scan all partitions files with this commit time
logger.info("Collecting latest files in partition path " + partitionPath); logger.info("Collecting latest files in partition path " + partitionPath);
TableFileSystemView view = table.getFileSystemView(); TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView();
List<String> latestFiles = List<String> latestFiles =
view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime) view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime)
.map(HoodieDataFile::getFileName).collect(Collectors.toList()); .map(HoodieDataFile::getFileName).collect(Collectors.toList());

View File

@@ -184,7 +184,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
List<Tuple2<String, String>> list = new ArrayList<>(); List<Tuple2<String, String>> list = new ArrayList<>();
if (latestCommitTime.isPresent()) { if (latestCommitTime.isPresent()) {
List<HoodieDataFile> filteredFiles = List<HoodieDataFile> filteredFiles =
hoodieTable.getFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath, hoodieTable.getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath,
latestCommitTime.get().getTimestamp()).collect(Collectors.toList()); latestCommitTime.get().getTimestamp()).collect(Collectors.toList());
for (HoodieDataFile file : filteredFiles) { for (HoodieDataFile file : filteredFiles) {
list.add(new Tuple2<>(partitionPath, file.getFileName())); list.add(new Tuple2<>(partitionPath, file.getFileName()));

View File

@@ -40,7 +40,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
protected final FileSystem fs; protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable; protected final HoodieTable<T> hoodieTable;
protected HoodieTimeline hoodieTimeline; protected HoodieTimeline hoodieTimeline;
protected TableFileSystemView fileSystemView; protected TableFileSystemView.ReadOptimizedView fileSystemView;
protected final Schema schema; protected final Schema schema;
public HoodieIOHandle(HoodieWriteConfig config, String commitTime, public HoodieIOHandle(HoodieWriteConfig config, String commitTime,
@@ -50,7 +50,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
this.fs = FSUtils.getFs(); this.fs = FSUtils.getFs();
this.hoodieTable = hoodieTable; this.hoodieTable = hoodieTable;
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
this.fileSystemView = hoodieTable.getFileSystemView(); this.fileSystemView = hoodieTable.getROFileSystemView();
this.schema = this.schema =
HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
} }

View File

@@ -84,7 +84,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
List<CompactionOperation> operations = List<CompactionOperation> operations =
jsc.parallelize(partitionPaths, partitionPaths.size()) jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> hoodieTable .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> hoodieTable
.getFileSystemView() .getRTFileSystemView()
.getLatestFileSlices(partitionPath) .getLatestFileSlices(partitionPath)
.map(s -> new CompactionOperation(s.getDataFile().get(), .map(s -> new CompactionOperation(s.getDataFile().get(),
partitionPath, s.getLogFiles().collect(Collectors.toList()), config)) partitionPath, s.getLogFiles().collect(Collectors.toList()), config))

View File

@@ -311,7 +311,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
if (!commitTimeline.empty()) { // if we have some commits if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieDataFile> allFiles = getFileSystemView() List<HoodieDataFile> allFiles = getROFileSystemView()
.getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()) .getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp())
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@@ -108,6 +108,24 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline()); return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
} }
/**
* Get the read optimized view of the file system for this table
*
* @return
*/
public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
}
/**
* Get the real time view of the file system for this table
*
* @return
*/
public TableFileSystemView.RealtimeView getRTFileSystemView() {
return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
}
/** /**
* Get the view of the file system for this table * Get the view of the file system for this table
* *

View File

@@ -44,6 +44,7 @@ import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@@ -412,7 +413,7 @@ public class TestHoodieClient implements Serializable {
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view = table.getFileSystemView(); final TableFileSystemView.ReadOptimizedView view = table.getROFileSystemView();
List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> { List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> {
return view.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002")); return view.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList()); }).collect(Collectors.toList());
@@ -431,7 +432,7 @@ public class TestHoodieClient implements Serializable {
metaClient = new HoodieTableMetaClient(fs, basePath); metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig()); table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view1 = table.getFileSystemView(); final TableFileSystemView.ReadOptimizedView view1 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> { dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002")); return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList()); }).collect(Collectors.toList());
@@ -482,7 +483,7 @@ public class TestHoodieClient implements Serializable {
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning()); List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view1 = table.getFileSystemView(); final TableFileSystemView.ReadOptimizedView view1 = table.getROFileSystemView();
List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> { List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> {
return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003")); return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003"));
@@ -501,7 +502,7 @@ public class TestHoodieClient implements Serializable {
metaClient = new HoodieTableMetaClient(fs, basePath); metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig()); table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view2 = table.getFileSystemView(); final TableFileSystemView.ReadOptimizedView view2 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> { dataFiles = partitionPaths.stream().flatMap(s -> {
return view2.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004")); return view2.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004"));
@@ -524,7 +525,7 @@ public class TestHoodieClient implements Serializable {
metaClient = new HoodieTableMetaClient(fs, basePath); metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig()); table = HoodieTable.getHoodieTable(metaClient, getConfig());
final TableFileSystemView view3 = table.getFileSystemView(); final TableFileSystemView.ReadOptimizedView view3 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> { dataFiles = partitionPaths.stream().flatMap(s -> {
return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002")); return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
}).collect(Collectors.toList()); }).collect(Collectors.toList());
@@ -961,7 +962,7 @@ public class TestHoodieClient implements Serializable {
assertEquals("2 files needs to be committed.", 2, statuses.size()); assertEquals("2 files needs to be committed.", 2, statuses.size());
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, config); HoodieTable table = HoodieTable.getHoodieTable(metadata, config);
TableFileSystemView fileSystemView = table.getFileSystemView(); TableFileSystemView.ReadOptimizedView fileSystemView = table.getROFileSystemView();
List<HoodieDataFile> files = fileSystemView.getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3).collect( List<HoodieDataFile> files = fileSystemView.getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3).collect(
Collectors.toList()); Collectors.toList());
int numTotalInsertsInCommit3 = 0; int numTotalInsertsInCommit3 = 0;
@@ -1057,7 +1058,7 @@ public class TestHoodieClient implements Serializable {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
List<HoodieDataFile> files = List<HoodieDataFile> files =
table.getFileSystemView().getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3) table.getROFileSystemView().getLatestDataFilesBeforeOrOn(TEST_PARTITION_PATH, commitTime3)
.collect(Collectors.toList()); .collect(Collectors.toList());
assertEquals("Total of 2 valid data files", 2, files.size()); assertEquals("Total of 2 valid data files", 2, files.size());

View File

@@ -128,13 +128,13 @@ public class TestMergeOnReadTable {
assertFalse(commit.isPresent()); assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
TableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); hoodieTable.getCompletedCompactionCommitTimeline(), allFiles);
Stream<HoodieDataFile> dataFilesToRead = fsView.getLatestDataFiles(); Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
assertTrue(!dataFilesToRead.findAny().isPresent()); assertTrue(!dataFilesToRead.findAny().isPresent());
fsView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFilesToRead = fsView.getLatestDataFiles(); dataFilesToRead = roView.getLatestDataFiles();
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
dataFilesToRead.findAny().isPresent()); dataFilesToRead.findAny().isPresent());
@@ -169,8 +169,8 @@ public class TestMergeOnReadTable {
compactor.compact(jsc, getConfig(), table); compactor.compact(jsc, getConfig(), table);
allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
fsView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFilesToRead = fsView.getLatestDataFiles(); dataFilesToRead = roView.getLatestDataFiles();
assertTrue(dataFilesToRead.findAny().isPresent()); assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit // verify that there is a commit

View File

@@ -167,7 +167,7 @@ public class TestHoodieCompactor {
table = HoodieTable.getHoodieTable(metaClient, config); table = HoodieTable.getHoodieTable(metaClient, config);
for (String partitionPath : dataGen.getPartitionPaths()) { for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles = List<FileSlice> groupedLogFiles =
table.getFileSystemView().getLatestFileSlices(partitionPath) table.getRTFileSystemView().getLatestFileSlices(partitionPath)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (FileSlice fileSlice : groupedLogFiles) { for (FileSlice fileSlice : groupedLogFiles) {
assertEquals("There should be 1 log file written for every data file", 1, assertEquals("There should be 1 log file written for every data file", 1,
@@ -192,7 +192,7 @@ public class TestHoodieCompactor {
HoodieTimeline.GREATER)); HoodieTimeline.GREATER));
for (String partitionPath : dataGen.getPartitionPaths()) { for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles = table.getFileSystemView() List<FileSlice> groupedLogFiles = table.getRTFileSystemView()
.getLatestFileSlices(partitionPath) .getLatestFileSlices(partitionPath)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (FileSlice slice: groupedLogFiles) { for (FileSlice slice: groupedLogFiles) {

View File

@@ -82,7 +82,7 @@ public class FileSlice implements Serializable {
} }
public Optional<HoodieDataFile> getDataFile() { public Optional<HoodieDataFile> getDataFile() {
return Optional.of(dataFile); return Optional.ofNullable(dataFile);
} }
@Override @Override

View File

@@ -30,91 +30,69 @@ import java.util.stream.Stream;
/** /**
* Interface for viewing the table file system. * Interface for viewing the table file system.
 * Depending on the Hoodie Table Type - The view of the filesystem changes.
* <p>
* ReadOptimizedView - Lets queries run only on organized columnar data files at the expense of latency
 * RealtimeView - Lets queries run on columnar data as well as delta files (sequential) at the expense of query execution time
* *
* @since 0.3.0 * @since 0.3.0
*/ */
public interface TableFileSystemView { public interface TableFileSystemView {
/** /**
* Stream all the latest data files in the given partition * ReadOptimizedView - methods to provide a view of columnar data files only.
*
* @param partitionPath
* @return
*/ */
Stream<HoodieDataFile> getLatestDataFiles(String partitionPath); interface ReadOptimizedView {
/**
* Stream all the latest data files in the given partition
*/
Stream<HoodieDataFile> getLatestDataFiles(String partitionPath);
/**
* Stream all the latest data files, in the file system view
*/
Stream<HoodieDataFile> getLatestDataFiles();
/**
* Stream all the latest version data files in the given partition with precondition that
* commitTime(file) before maxCommitTime
*/
Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath,
String maxCommitTime);
/**
* Stream all the latest data files pass
*/
Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn);
/**
* Stream all the data file versions grouped by FileId for a given partition
*/
Stream<HoodieDataFile> getAllDataFiles(String partitionPath);
}
/** /**
* Stream all the latest data files, in the file system view * RealtimeView - methods to access a combination of columnar data files + log files with real time data.
*
* @return
*/ */
Stream<HoodieDataFile> getLatestDataFiles(); interface RealtimeView {
/**
* Stream all the latest file slices in the given partition
*/
Stream<FileSlice> getLatestFileSlices(String partitionPath);
/** /**
* Stream all the latest version data files in the given partition * Stream all the latest file slices in the given partition with precondition that
* with precondition that commitTime(file) before maxCommitTime * commitTime(file) before maxCommitTime
* */
* @param partitionPath Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
* @param maxCommitTime
* @return
*/
Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath,
String maxCommitTime); String maxCommitTime);
/** /**
* Stream all the latest data files pass * Stream all the latest file slices, in the given range
* */
* @param commitsToReturn Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn);
* @return
*/
Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn);
/** /**
* Stream all the data file versions grouped by FileId for a given partition * Stream all the file slices for a given partition, latest or not.
* */
* @param partitionPath Stream<FileSlice> getAllFileSlices(String partitionPath);
* @return }
*/
Stream<HoodieDataFile> getAllDataFiles(String partitionPath);
/**
* Stream all the latest file slices in the given partition
*
* @param partitionPath
* @return
*/
Stream<FileSlice> getLatestFileSlices(String partitionPath);
/**
* Stream all the latest file slices in the given partition
* with precondition that commitTime(file) before maxCommitTime
*
* @param partitionPath
* @param maxCommitTime
* @return
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath,
String maxCommitTime);
/**
* Stream all the latest file slices, in the given range
*
* @param commitsToReturn
* @return
*/
Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn);
/**
* Stream all the file slices for a given partition, latest or not.
*
* @param partitionPath
* @return
*/
Stream<FileSlice> getAllFileSlices(String partitionPath);
/** /**
* Stream all the file groups for a given partition * Stream all the file groups for a given partition
@@ -123,12 +101,4 @@ public interface TableFileSystemView {
* @return * @return
*/ */
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath); Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
/**
* Get the file Status for the path specified
*
* @param path
* @return
*/
FileStatus getFileStatus(String path);
} }

View File

@@ -16,8 +16,6 @@
package com.uber.hoodie.common.table.view; package com.uber.hoodie.common.table.view;
import static java.util.stream.Collectors.toList;
import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroup;
@@ -44,7 +42,6 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -57,7 +54,8 @@ import java.util.stream.Stream;
* @see TableFileSystemView * @see TableFileSystemView
* @since 0.3.0 * @since 0.3.0
*/ */
public class HoodieTableFileSystemView implements TableFileSystemView, Serializable { public class HoodieTableFileSystemView implements TableFileSystemView, TableFileSystemView.ReadOptimizedView,
TableFileSystemView.RealtimeView, Serializable {
protected HoodieTableMetaClient metaClient; protected HoodieTableMetaClient metaClient;
protected transient FileSystem fs; protected transient FileSystem fs;
@@ -187,7 +185,7 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
@Override @Override
public Stream<HoodieDataFile> getLatestDataFiles() { public Stream<HoodieDataFile> getLatestDataFiles() {
return fileGroupMap.values().stream() return fileGroupMap.values().stream()
.map(fileGroup -> fileGroup.getLatestDataFile()) .map(fileGroup -> fileGroup.getLatestDataFile())
.filter(dataFileOpt -> dataFileOpt.isPresent()) .filter(dataFileOpt -> dataFileOpt.isPresent())
.map(Optional::get); .map(Optional::get);
@@ -271,13 +269,4 @@ public class HoodieTableFileSystemView implements TableFileSystemView, Serializa
"Failed to list data files in partition " + partitionPathStr, e); "Failed to list data files in partition " + partitionPathStr, e);
} }
} }
@Override
public FileStatus getFileStatus(String path) {
try {
return fs.getFileStatus(new Path(path));
} catch (IOException e) {
throw new HoodieIOException("Could not get FileStatus on path " + path);
}
}
} }

View File

@@ -18,8 +18,11 @@ package com.uber.hoodie.common.table.view;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieFileGroup; import com.uber.hoodie.common.model.HoodieFileGroup;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
@@ -35,11 +38,15 @@ import org.junit.rules.TemporaryFolder;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@@ -48,6 +55,8 @@ public class HoodieTableFileSystemViewTest {
private HoodieTableMetaClient metaClient; private HoodieTableMetaClient metaClient;
private String basePath; private String basePath;
private TableFileSystemView fsView; private TableFileSystemView fsView;
private TableFileSystemView.ReadOptimizedView roView;
private TableFileSystemView.RealtimeView rtView;
@Before @Before
public void init() throws IOException { public void init() throws IOException {
@@ -57,6 +66,8 @@ public class HoodieTableFileSystemViewTest {
metaClient = HoodieTestUtils.init(basePath); metaClient = HoodieTestUtils.init(basePath);
fsView = new HoodieTableFileSystemView(metaClient, fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
roView = (TableFileSystemView.ReadOptimizedView) fsView;
rtView = (TableFileSystemView.RealtimeView) fsView;
} }
private void refreshFsView(FileStatus[] statuses) { private void refreshFsView(FileStatus[] statuses) {
@@ -69,6 +80,8 @@ public class HoodieTableFileSystemViewTest {
fsView = new HoodieTableFileSystemView(metaClient, fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
} }
roView = (TableFileSystemView.ReadOptimizedView) fsView;
rtView = (TableFileSystemView.RealtimeView) fsView;
} }
@Test @Test
@@ -78,7 +91,7 @@ public class HoodieTableFileSystemViewTest {
String fileId = UUID.randomUUID().toString(); String fileId = UUID.randomUUID().toString();
assertFalse("No commit, should not find any data file", assertFalse("No commit, should not find any data file",
fsView.getLatestDataFiles(partitionPath) roView.getLatestDataFiles(partitionPath)
.filter(dfile -> dfile.getFileId().equals(fileId)).findFirst().isPresent()); .filter(dfile -> dfile.getFileId().equals(fileId)).findFirst().isPresent());
// Only one commit, but is not safe // Only one commit, but is not safe
@@ -87,7 +100,7 @@ public class HoodieTableFileSystemViewTest {
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile(); new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
refreshFsView(null); refreshFsView(null);
assertFalse("No commit, should not find any data file", assertFalse("No commit, should not find any data file",
fsView.getLatestDataFiles(partitionPath) roView.getLatestDataFiles(partitionPath)
.filter(dfile -> dfile.getFileId().equals(fileId)) .filter(dfile -> dfile.getFileId().equals(fileId))
.findFirst().isPresent()); .findFirst().isPresent());
@@ -97,7 +110,7 @@ public class HoodieTableFileSystemViewTest {
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);
commitTimeline.saveAsComplete(instant1, Optional.empty()); commitTimeline.saveAsComplete(instant1, Optional.empty());
refreshFsView(null); refreshFsView(null);
assertEquals("", fileName1, fsView assertEquals("", fileName1, roView
.getLatestDataFiles(partitionPath) .getLatestDataFiles(partitionPath)
.filter(dfile -> dfile.getFileId().equals(fileId)) .filter(dfile -> dfile.getFileId().equals(fileId))
.findFirst().get() .findFirst().get()
@@ -108,7 +121,7 @@ public class HoodieTableFileSystemViewTest {
String fileName2 = FSUtils.makeDataFileName(commitTime2, 1, fileId); String fileName2 = FSUtils.makeDataFileName(commitTime2, 1, fileId);
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile(); new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
refreshFsView(null); refreshFsView(null);
assertEquals("", fileName1, fsView assertEquals("", fileName1, roView
.getLatestDataFiles(partitionPath) .getLatestDataFiles(partitionPath)
.filter(dfile -> dfile.getFileId().equals(fileId)) .filter(dfile -> dfile.getFileId().equals(fileId))
.findFirst().get() .findFirst().get()
@@ -119,7 +132,7 @@ public class HoodieTableFileSystemViewTest {
new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2); new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2);
commitTimeline.saveAsComplete(instant2, Optional.empty()); commitTimeline.saveAsComplete(instant2, Optional.empty());
refreshFsView(null); refreshFsView(null);
assertEquals("", fileName2, fsView assertEquals("", fileName2, roView
.getLatestDataFiles(partitionPath) .getLatestDataFiles(partitionPath)
.filter(dfile -> dfile.getFileId().equals(fileId)) .filter(dfile -> dfile.getFileId().equals(fileId))
.findFirst().get() .findFirst().get()
@@ -138,21 +151,31 @@ public class HoodieTableFileSystemViewTest {
String fileId1 = UUID.randomUUID().toString(); String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString(); String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString(); String fileId3 = UUID.randomUUID().toString();
String fileId4 = UUID.randomUUID().toString();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
.createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -161,13 +184,24 @@ public class HoodieTableFileSystemViewTest {
// Now we list the entire partition // Now we list the entire partition
FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath));
assertEquals(statuses.length, 7); assertEquals(11, statuses.length);
refreshFsView(null); refreshFsView(null);
// Check files as of lastest commit.
List<FileSlice> allSlices = rtView.getAllFileSlices("2016/05/01").collect(Collectors.toList());
assertEquals(8, allSlices.size());
Map<String, Long> fileSliceMap = allSlices.stream().collect(Collectors.groupingBy(
slice -> slice.getFileId(), Collectors.counting()));
assertEquals(2, fileSliceMap.get(fileId1).longValue());
assertEquals(3, fileSliceMap.get(fileId2).longValue());
assertEquals(2, fileSliceMap.get(fileId3).longValue());
assertEquals(1, fileSliceMap.get(fileId4).longValue());
List<HoodieDataFile> dataFileList = List<HoodieDataFile> dataFileList =
fsView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime4) roView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime4)
.collect(Collectors.toList()); .collect(Collectors.toList());
assertEquals(dataFileList.size(), 3); assertEquals(3, dataFileList.size());
Set<String> filenames = Sets.newHashSet(); Set<String> filenames = Sets.newHashSet();
for (HoodieDataFile status : dataFileList) { for (HoodieDataFile status : dataFileList) {
filenames.add(status.getFileName()); filenames.add(status.getFileName());
@@ -176,18 +210,40 @@ public class HoodieTableFileSystemViewTest {
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3)));
// Reset the max commit time
List<HoodieDataFile> statuses2 =
fsView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime3)
.collect(Collectors.toList());
assertEquals(statuses2.size(), 3);
filenames = Sets.newHashSet(); filenames = Sets.newHashSet();
for (HoodieDataFile status : statuses2) { List<HoodieLogFile> logFilesList =
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4)
.map(slice -> slice.getLogFiles())
.flatMap(logFileList -> logFileList)
.collect(Collectors.toList());
assertEquals(logFilesList.size(), 4);
for (HoodieLogFile logFile: logFilesList) {
filenames.add(logFile.getFileName());
}
assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)));
assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1)));
assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0)));
assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)));
// Reset the max commit time
List<HoodieDataFile> dataFiles =
roView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime3)
.collect(Collectors.toList());
assertEquals(dataFiles.size(), 3);
filenames = Sets.newHashSet();
for (HoodieDataFile status : dataFiles) {
filenames.add(status.getFileName()); filenames.add(status.getFileName());
} }
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1)));
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3)));
logFilesList =
rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime3)
.map(slice -> slice.getLogFiles())
.flatMap(logFileList -> logFileList).collect(Collectors.toList());
assertEquals(logFilesList.size(), 1);
assertTrue(logFilesList.get(0).getFileName().equals(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0)));
} }
@Test @Test
@@ -225,12 +281,12 @@ public class HoodieTableFileSystemViewTest {
// Now we list the entire partition // Now we list the entire partition
FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath));
assertEquals(statuses.length, 7); assertEquals(7, statuses.length);
refreshFsView(null); refreshFsView(null);
List<HoodieFileGroup> fileGroups = List<HoodieFileGroup> fileGroups =
fsView.getAllFileGroups("2016/05/01").collect(Collectors.toList()); fsView.getAllFileGroups("2016/05/01").collect(Collectors.toList());
assertEquals(fileGroups.size(), 3); assertEquals(3, fileGroups.size());
for (HoodieFileGroup fileGroup : fileGroups) { for (HoodieFileGroup fileGroup : fileGroups) {
String fileId = fileGroup.getId(); String fileId = fileGroup.getId();
@@ -271,19 +327,28 @@ public class HoodieTableFileSystemViewTest {
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)) new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId1))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3))
.createNewFile(); .createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile();
@@ -291,19 +356,40 @@ public class HoodieTableFileSystemViewTest {
// Now we list the entire partition // Now we list the entire partition
FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath));
assertEquals(statuses.length, 7); assertEquals(9, statuses.length);
refreshFsView(statuses); refreshFsView(statuses);
List<HoodieDataFile> statuses1 = fsView List<HoodieDataFile> dataFiles = roView
.getLatestDataFilesInRange(Lists.newArrayList(commitTime2, commitTime3)) .getLatestDataFilesInRange(Lists.newArrayList(commitTime2, commitTime3))
.collect(Collectors.toList()); .collect(Collectors.toList());
assertEquals(statuses1.size(), 2); assertEquals(3, dataFiles.size());
Set<String> filenames = Sets.newHashSet(); Set<String> filenames = Sets.newHashSet();
for (HoodieDataFile status : statuses1) { for (HoodieDataFile status : dataFiles) {
filenames.add(status.getFileName()); filenames.add(status.getFileName());
} }
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3)));
List<FileSlice> slices = rtView
.getLatestFileSliceInRange(Lists.newArrayList(commitTime3, commitTime4))
.collect(Collectors.toList());
assertEquals(3, slices.size());
for (FileSlice slice: slices) {
if (slice.getFileId().equals(fileId1)) {
assertEquals(slice.getBaseCommitTime(), commitTime3);
assertTrue(slice.getDataFile().isPresent());
assertEquals(slice.getLogFiles().count(), 0);
} else if (slice.getFileId().equals(fileId2)) {
assertEquals(slice.getBaseCommitTime(), commitTime4);
assertFalse(slice.getDataFile().isPresent());
assertEquals(slice.getLogFiles().count(), 1);
} else if (slice.getFileId().equals(fileId3)) {
assertEquals(slice.getBaseCommitTime(), commitTime4);
assertTrue(slice.getDataFile().isPresent());
assertEquals(slice.getLogFiles().count(), 0);
}
}
} }
@Test @Test
@@ -342,20 +428,19 @@ public class HoodieTableFileSystemViewTest {
// Now we list the entire partition // Now we list the entire partition
FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath));
assertEquals(statuses.length, 7); assertEquals(7, statuses.length);
refreshFsView(null); refreshFsView(null);
List<HoodieDataFile> statuses1 = List<HoodieDataFile> dataFiles =
fsView.getLatestDataFilesBeforeOrOn(partitionPath, commitTime2) roView.getLatestDataFilesBeforeOrOn(partitionPath, commitTime2)
.collect(Collectors.toList()); .collect(Collectors.toList());
assertEquals(statuses1.size(), 2); assertEquals(2, dataFiles.size());
Set<String> filenames = Sets.newHashSet(); Set<String> filenames = Sets.newHashSet();
for (HoodieDataFile status : statuses1) { for (HoodieDataFile status : dataFiles) {
filenames.add(status.getFileName()); filenames.add(status.getFileName());
} }
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1)));
assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime2, 1, fileId2))); assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime2, 1, fileId2)));
} }
@Test @Test
@@ -374,19 +459,28 @@ public class HoodieTableFileSystemViewTest {
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0))
.createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3))
.createNewFile(); .createNewFile();
new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)) new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3))
.createNewFile(); .createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile(); new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile();
@@ -394,12 +488,35 @@ public class HoodieTableFileSystemViewTest {
// Now we list the entire partition // Now we list the entire partition
FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath)); FileStatus[] statuses = HoodieTestUtils.fs.listStatus(new Path(fullPartitionPath));
assertEquals(statuses.length, 7); assertEquals(10, statuses.length);
refreshFsView(statuses); refreshFsView(statuses);
List<HoodieFileGroup> fileGroups = fsView
.getAllFileGroups(partitionPath)
.collect(Collectors.toList());
assertEquals(3, fileGroups.size());
for (HoodieFileGroup fileGroup: fileGroups) {
List<FileSlice> slices = fileGroup.getAllFileSlices().collect(Collectors.toList());
if (fileGroup.getId().equals(fileId1)) {
assertEquals(2, slices.size());
assertEquals(commitTime4, slices.get(0).getBaseCommitTime());
assertEquals(commitTime1, slices.get(1).getBaseCommitTime());
} else if (fileGroup.getId().equals(fileId2)) {
assertEquals(3, slices.size());
assertEquals(commitTime3, slices.get(0).getBaseCommitTime());
assertEquals(commitTime2, slices.get(1).getBaseCommitTime());
assertEquals(commitTime1, slices.get(2).getBaseCommitTime());
} else if (fileGroup.getId().equals(fileId3)) {
assertEquals(2, slices.size());
assertEquals(commitTime4, slices.get(0).getBaseCommitTime());
assertEquals(commitTime3, slices.get(1).getBaseCommitTime());
}
}
List<HoodieDataFile> statuses1 = List<HoodieDataFile> statuses1 =
fsView.getLatestDataFiles().collect(Collectors.toList()); roView.getLatestDataFiles().collect(Collectors.toList());
assertEquals(statuses1.size(), 3); assertEquals(3, statuses1.size());
Set<String> filenames = Sets.newHashSet(); Set<String> filenames = Sets.newHashSet();
for (HoodieDataFile status : statuses1) { for (HoodieDataFile status : statuses1) {
filenames.add(status.getFileName()); filenames.add(status.getFileName());

View File

@@ -24,6 +24,7 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.InvalidDatasetException; import com.uber.hoodie.exception.InvalidDatasetException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@@ -101,7 +102,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName); String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
// Get all commits, delta commits, compactions, as all of them produce a base parquet file today // Get all commits, delta commits, compactions, as all of them produce a base parquet file today
HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants();
TableFileSystemView fsView = new HoodieTableFileSystemView(metadata, timeline, statuses); TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metadata, timeline, statuses);
if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) { if (HoodieHiveUtil.INCREMENTAL_SCAN_MODE.equals(mode)) {
// this is of the form commitTs_partition_sequenceNumber // this is of the form commitTs_partition_sequenceNumber
@@ -112,25 +113,25 @@ public class HoodieInputFormat extends MapredParquetInputFormat
List<String> commitsToReturn = List<String> commitsToReturn =
timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants() timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()); .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
List<HoodieDataFile> filteredFiles = fsView List<HoodieDataFile> filteredFiles = roView
.getLatestDataFilesInRange(commitsToReturn) .getLatestDataFilesInRange(commitsToReturn)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (HoodieDataFile filteredFile : filteredFiles) { for (HoodieDataFile filteredFile : filteredFiles) {
LOG.info("Processing incremental hoodie file - " + filteredFile.getPath()); LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
filteredFile = checkFileStatus(fsView, filteredFile); filteredFile = checkFileStatus(filteredFile);
returns.add(filteredFile.getFileStatus()); returns.add(filteredFile.getFileStatus());
} }
LOG.info( LOG.info(
"Total paths to process after hoodie incremental filter " + filteredFiles.size()); "Total paths to process after hoodie incremental filter " + filteredFiles.size());
} else { } else {
// filter files on the latest commit found // filter files on the latest commit found
List<HoodieDataFile> filteredFiles = fsView.getLatestDataFiles().collect(Collectors.toList()); List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
LOG.info("Total paths to process after hoodie filter " + filteredFiles.size()); LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
for (HoodieDataFile filteredFile : filteredFiles) { for (HoodieDataFile filteredFile : filteredFiles) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Processing latest hoodie file - " + filteredFile.getPath()); LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
} }
filteredFile = checkFileStatus(fsView, filteredFile); filteredFile = checkFileStatus(filteredFile);
returns.add(filteredFile.getFileStatus()); returns.add(filteredFile.getFileStatus());
} }
} }
@@ -140,21 +141,23 @@ public class HoodieInputFormat extends MapredParquetInputFormat
} }
/** /**
* Checks the file status for a race condition which can set the file size to 0. * Checks the file status for a race condition which can set the file size to 0. 1.
* 1. HiveInputFormat does super.listStatus() and gets back a FileStatus[] * HiveInputFormat does super.listStatus() and gets back a FileStatus[] 2. Then it creates the
* 2. Then it creates the HoodieTableMetaClient for the paths listed. * HoodieTableMetaClient for the paths listed. 3. Generation of splits looks at FileStatus size
* 3. Generation of splits looks at FileStatus size to create splits, which skips this file * to create splits, which skips this file
*
* @param fsView
* @param dataFile
* @return
*/ */
private HoodieDataFile checkFileStatus(TableFileSystemView fsView, HoodieDataFile dataFile) { private HoodieDataFile checkFileStatus(HoodieDataFile dataFile) throws IOException {
if(dataFile.getFileSize() == 0) { Path dataPath = dataFile.getFileStatus().getPath();
LOG.info("Refreshing file status " + dataFile.getPath()); try {
return new HoodieDataFile(fsView.getFileStatus(dataFile.getPath())); if (dataFile.getFileSize() == 0) {
FileSystem fs = dataPath.getFileSystem(conf);
LOG.info("Refreshing file status " + dataFile.getPath());
return new HoodieDataFile(fs.getFileStatus(dataPath));
}
return dataFile;
} catch (IOException e) {
throw new HoodieIOException("Could not get FileStatus on path " + dataPath);
} }
return dataFile;
} }
private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses) private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatus(FileStatus[] fileStatuses)

View File

@@ -69,7 +69,7 @@ public class HoodieSnapshotCopier implements Serializable {
public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir, final boolean shouldAssumeDatePartitioning) throws IOException { public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir, final boolean shouldAssumeDatePartitioning) throws IOException {
FileSystem fs = FSUtils.getFs(); FileSystem fs = FSUtils.getFs();
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir); final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir);
final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata, final TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView(tableMetadata,
tableMetadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants()); tableMetadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants());
// Get the latest commit // Get the latest commit
Optional<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline() Optional<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline()