diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java index 5bc47e2fa..8ce72c343 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java @@ -19,12 +19,14 @@ package com.uber.hoodie.index.bloom; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hashing; +import com.uber.hoodie.common.util.collection.Pair; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -139,11 +141,10 @@ public class BucketizedBloomCheckPartitioner extends Partitioner { @Override public int getPartition(Object key) { - String[] parts = ((String) key).split("#"); - String fileName = parts[0]; - final long hashOfKey = Hashing.md5().hashString(parts[1], StandardCharsets.UTF_8).asLong(); - List candidatePartitions = fileGroupToPartitions.get(fileName); - int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size()); + final Pair parts = (Pair) key; + final long hashOfKey = Hashing.md5().hashString(parts.getRight(), StandardCharsets.UTF_8).asLong(); + final List candidatePartitions = fileGroupToPartitions.get(parts.getLeft()); + final int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size()); assert idx >= 0; return candidatePartitions.get(idx); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index b5ef59e81..8dc15c1c6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -34,6 +34,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ParquetUtils; +import com.uber.hoodie.common.util.collection.Pair; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.MetadataNotFoundException; import com.uber.hoodie.index.HoodieIndex; @@ -42,9 +43,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -171,7 +175,7 @@ public class HoodieBloomIndex extends HoodieIndex // we will just try exploding the input and then count to determine comparisons // FIX(vc): Only do sampling here and extrapolate? fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, - partitionRecordKeyPairRDD).mapToPair(t -> t._2()).countByKey(); + partitionRecordKeyPairRDD).mapToPair(t -> t).countByKey(); } else { fileToComparisons = new HashMap<>(); partitionToFileInfo.entrySet().stream().forEach(e -> { @@ -290,8 +294,6 @@ public class HoodieBloomIndex extends HoodieIndex return true; } - - /** * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be * checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files @@ -301,24 +303,21 @@ public class HoodieBloomIndex extends HoodieIndex * recordKey ranges in the index info. */ @VisibleForTesting - JavaPairRDD> explodeRecordRDDWithFileComparisons( + JavaRDD> explodeRecordRDDWithFileComparisons( final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD) { IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) : new ListBasedIndexFileFilter(partitionToFileIndexInfo); + return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); - List>> recordComparisons = new ArrayList<>(); - indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> { - recordComparisons.add( - new Tuple2<>(String.format("%s#%s", matchingFile, recordKey), - new Tuple2<>(matchingFile, - new HoodieKey(recordKey, partitionPath)))); - }); - return recordComparisons; - }).flatMapToPair(List::iterator); + + return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream() + .map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath))) + .collect(Collectors.toList()); + }).flatMap(List::iterator); } /** @@ -332,28 +331,32 @@ public class HoodieBloomIndex extends HoodieIndex final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient, Map fileGroupToComparisons) { - JavaPairRDD> fileSortedTripletRDD = + JavaRDD> fileComparisonsRDD = explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD); + if (config.useBloomIndexBucketizedChecking()) { - BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, - fileGroupToComparisons, config.getBloomIndexKeysPerBucket()); - fileSortedTripletRDD = fileSortedTripletRDD.repartitionAndSortWithinPartitions(partitioner); + Partitioner partitioner = new BucketizedBloomCheckPartitioner( + shuffleParallelism, + fileGroupToComparisons, + config.getBloomIndexKeysPerBucket() + ); + + fileComparisonsRDD = fileComparisonsRDD + .mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t)) + .repartitionAndSortWithinPartitions(partitioner) + .map(Tuple2::_2); } else { - // sort further based on filename, such that all checking for the file can happen within - // a single partition, on-the-fly - fileSortedTripletRDD = fileSortedTripletRDD.sortByKey(true, shuffleParallelism); + fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism); } - return fileSortedTripletRDD.mapPartitionsWithIndex( - new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true) + + return fileComparisonsRDD + .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true) .flatMap(List::iterator) - .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0) - .flatMapToPair(lookupResult -> { - List> vals = new ArrayList<>(); - for (String recordKey : lookupResult.getMatchingRecordKeys()) { - vals.add(new Tuple2<>(recordKey, lookupResult.getFileName())); - } - return vals.iterator(); - }); + .filter(lr -> lr.getMatchingRecordKeys().size() > 0) + .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream() + .map(recordKey -> new Tuple2<>(recordKey, lookupResult.getFileName())) + .collect(Collectors.toList()) + .iterator()); } /** diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java index a313d2d91..4595658cc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java @@ -43,7 +43,7 @@ import scala.Tuple2; * actual files */ public class HoodieBloomIndexCheckFunction implements - Function2>>, + Function2>, Iterator>> { private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class); @@ -84,13 +84,13 @@ public class HoodieBloomIndexCheckFunction implements @Override public Iterator> call(Integer partition, - Iterator>> fileParitionRecordKeyTripletItr) + Iterator> fileParitionRecordKeyTripletItr) throws Exception { return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr); } class LazyKeyCheckIterator extends - LazyIterableIterator>, List> { + LazyIterableIterator, List> { private List candidateRecordKeys; @@ -103,7 +103,7 @@ public class HoodieBloomIndexCheckFunction implements private long totalKeysChecked; LazyKeyCheckIterator( - Iterator>> filePartitionRecordKeyTripletItr) { + Iterator> filePartitionRecordKeyTripletItr) { super(filePartitionRecordKeyTripletItr); currentFile = null; candidateRecordKeys = new ArrayList<>(); @@ -162,10 +162,10 @@ public class HoodieBloomIndexCheckFunction implements try { // process one file in each go. while (inputItr.hasNext()) { - Tuple2> currentTuple = inputItr.next(); - String fileName = currentTuple._2._1; - String partitionPath = currentTuple._2._2.getPartitionPath(); - String recordKey = currentTuple._2._2.getRecordKey(); + Tuple2 currentTuple = inputItr.next(); + String fileName = currentTuple._1; + String partitionPath = currentTuple._2.getPartitionPath(); + String recordKey = currentTuple._2.getRecordKey(); // lazily init state if (currentFile == null) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java index 05a5aa777..7bf85cb54 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java @@ -27,12 +27,14 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; + import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; @@ -76,7 +78,7 @@ public class HoodieGlobalBloomIndex extends Hoodi @Override @VisibleForTesting - JavaPairRDD> explodeRecordRDDWithFileComparisons( + JavaRDD> explodeRecordRDDWithFileComparisons( final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD) { Map indexToPartitionMap = new HashMap<>(); @@ -87,17 +89,14 @@ public class HoodieGlobalBloomIndex extends Hoodi IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo) : new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo); + return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); - List>> recordComparisons = new ArrayList<>(); - indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> { - recordComparisons.add( - new Tuple2<>(String.format("%s#%s", matchingFile, recordKey), - new Tuple2<>(matchingFile, - new HoodieKey(recordKey, indexToPartitionMap.get(matchingFile))))); - }); - return recordComparisons; - }).flatMapToPair(List::iterator); + + return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream() + .map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file)))) + .collect(Collectors.toList()); + }).flatMap(List::iterator); } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index 664c146d3..277c9a392 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -259,12 +259,12 @@ public class TestHoodieBloomIndex { new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); - List>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( + List> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); assertEquals(10, comparisonKeyList.size()); Map> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy( - t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); + t -> t._2.getRecordKey(), Collectors.mapping(t -> t._1, Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002"))); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java index 0228c2b3b..b220231c5 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java @@ -190,25 +190,25 @@ public class TestHoodieGlobalBloomIndex { new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t); - List>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( - partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); + List> comparisonKeyList = + index.explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); - /* epecting: - f4#003, f4, HoodieKey { recordKey=003 partitionPath=2017/10/23} - f1#003, f1, HoodieKey { recordKey=003 partitionPath=2017/10/22} - f3#003, f3, HoodieKey { recordKey=003 partitionPath=2017/10/22} - f4#002, f4, HoodieKey { recordKey=002 partitionPath=2017/10/23} - f1#002, f1, HoodieKey { recordKey=002 partitionPath=2017/10/22} - f3#002, f3, HoodieKey { recordKey=002 partitionPath=2017/10/22} - f4#005, f4, HoodieKey { recordKey=005 partitionPath=2017/10/23} - f1#005, f1, HoodieKey { recordKey=005 partitionPath=2017/10/22} - f4#004, f4, HoodieKey { recordKey=004 partitionPath=2017/10/23} - f1#004, f1, HoodieKey { recordKey=004 partitionPath=2017/10/22} + /* expecting: + f4, HoodieKey { recordKey=003 partitionPath=2017/10/23} + f1, HoodieKey { recordKey=003 partitionPath=2017/10/22} + f3, HoodieKey { recordKey=003 partitionPath=2017/10/22} + f4, HoodieKey { recordKey=002 partitionPath=2017/10/23} + f1, HoodieKey { recordKey=002 partitionPath=2017/10/22} + f3, HoodieKey { recordKey=002 partitionPath=2017/10/22} + f4, HoodieKey { recordKey=005 partitionPath=2017/10/23} + f1, HoodieKey { recordKey=005 partitionPath=2017/10/22} + f4, HoodieKey { recordKey=004 partitionPath=2017/10/23} + f1, HoodieKey { recordKey=004 partitionPath=2017/10/22} */ assertEquals(10, comparisonKeyList.size()); - Map> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy( - t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); + Map> recordKeyToFileComps = comparisonKeyList.stream() + .collect(Collectors.groupingBy(t -> t._2.getRecordKey(), Collectors.mapping(Tuple2::_1, Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002")));