From 1132f3533d0bf76c323a1728ef35301e3b55e38e Mon Sep 17 00:00:00 2001 From: Prasanna Rajaperumal Date: Tue, 21 Feb 2017 17:53:28 -0800 Subject: [PATCH] Merge and pull master commits --- .../com/uber/hoodie/HoodieWriteClient.java | 229 +++++++++++------- .../hoodie/func/BulkInsertMapFunction.java | 4 +- .../uber/hoodie/index/HoodieBloomIndex.java | 31 +-- .../compact/HoodieRealtimeTableCompactor.java | 16 +- .../hoodie/table/HoodieCopyOnWriteTable.java | 9 +- .../com/uber/hoodie/table/HoodieTable.java | 10 + .../com/uber/hoodie/TestHoodieClient.java | 17 +- .../hadoop/HoodieROTablePathFilter.java | 26 +- .../hadoop/TestHoodieROTablePathFilter.java | 4 +- 9 files changed, 221 insertions(+), 125 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 570292b0c..814030dec 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -34,7 +34,7 @@ import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieUpsertException; -import com.uber.hoodie.func.InsertMapFunction; +import com.uber.hoodie.func.BulkInsertMapFunction; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.HoodieCleaner; import com.uber.hoodie.io.HoodieCommitArchiveLog; @@ -158,59 +158,7 @@ public class HoodieWriteClient implements Seriali // perform index loop up to get existing location of records JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, table); - - // Cache the tagged records, so we don't end up computing both - taggedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); - - WorkloadProfile profile = null; - if (table.isWorkloadProfileNeeded()) { - profile = new WorkloadProfile<>(taggedRecords); - logger.info("Workload profile :" + profile); - } - - // obtain the upsert partitioner, and the run the tagger records through that & get a partitioned RDD. - final Partitioner upsertPartitioner = table.getUpsertPartitioner(profile); - JavaRDD> partitionedRecords = taggedRecords.mapToPair( - new PairFunction, Tuple2>, HoodieRecord>() { - @Override - public Tuple2>, HoodieRecord> call( - HoodieRecord record) throws Exception { - return new Tuple2<>(new Tuple2<>(record.getKey(), - Option.apply(record.getCurrentLocation())), record); - } - }).partitionBy(upsertPartitioner).map( - new Function>, HoodieRecord>, HoodieRecord>() { - @Override - public HoodieRecord call( - Tuple2>, HoodieRecord> tuple) - throws Exception { - return tuple._2(); - } - }); - - - // Perform the actual writing. - JavaRDD upsertStatusRDD = partitionedRecords.mapPartitionsWithIndex( - new Function2>, Iterator>>() { - @Override - public Iterator> call(Integer partition, - Iterator> recordItr) throws Exception { - return table.handleUpsertPartition(commitTime, partition, recordItr, - upsertPartitioner); - } - }, true).flatMap(new FlatMapFunction, WriteStatus>() { - @Override - public Iterable call(List writeStatuses) - throws Exception { - return writeStatuses; - } - }); - - // Update the index back. - JavaRDD resultRDD = index.updateLocation(upsertStatusRDD, table); - resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel()); - commitOnAutoCommit(commitTime, resultRDD); - return resultRDD; + return upsertRecordsInternal(taggedRecords, commitTime, table, true); } catch (Throwable e) { if (e instanceof HoodieUpsertException) { throw (HoodieUpsertException) e; @@ -219,36 +167,51 @@ public class HoodieWriteClient implements Seriali } } - private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD) { - if(config.shouldAutoCommit()) { - logger.info("Auto commit enabled: Committing " + commitTime); - boolean commitResult = commit(commitTime, resultRDD); - if (!commitResult) { - throw new HoodieCommitException("Failed to commit " + commitTime); - } - } else { - logger.info("Auto commit disabled for " + commitTime); - } - } + /** + * Inserts the given HoodieRecords, into the table. This API is intended to be used for normal + * writes. + * + * This implementation skips the index check and is able to leverage benefits such as + * small file handling/blocking alignment, as with upsert(), by profiling the workload + * + * @param records HoodieRecords to insert + * @param commitTime Commit Time handle + * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts + */ + public JavaRDD insert(JavaRDD> records, final String commitTime) { + writeContext = metrics.getCommitCtx(); + // Create a Hoodie table which encapsulated the commits and files visible + HoodieTable table = HoodieTable + .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config); + try { + // De-dupe/merge if needed + JavaRDD> dedupedRecords = + combineOnCondition(config.shouldCombineBeforeInsert(), records, + config.getInsertShuffleParallelism()); - private JavaRDD> combineOnCondition(boolean condition, - JavaRDD> records, int parallelism) { - if(condition) { - return deduplicateRecords(records, parallelism); + return upsertRecordsInternal(dedupedRecords, commitTime, table, false); + } catch (Throwable e) { + if (e instanceof HoodieInsertException) { + throw e; + } + throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e); } - return records; } /** - * Loads the given HoodieRecords, as inserts into the table. - * (This implementation uses sortBy and attempts to control the numbers of files with less memory) + * Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk + * loads into a Hoodie table for the very first time (e.g: converting an existing dataset to + * Hoodie). * - * @param records HoodieRecords to insert + * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and + * attempts to control the numbers of files with less memory compared to the {@link + * HoodieWriteClient#insert(JavaRDD, String)} + * + * @param records HoodieRecords to insert * @param commitTime Commit Time handle * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts - * */ - public JavaRDD insert(JavaRDD> records, final String commitTime) { + public JavaRDD bulkInsert(JavaRDD> records, final String commitTime) { writeContext = metrics.getCommitCtx(); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable @@ -273,28 +236,124 @@ public class HoodieWriteClient implements Seriali } }, true, config.getInsertShuffleParallelism()); JavaRDD writeStatusRDD = sortedRecords - .mapPartitionsWithIndex(new InsertMapFunction(commitTime, config, table), + .mapPartitionsWithIndex(new BulkInsertMapFunction(commitTime, config, table), true).flatMap(new FlatMapFunction, WriteStatus>() { @Override - public Iterable call(List writeStatuses) + public Iterator call(List writeStatuses) throws Exception { - return writeStatuses; + return writeStatuses.iterator(); } }); - // Update the index back - JavaRDD statuses = index.updateLocation(writeStatusRDD, table); - // Trigger the insert and collect statuses - statuses = statuses.persist(config.getWriteStatusStorageLevel()); - commitOnAutoCommit(commitTime, statuses); - return statuses; + + return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime); } catch (Throwable e) { if (e instanceof HoodieInsertException) { throw e; } - throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e); + throw new HoodieInsertException("Failed to bulk insert for commit time " + commitTime, e); } } + private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD) { + if(config.shouldAutoCommit()) { + logger.info("Auto commit enabled: Committing " + commitTime); + boolean commitResult = commit(commitTime, resultRDD); + if (!commitResult) { + throw new HoodieCommitException("Failed to commit " + commitTime); + } + } else { + logger.info("Auto commit disabled for " + commitTime); + } + } + + private JavaRDD> combineOnCondition(boolean condition, + JavaRDD> records, int parallelism) { + if(condition) { + return deduplicateRecords(records, parallelism); + } + return records; + } + + private JavaRDD upsertRecordsInternal(JavaRDD> preppedRecords, + String commitTime, + HoodieTable hoodieTable, + final boolean isUpsert) { + + // Cache the tagged records, so we don't end up computing both + preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); + + WorkloadProfile profile = null; + if (hoodieTable.isWorkloadProfileNeeded()) { + profile = new WorkloadProfile(preppedRecords); + logger.info("Workload profile :" + profile); + } + + // partition using the insert partitioner + final Partitioner partitioner = getPartitioner(hoodieTable, isUpsert, profile); + JavaRDD> partitionedRecords = partition(preppedRecords, partitioner); + JavaRDD writeStatusRDD = partitionedRecords.mapPartitionsWithIndex( + new Function2>, Iterator>>() { + @Override + public Iterator> call(Integer partition, + Iterator> recordItr) throws Exception { + if (isUpsert) { + return hoodieTable + .handleUpsertPartition(commitTime, partition, recordItr, partitioner); + } else { + return hoodieTable + .handleInsertPartition(commitTime, partition, recordItr, partitioner); + } + } + }, true).flatMap(new FlatMapFunction, WriteStatus>() { + @Override + public Iterator call(List writeStatuses) + throws Exception { + return writeStatuses.iterator(); + } + }); + + return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime); + } + + private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) { + if (isUpsert) { + return table.getUpsertPartitioner(profile); + } else { + return table.getInsertPartitioner(profile); + } + } + + private JavaRDD updateIndexAndCommitIfNeeded(JavaRDD writeStatusRDD, + HoodieTable table, + String commitTime) { + // Update the index back + JavaRDD statuses = index.updateLocation(writeStatusRDD, table); + // Trigger the insert and collect statuses + statuses = statuses.persist(config.getWriteStatusStorageLevel()); + commitOnAutoCommit(commitTime, statuses); + return statuses; + } + + private JavaRDD> partition(JavaRDD> dedupedRecords, Partitioner partitioner) { + return dedupedRecords.mapToPair( + new PairFunction, Tuple2>, HoodieRecord>() { + @Override + public Tuple2>, HoodieRecord> call( + HoodieRecord record) throws Exception { + return new Tuple2<>(new Tuple2<>(record.getKey(), + Option.apply(record.getCurrentLocation())), record); + } + }).partitionBy(partitioner).map( + new Function>, HoodieRecord>, HoodieRecord>() { + @Override + public HoodieRecord call( + Tuple2>, HoodieRecord> tuple) + throws Exception { + return tuple._2(); + } + }); + } + /** * Commit changes performed at the given commitTime marker */ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java index 74478450d..ae130a62d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java @@ -31,14 +31,14 @@ import java.util.List; /** * Map function that handles a sorted stream of HoodieRecords */ -public class InsertMapFunction +public class BulkInsertMapFunction implements Function2>, Iterator>> { private String commitTime; private HoodieWriteConfig config; private HoodieTable hoodieTable; - public InsertMapFunction(String commitTime, HoodieWriteConfig config, + public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config, HoodieTable hoodieTable) { this.commitTime = commitTime; this.config = config; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java index a36ab999d..96d3270fe 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java @@ -42,6 +42,7 @@ import scala.Tuple2; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -114,10 +115,10 @@ public class HoodieBloomIndex extends HoodieIndex }); return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).mapToPair( - new PairFunction>>, HoodieKey, Optional>() { + new PairFunction>>, HoodieKey, Optional>() { @Override public Tuple2> call( - Tuple2>> keyPathTuple) + Tuple2>> keyPathTuple) throws Exception { Optional recordLocationPath; if (keyPathTuple._2._2.isPresent()) { @@ -145,13 +146,13 @@ public class HoodieBloomIndex extends HoodieIndex private JavaPairRDD lookupIndex( JavaPairRDD partitionRecordKeyPairRDD, final HoodieTable hoodieTable) { // Obtain records per partition, in the incoming records - Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); + Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // Step 2: Load all involved files as pairs JavaPairRDD partitionFilePairRDD = loadInvolvedFiles(affectedPartitionPathList, hoodieTable); - Map filesPerPartition = partitionFilePairRDD.countByKey(); + Map filesPerPartition = partitionFilePairRDD.countByKey(); // Compute total subpartitions, to split partitions into. Map subpartitionCountMap = @@ -173,7 +174,7 @@ public class HoodieBloomIndex extends HoodieIndex * @param filesPerPartition * @return */ - private Map computeSubPartitions(Map recordsPerPartition, Map filesPerPartition) { + private Map computeSubPartitions(Map recordsPerPartition, Map filesPerPartition) { Map subpartitionCountMap = new HashMap<>(); long totalRecords = 0; long totalFiles = 0; @@ -214,7 +215,7 @@ public class HoodieBloomIndex extends HoodieIndex return jsc.parallelize(partitions, Math.max(partitions.size(), 1)) .flatMapToPair(new PairFlatMapFunction() { @Override - public Iterable> call(String partitionPath) { + public Iterator> call(String partitionPath) { java.util.Optional latestCommitTime = hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); List> list = new ArrayList<>(); @@ -226,7 +227,7 @@ public class HoodieBloomIndex extends HoodieIndex list.add(new Tuple2<>(partitionPath, file.getFileName())); } } - return list; + return list.iterator(); } }); } @@ -266,8 +267,8 @@ public class HoodieBloomIndex extends HoodieIndex }) .flatMapToPair(new PairFlatMapFunction>, String, String>() { @Override - public Iterable> call(List> exploded) throws Exception { - return exploded; + public Iterator> call(List> exploded) throws Exception { + return exploded.iterator(); } }); @@ -367,9 +368,9 @@ public class HoodieBloomIndex extends HoodieIndex .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true) .flatMap(new FlatMapFunction, IndexLookupResult>() { @Override - public Iterable call(List indexLookupResults) + public Iterator call(List indexLookupResults) throws Exception { - return indexLookupResults; + return indexLookupResults.iterator(); } }).filter(new Function() { @Override @@ -378,13 +379,13 @@ public class HoodieBloomIndex extends HoodieIndex } }).flatMapToPair(new PairFlatMapFunction() { @Override - public Iterable> call(IndexLookupResult lookupResult) + public Iterator> call(IndexLookupResult lookupResult) throws Exception { List> vals = new ArrayList<>(); for (String recordKey : lookupResult.getMatchingRecordKeys()) { vals.add(new Tuple2<>(recordKey, lookupResult.getFileName())); } - return vals; + return vals.iterator(); } }); } @@ -404,9 +405,9 @@ public class HoodieBloomIndex extends HoodieIndex // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join. return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map( - new Function, Optional>, HoodieRecord>() { + new Function, org.apache.spark.api.java.Optional>, HoodieRecord>() { @Override - public HoodieRecord call(Tuple2, Optional> v1) throws Exception { + public HoodieRecord call(Tuple2, org.apache.spark.api.java.Optional> v1) throws Exception { HoodieRecord record = v1._1(); if (v1._2().isPresent()) { String filename = v1._2().get(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index da787c13a..052ab54f0 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -50,6 +50,8 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import static java.util.stream.Collectors.*; + /** * HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. * Computes all possible compactions, passes it through a CompactionFilter and executes @@ -75,13 +77,11 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) - .flatMap((FlatMapFunction) partitionPath -> { - return hoodieTable.getFileSystemView() - .groupLatestDataFileWithLogFiles(partitionPath).entrySet() - .stream() - .map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue())) - .collect(Collectors.toList()); - }).collect(); + .flatMap((FlatMapFunction) partitionPath -> hoodieTable.getFileSystemView() + .groupLatestDataFileWithLogFiles(partitionPath).entrySet() + .stream() + .map(s -> new CompactionOperation(s.getKey(), partitionPath, s.getValue())) + .collect(toList()).iterator()).collect(); log.info("Total of " + operations.size() + " compactions are retrieved"); // Filter the compactions with the passed in filter. This lets us choose most effective compactions only @@ -98,7 +98,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { metaClient, config, compactionOperation, compactionCommit)).flatMap( (FlatMapFunction>, WriteStatus>) listIterator -> { List> collected = IteratorUtils.toList(listIterator); - return collected.stream().flatMap(List::stream).collect(Collectors.toList()); + return collected.stream().flatMap(List::stream).collect(toList()).iterator(); }).mapToPair(new PairFunction() { @Override public Tuple2 call(WriteStatus writeStatus) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index b4e10ff8e..269445b6c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -386,7 +386,7 @@ public class HoodieCopyOnWriteTable extends Hoodi @Override public Partitioner getInsertPartitioner(WorkloadProfile profile) { - return null; + return getUpsertPartitioner(profile); } @Override @@ -463,4 +463,11 @@ public class HoodieCopyOnWriteTable extends Hoodi throw new HoodieUpsertException(msg, t); } } + + @Override + public Iterator> handleInsertPartition(String commitTime, Integer partition, + Iterator recordItr, + Partitioner partitioner) { + return handleUpsertPartition(commitTime, partition, recordItr, partitioner); + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index e7ab29adf..274e0c9ab 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -202,6 +202,16 @@ public abstract class HoodieTable implements Seri public abstract Iterator> handleUpsertPartition(String commitTime, Integer partition, Iterator> recordIterator, Partitioner partitioner); + /** + * Perform the ultimate IO for a given inserted (RDD) partition + * + * @param partition + * @param recordIterator + * @param partitioner + */ + public abstract Iterator> handleInsertPartition(String commitTime, + Integer partition, Iterator> recordIterator, Partitioner partitioner); + public static HoodieTable getHoodieTable( HoodieTableMetaClient metaClient, HoodieWriteConfig config) { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index ed14a2f1c..a67d4f93d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -42,6 +42,7 @@ import com.uber.hoodie.io.HoodieCleaner; import com.uber.hoodie.table.HoodieTable; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; @@ -65,6 +66,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -743,15 +745,18 @@ public class TestHoodieClient implements Serializable { FileSystem fs = FSUtils.getFs(); - HoodieTableMetadata metadata = new HoodieTableMetadata(fs, basePath); - FileStatus[] files = metadata.getLatestVersionInPartition(fs, TEST_PARTITION_PATH, commitTime3); - assertEquals("Total of 2 valid data files", 2, files.length); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); + List files = + table.getFileSystemView().getLatestVersionInPartition(TEST_PARTITION_PATH, commitTime3) + .collect(Collectors.toList()); + assertEquals("Total of 2 valid data files", 2, files.size()); int totalInserts = 0; - for (FileStatus file: files) { - assertEquals("All files must be at commit 3", commitTime3, FSUtils.getCommitTime(file.getPath().getName())); - records = ParquetUtils.readAvroRecords(file.getPath()); + for (HoodieDataFile file: files) { + assertEquals("All files must be at commit 3", commitTime3, file.getCommitTime()); + records = ParquetUtils.readAvroRecords(new Path(file.getPath())); totalInserts += records.size(); } assertEquals("Total number of records must add up", totalInserts, inserts1.size() + inserts2.size() + insert3.size()); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java index 5bf482580..00f003ff6 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java @@ -15,7 +15,10 @@ */ package com.uber.hoodie.hadoop; -import com.uber.hoodie.common.model.HoodieTableMetadata; +import com.uber.hoodie.common.model.HoodieDataFile; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; +import com.uber.hoodie.exception.DatasetNotFoundException; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.InvalidDatasetException; @@ -30,6 +33,9 @@ import org.apache.hadoop.fs.PathFilter; import java.io.Serializable; import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Given a path is a part of @@ -114,16 +120,22 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable { Path baseDir = safeGetParentsParent(folder); if (baseDir != null) { try { - HoodieTableMetadata metadata = new HoodieTableMetadata(fs, baseDir.toString()); - FileStatus[] latestFiles = metadata.getLatestVersions(fs.listStatus(folder)); + HoodieTableMetaClient metaClient = + new HoodieTableMetaClient(fs, baseDir.toString()); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitTimeline() + .filterCompletedInstants()); + List + latestFiles = fsView.getLatestVersions(fs.listStatus(folder)).collect( + Collectors.toList()); // populate the cache if (!hoodiePathCache.containsKey(folder.toString())) { hoodiePathCache.put(folder.toString(), new HashSet()); } LOG.info("Based on hoodie metadata from base path: " + baseDir.toString() + - ", caching " + latestFiles.length+" files under "+ folder); - for (FileStatus lfile: latestFiles) { - hoodiePathCache.get(folder.toString()).add(lfile.getPath()); + ", caching " + latestFiles.size() + " files under "+ folder); + for (HoodieDataFile lfile: latestFiles) { + hoodiePathCache.get(folder.toString()).add(new Path(lfile.getPath())); } // accept the path, if its among the latest files. @@ -133,7 +145,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable { hoodiePathCache.get(folder.toString()).contains(path))); } return hoodiePathCache.get(folder.toString()).contains(path); - } catch (InvalidDatasetException e) { + } catch (DatasetNotFoundException e) { // Non-hoodie path, accept it. if (LOG.isDebugEnabled()) { LOG.debug(String.format("(1) Caching non-hoodie path under %s \n", diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestHoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestHoodieROTablePathFilter.java index a6ce1c39a..52fff93c5 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestHoodieROTablePathFilter.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/TestHoodieROTablePathFilter.java @@ -17,6 +17,7 @@ package com.uber.hoodie.hadoop; import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.HoodieTableMetaClient; import org.apache.hadoop.fs.Path; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -35,7 +36,8 @@ public class TestHoodieROTablePathFilter { @Test public void testHoodiePaths() throws IOException { // Create a temp folder as the base path - String basePath = HoodieTestUtils.initializeTempHoodieBasePath(); + HoodieTableMetaClient metaClient = HoodieTestUtils.initOnTemp(); + String basePath = metaClient.getBasePath(); HoodieTestUtils.createCommitFiles(basePath, "001", "002"); HoodieTestUtils.createInflightCommitFiles(basePath, "003");