Merge and pull master commits

Prasanna Rajaperumal
2017-02-21 17:53:28 -08:00
parent eb46e7c72b
commit 1132f3533d
9 changed files with 221 additions and 125 deletions

View File

@@ -34,7 +34,7 @@ import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieInsertException;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.InsertMapFunction;
import com.uber.hoodie.func.BulkInsertMapFunction;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.HoodieCleaner;
import com.uber.hoodie.io.HoodieCommitArchiveLog;
@@ -158,59 +158,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// perform index lookup to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, table);
// Cache the tagged records, so we don't end up computing both
taggedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
WorkloadProfile profile = null;
if (table.isWorkloadProfileNeeded()) {
profile = new WorkloadProfile<>(taggedRecords);
logger.info("Workload profile :" + profile);
}
// obtain the upsert partitioner, then run the tagged records through it to get a partitioned RDD.
final Partitioner upsertPartitioner = table.getUpsertPartitioner(profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = taggedRecords.mapToPair(
new PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
@Override
public Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(
HoodieRecord<T> record) throws Exception {
return new Tuple2<>(new Tuple2<>(record.getKey(),
Option.apply(record.getCurrentLocation())), record);
}
}).partitionBy(upsertPartitioner).map(
new Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(
Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> tuple)
throws Exception {
return tuple._2();
}
});
// Perform the actual writing.
JavaRDD<WriteStatus> upsertStatusRDD = partitionedRecords.mapPartitionsWithIndex(
new Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>>() {
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
return table.handleUpsertPartition(commitTime, partition, recordItr,
upsertPartitioner);
}
}, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
}
});
// Update the index back.
JavaRDD<WriteStatus> resultRDD = index.updateLocation(upsertStatusRDD, table);
resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel());
commitOnAutoCommit(commitTime, resultRDD);
return resultRDD;
return upsertRecordsInternal(taggedRecords, commitTime, table, true);
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
@@ -219,36 +167,51 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD) {
if(config.shouldAutoCommit()) {
logger.info("Auto commit enabled: Committing " + commitTime);
boolean commitResult = commit(commitTime, resultRDD);
if (!commitResult) {
throw new HoodieCommitException("Failed to commit " + commitTime);
}
} else {
logger.info("Auto commit disabled for " + commitTime);
}
}
/**
* Inserts the given HoodieRecords, into the table. This API is intended to be used for normal
* writes.
*
* This implementation skips the index check and is able to leverage benefits such as
* small file handling/blocking alignment, as with upsert(), by profiling the workload
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
writeContext = metrics.getCommitCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records,
config.getInsertShuffleParallelism());
private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition,
JavaRDD<HoodieRecord<T>> records, int parallelism) {
if(condition) {
return deduplicateRecords(records, parallelism);
return upsertRecordsInternal(dedupedRecords, commitTime, table, false);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e);
}
return records;
}
/**
* Loads the given HoodieRecords, as inserts into the table.
* (This implementation uses sortBy and attempts to control the numbers of files with less memory)
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
* loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
* Hoodie).
*
* @param records HoodieRecords to insert
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
* attempts to control the numbers of files with less memory compared to the {@link
* HoodieWriteClient#insert(JavaRDD, String)}
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
writeContext = metrics.getCommitCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable
@@ -273,28 +236,124 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}, true, config.getInsertShuffleParallelism());
JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
.mapPartitionsWithIndex(new InsertMapFunction<T>(commitTime, config, table),
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table),
true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
return writeStatuses.iterator();
}
});
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, table);
// Trigger the insert and collect statuses
statuses = statuses.persist(config.getWriteStatusStorageLevel());
commitOnAutoCommit(commitTime, statuses);
return statuses;
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e);
throw new HoodieInsertException("Failed to bulk insert for commit time " + commitTime, e);
}
}
private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD) {
if(config.shouldAutoCommit()) {
logger.info("Auto commit enabled: Committing " + commitTime);
boolean commitResult = commit(commitTime, resultRDD);
if (!commitResult) {
throw new HoodieCommitException("Failed to commit " + commitTime);
}
} else {
logger.info("Auto commit disabled for " + commitTime);
}
}
private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition,
JavaRDD<HoodieRecord<T>> records, int parallelism) {
if(condition) {
return deduplicateRecords(records, parallelism);
}
return records;
}
private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords,
String commitTime,
HoodieTable<T> hoodieTable,
final boolean isUpsert) {
// Cache the tagged records, so we don't end up computing both
preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
WorkloadProfile profile = null;
if (hoodieTable.isWorkloadProfileNeeded()) {
profile = new WorkloadProfile(preppedRecords);
logger.info("Workload profile :" + profile);
}
// partition the records using the chosen partitioner (upsert or insert)
final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex(
new Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>>() {
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
if (isUpsert) {
return hoodieTable
.handleUpsertPartition(commitTime, partition, recordItr, partitioner);
} else {
return hoodieTable
.handleInsertPartition(commitTime, partition, recordItr, partitioner);
}
}
}, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterator<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses.iterator();
}
});
return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime);
}
private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
if (isUpsert) {
return table.getUpsertPartitioner(profile);
} else {
return table.getInsertPartitioner(profile);
}
}
private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
HoodieTable<T> table,
String commitTime) {
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, table);
// Trigger the insert and collect statuses
statuses = statuses.persist(config.getWriteStatusStorageLevel());
commitOnAutoCommit(commitTime, statuses);
return statuses;
}
private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
return dedupedRecords.mapToPair(
new PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
@Override
public Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(
HoodieRecord<T> record) throws Exception {
return new Tuple2<>(new Tuple2<>(record.getKey(),
Option.apply(record.getCurrentLocation())), record);
}
}).partitionBy(partitioner).map(
new Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(
Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> tuple)
throws Exception {
return tuple._2();
}
});
}
/**
* Commit changes performed at the given commitTime marker
*/
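Taken together, the refactor above leaves HoodieWriteClient with three public write paths: upsert() (index lookup, then the shared upsertRecordsInternal path), insert() (same internal path but skipping the index lookup), and the renamed bulkInsert() (sortBy-based, for first-time bulk loads). A minimal caller-side sketch, assuming an existing JavaSparkContext jsc, an input JavaRDD<HoodieRecord<MyPayload>> records, and a hypothetical payload class MyPayload implementing HoodieRecordPayload:

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample_table")   // hypothetical base path
    .build();
HoodieWriteClient<MyPayload> client = new HoodieWriteClient<>(jsc, cfg);

String commitTime = client.startCommit();
// Pick the write path that matches the workload:
JavaRDD<WriteStatus> statuses = client.upsert(records, commitTime);
// or: client.insert(records, commitTime);      // no index lookup, still workload-profiled
// or: client.bulkInsert(records, commitTime);  // first-time bulk load via sortBy

if (!cfg.shouldAutoCommit()) {
  // With auto-commit enabled, commitOnAutoCommit() has already committed;
  // otherwise the caller inspects the write statuses and commits explicitly.
  client.commit(commitTime, statuses);
}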

View File

@@ -31,14 +31,14 @@ import java.util.List;
/**
* Map function that handles a sorted stream of HoodieRecords
*/
public class InsertMapFunction<T extends HoodieRecordPayload>
public class BulkInsertMapFunction<T extends HoodieRecordPayload>
implements Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>> {
private String commitTime;
private HoodieWriteConfig config;
private HoodieTable<T> hoodieTable;
public InsertMapFunction(String commitTime, HoodieWriteConfig config,
public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config,
HoodieTable<T> hoodieTable) {
this.commitTime = commitTime;
this.config = config;

View File

@@ -42,6 +42,7 @@ import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -114,10 +115,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
});
return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair(
new PairFunction<Tuple2<String, Tuple2<HoodieKey, Optional<String>>>, HoodieKey, Optional<String>>() {
new PairFunction<Tuple2<String, Tuple2<HoodieKey, org.apache.spark.api.java.Optional<String>>>, HoodieKey, Optional<String>>() {
@Override
public Tuple2<HoodieKey, Optional<String>> call(
Tuple2<String, Tuple2<HoodieKey, Optional<String>>> keyPathTuple)
Tuple2<String, Tuple2<HoodieKey, org.apache.spark.api.java.Optional<String>>> keyPathTuple)
throws Exception {
Optional<String> recordLocationPath;
if (keyPathTuple._2._2.isPresent()) {
@@ -145,13 +146,13 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
private JavaPairRDD<String, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieTable<T> hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Object> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
JavaPairRDD<String, String> partitionFilePairRDD =
loadInvolvedFiles(affectedPartitionPathList, hoodieTable);
Map<String, Object> filesPerPartition = partitionFilePairRDD.countByKey();
Map<String, Long> filesPerPartition = partitionFilePairRDD.countByKey();
// Compute total subpartitions, to split partitions into.
Map<String, Long> subpartitionCountMap =
@@ -173,7 +174,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
* @param filesPerPartition
* @return
*/
private Map<String, Long> computeSubPartitions(Map<String, Object> recordsPerPartition, Map<String, Object> filesPerPartition) {
private Map<String, Long> computeSubPartitions(Map<String, Long> recordsPerPartition, Map<String, Long> filesPerPartition) {
Map<String, Long> subpartitionCountMap = new HashMap<>();
long totalRecords = 0;
long totalFiles = 0;
@@ -214,7 +215,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(String partitionPath) {
public Iterator<Tuple2<String, String>> call(String partitionPath) {
java.util.Optional<HoodieInstant> latestCommitTime =
hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant();
List<Tuple2<String, String>> list = new ArrayList<>();
@@ -226,7 +227,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
list.add(new Tuple2<>(partitionPath, file.getFileName()));
}
}
return list;
return list.iterator();
}
});
}
@@ -266,8 +267,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
})
.flatMapToPair(new PairFlatMapFunction<List<Tuple2<String, String>>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(List<Tuple2<String, String>> exploded) throws Exception {
return exploded;
public Iterator<Tuple2<String, String>> call(List<Tuple2<String, String>> exploded) throws Exception {
return exploded.iterator();
}
});
@@ -367,9 +368,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true)
.flatMap(new FlatMapFunction<List<IndexLookupResult>, IndexLookupResult>() {
@Override
public Iterable<IndexLookupResult> call(List<IndexLookupResult> indexLookupResults)
public Iterator<IndexLookupResult> call(List<IndexLookupResult> indexLookupResults)
throws Exception {
return indexLookupResults;
return indexLookupResults.iterator();
}
}).filter(new Function<IndexLookupResult, Boolean>() {
@Override
@@ -378,13 +379,13 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
}
}).flatMapToPair(new PairFlatMapFunction<IndexLookupResult, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(IndexLookupResult lookupResult)
public Iterator<Tuple2<String, String>> call(IndexLookupResult lookupResult)
throws Exception {
List<Tuple2<String, String>> vals = new ArrayList<>();
for (String recordKey : lookupResult.getMatchingRecordKeys()) {
vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
}
return vals;
return vals.iterator();
}
});
}
@@ -404,9 +405,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(
new Function<Tuple2<HoodieRecord<T>, Optional<String>>, HoodieRecord<T>>() {
new Function<Tuple2<HoodieRecord<T>, org.apache.spark.api.java.Optional<String>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(Tuple2<HoodieRecord<T>, Optional<String>> v1) throws Exception {
public HoodieRecord<T> call(Tuple2<HoodieRecord<T>, org.apache.spark.api.java.Optional<String>> v1) throws Exception {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
String filename = v1._2().get();
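Most of the signature churn in this file (and the matching changes in HoodieWriteClient above and HoodieRealtimeTableCompactor below) comes from the Spark 2.x Java API: FlatMapFunction and PairFlatMapFunction callbacks now return an Iterator instead of an Iterable, JavaPairRDD.countByKey() returns Map<K, Long> rather than Map<K, Object>, and left outer joins expose org.apache.spark.api.java.Optional instead of Guava's Optional. A minimal sketch of the flatMap change, assuming a hypothetical JavaRDD<String> named lines:

// Spark 1.x style: the callback returned an Iterable (no longer compiles against Spark 2.x)
JavaRDD<String> tokensOld = lines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterable<String> call(String line) {
    return Arrays.asList(line.split(" "));
  }
});

// Spark 2.x style: the same callback must return an Iterator, hence the .iterator() calls above
JavaRDD<String> tokensNew = lines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterator<String> call(String line) {
    return Arrays.asList(line.split(" ")).iterator();
  }
});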

View File

@@ -50,6 +50,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.*;
/**
* HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage.
* Computes all possible compactions, passes it through a CompactionFilter and executes
@@ -75,13 +77,11 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
List<CompactionOperation> operations =
jsc.parallelize(partitionPaths, partitionPaths.size())
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> {
return hoodieTable.getFileSystemView()
.groupLatestDataFileWithLogFiles(partitionPath).entrySet()
.stream()
.map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue()))
.collect(Collectors.toList());
}).collect();
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> hoodieTable.getFileSystemView()
.groupLatestDataFileWithLogFiles(partitionPath).entrySet()
.stream()
.map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue()))
.collect(toList()).iterator()).collect();
log.info("Total of " + operations.size() + " compactions are retrieved");
// Filter the compactions with the passed in filter. This lets us choose most effective compactions only
@@ -98,7 +98,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
metaClient, config, compactionOperation, compactionCommit)).flatMap(
(FlatMapFunction<Iterator<List<WriteStatus>>, WriteStatus>) listIterator -> {
List<List<WriteStatus>> collected = IteratorUtils.toList(listIterator);
return collected.stream().flatMap(List::stream).collect(Collectors.toList());
return collected.stream().flatMap(List::stream).collect(toList()).iterator();
}).mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
@Override
public Tuple2<String, HoodieWriteStat> call(WriteStatus writeStatus)

View File

@@ -386,7 +386,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
return null;
return getUpsertPartitioner(profile);
}
@Override
@@ -463,4 +463,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieUpsertException(msg, t);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Integer partition,
Iterator recordItr,
Partitioner partitioner) {
return handleUpsertPartition(commitTime, partition, recordItr, partitioner);
}
}

View File

@@ -202,6 +202,16 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
public abstract Iterator<List<WriteStatus>> handleUpsertPartition(String commitTime,
Integer partition, Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
/**
* Perform the ultimate IO for a given inserted (RDD) partition
*
* @param partition
* @param recordIterator
* @param partitioner
*/
public abstract Iterator<List<WriteStatus>> handleInsertPartition(String commitTime,
Integer partition, Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
public static <T extends HoodieRecordPayload> HoodieTable<T> getHoodieTable(
HoodieTableMetaClient metaClient, HoodieWriteConfig config) {

View File

@@ -42,6 +42,7 @@ import com.uber.hoodie.io.HoodieCleaner;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
@@ -65,6 +66,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -743,15 +745,18 @@ public class TestHoodieClient implements Serializable {
FileSystem fs = FSUtils.getFs();
HoodieTableMetadata metadata = new HoodieTableMetadata(fs, basePath);
FileStatus[] files = metadata.getLatestVersionInPartition(fs, TEST_PARTITION_PATH, commitTime3);
assertEquals("Total of 2 valid data files", 2, files.length);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
List<HoodieDataFile> files =
table.getFileSystemView().getLatestVersionInPartition(TEST_PARTITION_PATH, commitTime3)
.collect(Collectors.toList());
assertEquals("Total of 2 valid data files", 2, files.size());
int totalInserts = 0;
for (FileStatus file: files) {
assertEquals("All files must be at commit 3", commitTime3, FSUtils.getCommitTime(file.getPath().getName()));
records = ParquetUtils.readAvroRecords(file.getPath());
for (HoodieDataFile file: files) {
assertEquals("All files must be at commit 3", commitTime3, file.getCommitTime());
records = ParquetUtils.readAvroRecords(new Path(file.getPath()));
totalInserts += records.size();
}
assertEquals("Total number of records must add up", totalInserts, inserts1.size() + inserts2.size() + insert3.size());

View File

@@ -15,7 +15,10 @@
*/
package com.uber.hoodie.hadoop;
import com.uber.hoodie.common.model.HoodieTableMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.InvalidDatasetException;
@@ -30,6 +33,9 @@ import org.apache.hadoop.fs.PathFilter;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Given a path is a part of
@@ -114,16 +120,22 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
Path baseDir = safeGetParentsParent(folder);
if (baseDir != null) {
try {
HoodieTableMetadata metadata = new HoodieTableMetadata(fs, baseDir.toString());
FileStatus[] latestFiles = metadata.getLatestVersions(fs.listStatus(folder));
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(fs, baseDir.toString());
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants());
List<HoodieDataFile>
latestFiles = fsView.getLatestVersions(fs.listStatus(folder)).collect(
Collectors.toList());
// populate the cache
if (!hoodiePathCache.containsKey(folder.toString())) {
hoodiePathCache.put(folder.toString(), new HashSet<Path>());
}
LOG.info("Based on hoodie metadata from base path: " + baseDir.toString() +
", caching " + latestFiles.length+" files under "+ folder);
for (FileStatus lfile: latestFiles) {
hoodiePathCache.get(folder.toString()).add(lfile.getPath());
", caching " + latestFiles.size() + " files under "+ folder);
for (HoodieDataFile lfile: latestFiles) {
hoodiePathCache.get(folder.toString()).add(new Path(lfile.getPath()));
}
// accept the path, if its among the latest files.
@@ -133,7 +145,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
hoodiePathCache.get(folder.toString()).contains(path)));
}
return hoodiePathCache.get(folder.toString()).contains(path);
} catch (InvalidDatasetException e) {
} catch (DatasetNotFoundException e) {
// Non-hoodie path, accept it.
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("(1) Caching non-hoodie path under %s \n",

View File

@@ -17,6 +17,7 @@ package com.uber.hoodie.hadoop;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -35,7 +36,8 @@ public class TestHoodieROTablePathFilter {
@Test
public void testHoodiePaths() throws IOException {
// Create a temp folder as the base path
String basePath = HoodieTestUtils.initializeTempHoodieBasePath();
HoodieTableMetaClient metaClient = HoodieTestUtils.initOnTemp();
String basePath = metaClient.getBasePath();
HoodieTestUtils.createCommitFiles(basePath, "001", "002");
HoodieTestUtils.createInflightCommitFiles(basePath, "003");