From 7129dc5bb7dded620141c868cefd64ab18bac139 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 28 Nov 2018 11:00:02 -0800 Subject: [PATCH] Improving Tag location using interval trees for index files Adding interface for index look up Adding index filtering implementations for global bloom index too --- .../hoodie/index/bloom/HoodieBloomIndex.java | 85 +++------ .../bloom/HoodieBloomIndexCheckFunction.java | 18 +- .../index/bloom/HoodieGlobalBloomIndex.java | 60 +++--- .../hoodie/index/bloom/IndexFileFilter.java | 38 ++++ ...ntervalTreeBasedGlobalIndexFileFilter.java | 68 +++++++ .../IntervalTreeBasedIndexFileFilter.java | 82 ++++++++ ...LookupResult.java => KeyLookupResult.java} | 12 +- .../index/bloom/KeyRangeLookupTree.java | 156 ++++++++++++++++ .../uber/hoodie/index/bloom/KeyRangeNode.java | 153 +++++++++++++++ .../bloom/SimpleGlobalIndexFileFilter.java | 53 ++++++ .../index/bloom/SimpleIndexFileFilter.java | 65 +++++++ .../index/bloom/TestHoodieBloomIndex.java | 166 +++++++++-------- .../bloom/TestHoodieGlobalBloomIndex.java | 8 +- .../index/bloom/TestKeyRangeLookupTree.java | 176 ++++++++++++++++++ 14 files changed, 953 insertions(+), 187 deletions(-) create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexFileFilter.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java rename hoodie-client/src/main/java/com/uber/hoodie/index/bloom/{IndexLookupResult.java => KeyLookupResult.java} (80%) create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeLookupTree.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeNode.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleGlobalIndexFileFilter.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleIndexFileFilter.java create mode 100644 hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestKeyRangeLookupTree.java diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index 5c187870f..5e60fade4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -38,6 +38,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.MetadataNotFoundException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -52,8 +53,7 @@ import org.apache.spark.storage.StorageLevel; import scala.Tuple2; /** - * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in - * its metadata. + * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. */ public class HoodieBloomIndex extends HoodieIndex { @@ -134,8 +134,8 @@ public class HoodieBloomIndex extends HoodieIndex } /** - * Lookup the location for each record key and return the pair for all record - * keys already present and drop the record keys if not present + * Lookup the location for each record key and return the pair for all record keys already + * present and drop the record keys if not present */ private JavaPairRDD lookupIndex( JavaPairRDD partitionRecordKeyPairRDD, final JavaSparkContext @@ -159,14 +159,11 @@ public class HoodieBloomIndex extends HoodieIndex } /** - * The index lookup can be skewed in three dimensions : #files, #partitions, #records - *

- * To be able to smoothly handle skews, we need to compute how to split each partitions into - * subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to < - * 2GB. - *

- * If {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is specified - * as a NON-zero number, then that is used explicitly. + * The index lookup can be skewed in three dimensions : #files, #partitions, #records

To be able to smoothly + * handle skews, we need to compute how to split each partitions into subpartitions. We do it here, in a way that + * keeps the amount of each Spark join partition to < 2GB.

If + * {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} + * is specified as a NON-zero number, then that is used explicitly. */ private int autoComputeParallelism(final Map recordsPerPartition, final Map> partitionToFileInfo, @@ -206,14 +203,10 @@ public class HoodieBloomIndex extends HoodieIndex } /** - * Its crucial to pick the right parallelism. - *

- * totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism : - * typically number of input file splits - *

- * We pick the max such that, we are always safe, but go higher if say a there are a lot of input - * files. (otherwise, we will fallback to number of partitions in input and end up with slow - * performance) + * Its crucial to pick the right parallelism.

totalSubPartitions : this is deemed safe limit, to be nice with + * Spark. inputParallelism : typically number of input file splits

We pick the max such that, we are always safe, + * but go higher if say a there are a lot of input files. (otherwise, we will fallback to number of partitions in + * input and end up with slow performance) */ private int determineParallelism(int inputParallelism, int totalSubPartitions) { // If bloom index parallelism is set, use it to to check against the input parallelism and @@ -297,58 +290,42 @@ public class HoodieBloomIndex extends HoodieIndex return true; } - /** - * if we dont have key ranges, then also we need to compare against the file. no other choice if - * we do, then only compare the file if the record key falls in range. - */ - boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) { - return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey); - } /** - * For each incoming record, produce N output records, 1 each for each file against which the - * record's key needs to be checked. For datasets, where the keys have a definite insert order - * (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range - * pruning. + * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be + * checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files + * to be compared gets cut down a lot from range pruning. * - * Sub-partition to ensure the records can be looked up against files & also prune - * file<=>record comparisons based on recordKey - * ranges in the index info. + * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on + * recordKey ranges in the index info. */ @VisibleForTesting JavaPairRDD> explodeRecordRDDWithFileComparisons( final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD) { + IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges() + ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) + : new SimpleIndexFileFilter(partitionToFileIndexInfo); return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); - - List indexInfos = partitionToFileIndexInfo.get(partitionPath); List>> recordComparisons = new ArrayList<>(); - if (indexInfos != null) { // could be null, if there are no files in a given partition yet. - // for each candidate file in partition, that needs to be compared. - for (BloomIndexFileInfo indexInfo : indexInfos) { - if (shouldCompareWithFile(indexInfo, recordKey)) { - recordComparisons.add( - new Tuple2<>(String.format("%s#%s", indexInfo.getFileName(), recordKey), - new Tuple2<>(indexInfo.getFileName(), - new HoodieKey(recordKey, partitionPath)))); - } - } - } + indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> { + recordComparisons.add( + new Tuple2<>(String.format("%s#%s", matchingFile, recordKey), + new Tuple2<>(matchingFile, + new HoodieKey(recordKey, partitionPath)))); + }); return recordComparisons; }).flatMapToPair(List::iterator); } /** - * Find out pair. All workload grouped by file-level. - *

- * Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition such - * that each RDD partition is a file, then for each file, we do (1) load bloom filter, (2) load - * rowKeys, (3) Tag rowKey - *

- * Make sure the parallelism is atleast the groupby parallelism for tagging location + * Find out pair. All workload grouped by file-level.

Join PairRDD(PartitionPath, RecordKey) + * and PairRDD(PartitionPath, File) & then repartition such that each RDD partition is a file, then for each file, we + * do (1) load bloom filter, (2) load rowKeys, (3) Tag rowKey

Make sure the parallelism is atleast the groupby + * parallelism for tagging location */ @VisibleForTesting JavaPairRDD findMatchingFilesForRecordKeys( diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java index 0c2ffa6a9..8f0f5a592 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java @@ -43,7 +43,7 @@ import scala.Tuple2; */ public class HoodieBloomIndexCheckFunction implements Function2>>, - Iterator>> { + Iterator>> { private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class); @@ -81,14 +81,14 @@ public class HoodieBloomIndexCheckFunction implements } @Override - public Iterator> call(Integer partition, - Iterator>> filePartitionRecordKeyTripletItr) + public Iterator> call(Integer partition, + Iterator>> fileParitionRecordKeyTripletItr) throws Exception { - return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr); + return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr); } class LazyKeyCheckIterator extends - LazyIterableIterator>, List> { + LazyIterableIterator>, List> { private List candidateRecordKeys; @@ -125,9 +125,9 @@ public class HoodieBloomIndexCheckFunction implements } @Override - protected List computeNext() { + protected List computeNext() { - List ret = new ArrayList<>(); + List ret = new ArrayList<>(); try { // process one file in each go. while (inputItr.hasNext()) { @@ -162,7 +162,7 @@ public class HoodieBloomIndexCheckFunction implements logger .debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys); } - ret.add(new IndexLookupResult(currentFile, + ret.add(new KeyLookupResult(currentFile, checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath))); initState(fileName, partitionPath); @@ -185,7 +185,7 @@ public class HoodieBloomIndexCheckFunction implements if (logger.isDebugEnabled()) { logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys); } - ret.add(new IndexLookupResult(currentFile, + ret.add(new KeyLookupResult(currentFile, checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath))); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java index db1f76f32..a366d9135 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java @@ -28,18 +28,16 @@ import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; +import java.util.*; +import java.util.Map.Entry; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; /** - * This filter will only work with hoodie dataset since it will only load partitions - * with .hoodie_partition_metadata file in it. + * This filter will only work with hoodie dataset since it will only load partitions with .hoodie_partition_metadata + * file in it. */ public class HoodieGlobalBloomIndex extends HoodieBloomIndex { @@ -53,7 +51,7 @@ public class HoodieGlobalBloomIndex extends Hoodi @Override @VisibleForTesting List> loadInvolvedFiles(List partitions, final JavaSparkContext jsc, - final HoodieTable hoodieTable) { + final HoodieTable hoodieTable) { HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); try { List allPartitionPaths = FSUtils @@ -66,45 +64,39 @@ public class HoodieGlobalBloomIndex extends Hoodi } /** - * For each incoming record, produce N output records, 1 each for each file against which the - * record's key needs to be checked. For datasets, where the keys have a definite insert order - * (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range - * pruning. + * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be + * checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files + * to be compared gets cut down a lot from range pruning. * - * Sub-partition to ensure the records can be looked up against files & also prune - * file<=>record comparisons based on recordKey - * ranges in the index info. - * the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will be ignored - * since the search scope should be bigger than that + * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on + * recordKey ranges in the index info. the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will + * be ignored since the search scope should be bigger than that */ + @Override @VisibleForTesting JavaPairRDD> explodeRecordRDDWithFileComparisons( final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD) { - List> indexInfos = - partitionToFileIndexInfo.entrySet().stream() - .flatMap(e1 -> e1.getValue().stream() - .map(e2 -> new Tuple2<>(e1.getKey(), e2))) - .collect(Collectors.toList()); + Map indexToPartitionMap = new HashMap<>(); + for (Entry> entry : partitionToFileIndexInfo.entrySet()) { + entry.getValue().forEach(indexFile -> indexToPartitionMap.put(indexFile.getFileName(), entry.getKey())); + } + IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges() + ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo) + : new SimpleGlobalIndexFileFilter(partitionToFileIndexInfo); return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { String recordKey = partitionRecordKeyPair._2(); - + String partitionPath = partitionRecordKeyPair._1(); List>> recordComparisons = new ArrayList<>(); - if (indexInfos != null) { // could be null, if there are no files in a given partition yet. - // for each candidate file in partition, that needs to be compared. - for (Tuple2 indexInfo : indexInfos) { - if (shouldCompareWithFile(indexInfo._2(), recordKey)) { - recordComparisons.add( - new Tuple2<>(String.format("%s#%s", indexInfo._2().getFileName(), recordKey), - new Tuple2<>(indexInfo._2().getFileName(), - new HoodieKey(recordKey, indexInfo._1())))); - } - } - } + indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> { + recordComparisons.add( + new Tuple2<>(String.format("%s#%s", matchingFile, recordKey), + new Tuple2<>(matchingFile, + new HoodieKey(recordKey, indexToPartitionMap.get(matchingFile))))); + }); return recordComparisons; }).flatMapToPair(List::iterator); } - } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexFileFilter.java new file mode 100644 index 000000000..9eac34758 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexFileFilter.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import java.io.Serializable; +import java.util.Set; + +/** + * IndexFile filter to assist in look up of a record key. + */ +public interface IndexFileFilter extends Serializable { + + /** + * Fetches all matching files for a given record key and partition. + * + * @param partitionPath the partition path of interest + * @param recordKey the record key to be looked up + * @return the {@link Set} of matching file names where the record could potentially be present. + */ + Set getMatchingFiles(String partitionPath, String recordKey); + +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java new file mode 100644 index 000000000..0bb920701 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Interval Tree based index look up for Global Index. Builds an {@link KeyRangeLookupTree} for all index files (across + * all partitions) and uses it to search for matching index files for any given recordKey that needs to be looked up. + */ +class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { + + private final KeyRangeLookupTree indexLookUpTree = new KeyRangeLookupTree(); + private final Set filesWithNoRanges = new HashSet<>(); + + /** + * Instantiates {@link IntervalTreeBasedGlobalIndexFileFilter} + * + * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s + */ + IntervalTreeBasedGlobalIndexFileFilter(final Map> partitionToFileIndexInfo) { + List allIndexFiles = partitionToFileIndexInfo.values().stream().flatMap( + indexFiles -> indexFiles.stream()).collect(Collectors.toList()); + // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. + // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed + // which could result in N search time instead of NlogN. + Collections.shuffle(allIndexFiles); + allIndexFiles.forEach(indexFile -> { + if (indexFile.hasKeyRanges()) { + indexLookUpTree.insert(new KeyRangeNode(indexFile.getMinRecordKey(), + indexFile.getMaxRecordKey(), indexFile.getFileName())); + } else { + filesWithNoRanges.add(indexFile.getFileName()); + } + }); + } + + @Override + public Set getMatchingFiles(String partitionPath, String recordKey) { + Set toReturn = new HashSet<>(); + toReturn.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey)); + filesWithNoRanges.forEach(indexFile -> { + toReturn.add(indexFile); + }); + return toReturn; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java new file mode 100644 index 000000000..25c4f158e --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Interval Tree based index look up. Builds an {@link KeyRangeLookupTree} for every partition and uses it to search for + * matching index files for any given recordKey that needs to be looked up. + */ +class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { + + private final Map partitionToFileIndexLookUpTree = new HashMap<>(); + private final Map> filesWithNoRanges = new HashMap<>(); + + /** + * Instantiates {@link IntervalTreeBasedIndexFileFilter} + * + * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s + */ + IntervalTreeBasedIndexFileFilter(final Map> partitionToFileIndexInfo) { + partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> { + // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. + // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be + // skewed which could result in N search time instead of logN. + Collections.shuffle(bloomIndexFiles); + KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree(); + bloomIndexFiles.forEach(indexFileInfo -> { + if (indexFileInfo.hasKeyRanges()) { + lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), + indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileName())); + } else { + if (!filesWithNoRanges.containsKey(partition)) { + filesWithNoRanges.put(partition, new HashSet<>()); + } + filesWithNoRanges.get(partition).add(indexFileInfo.getFileName()); + } + }); + partitionToFileIndexLookUpTree.put(partition, lookUpTree); + }); + } + + @Override + public Set getMatchingFiles(String partitionPath, String recordKey) { + Set toReturn = new HashSet<>(); + if (partitionToFileIndexLookUpTree + .containsKey(partitionPath)) { // could be null, if there are no files in a given partition yet or if all + // index files has no ranges + partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey) + .forEach(matchingFileName -> { + toReturn.add(matchingFileName); + }); + } + if (filesWithNoRanges.containsKey(partitionPath)) { + filesWithNoRanges.get(partitionPath).forEach(fileName -> { + toReturn.add(fileName); + }); + } + return toReturn; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexLookupResult.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java similarity index 80% rename from hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexLookupResult.java rename to hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java index 37760646e..e713c8b6d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexLookupResult.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyLookupResult.java @@ -21,16 +21,14 @@ package com.uber.hoodie.index.bloom; import java.util.List; /** - * Encapsulates the result from an index lookup + * Encapsulates the result from a key lookup */ -public class IndexLookupResult { +public class KeyLookupResult { - private String fileName; + private final String fileName; + private final List matchingRecordKeys; - - private List matchingRecordKeys; - - public IndexLookupResult(String fileName, List matchingRecordKeys) { + public KeyLookupResult(String fileName, List matchingRecordKeys) { this.fileName = fileName; this.matchingRecordKeys = matchingRecordKeys; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeLookupTree.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeLookupTree.java new file mode 100644 index 000000000..84ec21034 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeLookupTree.java @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; + +/** + * Look up tree implemented as interval trees to search for any given key in (N logN) time complexity. + */ +class KeyRangeLookupTree implements Serializable { + + private KeyRangeNode root; + + /** + * @return the root of the tree. Could be {@code null} + */ + public KeyRangeNode getRoot() { + return root; + } + + /** + * Inserts a new {@link KeyRangeNode} to this look up tree. + * + * @param newNode the new {@link KeyRangeNode} to be inserted + */ + void insert(KeyRangeNode newNode) { + root = insert(getRoot(), newNode); + } + + /** + * Inserts a new {@link KeyRangeNode} to this look up tree. + * + * If no root exists, make {@code newNode} as the root and return the new root. + * + * If current root and newNode matches with min record key and max record key, + * merge two nodes. In other words, add files from {@code newNode} to current root. + * Return current root. + * + * If current root is < newNode + * if current root has no right sub tree + * update current root's right sub tree max and min + * set newNode as right sub tree + * else + * update root's right sub tree min and max with newNode's min and max record key as applicable + * recursively call insert() with root's right subtree as new root + * + * else // current root is >= newNode + * if current root has no left sub tree + * update current root's left sub tree max and min + * set newNode as left sub tree + * else + * update root's left sub tree min and max with newNode's min and max record key as applicable + * recursively call insert() with root's left subtree as new root + * + * @param root refers to the current root of the look up tree + * @param newNode newNode the new {@link KeyRangeNode} to be inserted + */ + private KeyRangeNode insert(KeyRangeNode root, KeyRangeNode newNode) { + if (root == null) { + root = newNode; + return root; + } + + if (root.compareTo(newNode) == 0) { + root.addFiles(newNode.getFileNameList()); + return root; + } + + if (root.compareTo(newNode) < 0) { + if (root.getRight() == null) { + root.setRightSubTreeMax(newNode.getMaxRecordKey()); + root.setRightSubTreeMin(newNode.getMinRecordKey()); + root.setRight(newNode); + } else { + if (root.getRightSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) { + root.setRightSubTreeMax(newNode.getMaxRecordKey()); + } + if (root.getRightSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) { + root.setRightSubTreeMin(newNode.getMinRecordKey()); + } + insert(root.getRight(), newNode); + } + } else { + if (root.getLeft() == null) { + root.setLeftSubTreeMax(newNode.getMaxRecordKey()); + root.setLeftSubTreeMin(newNode.getMinRecordKey()); + root.setLeft(newNode); + } else { + if (root.getLeftSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) { + root.setLeftSubTreeMax(newNode.getMaxRecordKey()); + } + if (root.getLeftSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) { + root.setLeftSubTreeMin(newNode.getMinRecordKey()); + } + insert(root.getLeft(), newNode); + } + } + return root; + } + + /** + * Fetches all the matching index files where the key could possibly be present. + * + * @param lookupKey the key to be searched for + * @return the {@link Set} of matching index file names + */ + Set getMatchingIndexFiles(String lookupKey) { + Set matchingFileNameSet = new HashSet<>(); + getMatchingIndexFiles(getRoot(), lookupKey, matchingFileNameSet); + return matchingFileNameSet; + } + + /** + * Fetches all the matching index files where the key could possibly be present. + * + * @param root refers to the current root of the look up tree + * @param lookupKey the key to be searched for + */ + private void getMatchingIndexFiles(KeyRangeNode root, String lookupKey, Set matchingFileNameSet) { + if (root == null) { + return; + } + + if (root.getMinRecordKey().compareTo(lookupKey) <= 0 && lookupKey.compareTo(root.getMaxRecordKey()) <= 0) { + matchingFileNameSet.addAll(root.getFileNameList()); + } + + if (root.getLeftSubTreeMax() != null && root.getLeftSubTreeMin().compareTo(lookupKey) <= 0 + && lookupKey.compareTo(root.getLeftSubTreeMax()) <= 0) { + getMatchingIndexFiles(root.getLeft(), lookupKey, matchingFileNameSet); + } + + if (root.getRightSubTreeMax() != null && root.getRightSubTreeMin().compareTo(lookupKey) <= 0 + && lookupKey.compareTo(root.getRightSubTreeMax()) <= 0) { + getMatchingIndexFiles(root.getRight(), lookupKey, matchingFileNameSet); + } + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeNode.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeNode.java new file mode 100644 index 000000000..be1ec0238 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/KeyRangeNode.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Represents a node in the {@link KeyRangeLookupTree}. Holds information pertaining to a single index file, viz file + * name, min record key and max record key. + */ +class KeyRangeNode implements Comparable, Serializable { + + private final List fileNameList = new ArrayList<>(); + private final String minRecordKey; + private final String maxRecordKey; + private String rightSubTreeMax = null; + private String leftSubTreeMax = null; + private String rightSubTreeMin = null; + private String leftSubTreeMin = null; + private KeyRangeNode left = null; + private KeyRangeNode right = null; + + /** + * Instantiates a new {@link KeyRangeNode} + * + * @param minRecordKey min record key of the index file + * @param maxRecordKey max record key of the index file + * @param fileName file name of the index file + */ + KeyRangeNode(String minRecordKey, String maxRecordKey, String fileName) { + this.fileNameList.add(fileName); + this.minRecordKey = minRecordKey; + this.maxRecordKey = maxRecordKey; + } + + /** + * Adds a new file name list to existing list of file names. + * + * @param newFiles {@link List} of file names to be added + */ + void addFiles(List newFiles) { + this.fileNameList.addAll(newFiles); + } + + @Override + public String toString() { + return "KeyRangeNode{" + + "minRecordKey='" + minRecordKey + '\'' + + ", maxRecordKey='" + maxRecordKey + '\'' + + ", fileNameList=" + fileNameList + + ", rightSubTreeMax='" + rightSubTreeMax + '\'' + + ", leftSubTreeMax='" + leftSubTreeMax + '\'' + + ", rightSubTreeMin='" + rightSubTreeMin + '\'' + + ", leftSubTreeMin='" + leftSubTreeMin + '\'' + + '}'; + } + + /** + * Compares the min record key of two nodes, followed by max record key. + * + * @param that the {@link KeyRangeNode} to be compared with + * @return the result of comparison. 0 if both min and max are equal in both. 1 if this {@link KeyRangeNode} is + * greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this {@link + * KeyRangeNode} + */ + @Override + public int compareTo(KeyRangeNode that) { + int compareValue = minRecordKey.compareTo(that.minRecordKey); + if (compareValue == 0) { + return maxRecordKey.compareTo(that.maxRecordKey); + } else { + return compareValue; + } + } + + public List getFileNameList() { + return fileNameList; + } + + public String getMinRecordKey() { + return minRecordKey; + } + + public String getMaxRecordKey() { + return maxRecordKey; + } + + public String getRightSubTreeMin() { + return rightSubTreeMin; + } + + public void setRightSubTreeMin(String rightSubTreeMin) { + this.rightSubTreeMin = rightSubTreeMin; + } + + public String getLeftSubTreeMin() { + return leftSubTreeMin; + } + + public void setLeftSubTreeMin(String leftSubTreeMin) { + this.leftSubTreeMin = leftSubTreeMin; + } + + public String getRightSubTreeMax() { + return rightSubTreeMax; + } + + public void setRightSubTreeMax(String rightSubTreeMax) { + this.rightSubTreeMax = rightSubTreeMax; + } + + public String getLeftSubTreeMax() { + return leftSubTreeMax; + } + + public void setLeftSubTreeMax(String leftSubTreeMax) { + this.leftSubTreeMax = leftSubTreeMax; + } + + public KeyRangeNode getLeft() { + return left; + } + + public void setLeft(KeyRangeNode left) { + this.left = left; + } + + public KeyRangeNode getRight() { + return right; + } + + public void setRight(KeyRangeNode right) { + this.right = right; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleGlobalIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleGlobalIndexFileFilter.java new file mode 100644 index 000000000..baef30f06 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleGlobalIndexFileFilter.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +class SimpleGlobalIndexFileFilter extends SimpleIndexFileFilter { + + /** + * Instantiates {@link SimpleGlobalIndexFileFilter} + * + * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo} + */ + SimpleGlobalIndexFileFilter( + Map> partitionToFileIndexInfo) { + super(partitionToFileIndexInfo); + } + + @Override + public Set getMatchingFiles(String partitionPath, String recordKey) { + Set toReturn = new HashSet<>(); + partitionToFileIndexInfo.values().forEach(indexInfos -> { + if (indexInfos != null) { // could be null, if there are no files in a given partition yet. + // for each candidate file in partition, that needs to be compared. + for (BloomIndexFileInfo indexInfo : indexInfos) { + if (shouldCompareWithFile(indexInfo, recordKey)) { + toReturn.add(indexInfo.getFileName()); + } + } + } + }); + return toReturn; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleIndexFileFilter.java new file mode 100644 index 000000000..3eb0207e0 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleIndexFileFilter.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Simple implementation of {@link IndexFileFilter}. Sequentially goes through every index file in a given partition to + * search for potential index files to be searched for a given record key. + */ +class SimpleIndexFileFilter implements IndexFileFilter { + + final Map> partitionToFileIndexInfo; + + /** + * Instantiates {@link SimpleIndexFileFilter} + * + * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo} + */ + SimpleIndexFileFilter(final Map> partitionToFileIndexInfo) { + this.partitionToFileIndexInfo = partitionToFileIndexInfo; + } + + @Override + public Set getMatchingFiles(String partitionPath, String recordKey) { + List indexInfos = partitionToFileIndexInfo.get(partitionPath); + Set toReturn = new HashSet<>(); + if (indexInfos != null) { // could be null, if there are no files in a given partition yet. + // for each candidate file in partition, that needs to be compared. + for (BloomIndexFileInfo indexInfo : indexInfos) { + if (shouldCompareWithFile(indexInfo, recordKey)) { + toReturn.add(indexInfo.getFileName()); + } + } + } + return toReturn; + } + + /** + * if we dont have key ranges, then also we need to compare against the file. no other choice if we do, then only + * compare the file if the record key falls in range. + */ + protected boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) { + return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey); + } +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index 5f602d7f7..a52881d2d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -40,10 +40,7 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.commons.io.IOUtils; @@ -174,7 +171,7 @@ public class TestHoodieBloomIndex { HoodieClientTestUtils .writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", Arrays.asList(record2, record3, record4), schema, null, - false); + false); List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); @@ -211,31 +208,34 @@ public class TestHoodieBloomIndex { @Test public void testRangePruning() { + for (Boolean rangePruning : new boolean[]{false, true}) { + Map props = new HashMap<>(); + props.put("hoodie.bloom.index.prune.by" + ".ranges", rangePruning.toString()); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withProps(props).build(); + HoodieBloomIndex index = new HoodieBloomIndex(config); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); - HoodieBloomIndex index = new HoodieBloomIndex(config); + final Map> partitionToFileIndexInfo = new HashMap<>(); + partitionToFileIndexInfo.put("2017/10/22", Arrays.asList(new BloomIndexFileInfo("f1"), + new BloomIndexFileInfo("f2", "000", "000"), new BloomIndexFileInfo("f3", "001", "003"), + new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010"))); - final Map> partitionToFileIndexInfo = new HashMap<>(); - partitionToFileIndexInfo.put("2017/10/22", Arrays.asList(new BloomIndexFileInfo("f1"), - new BloomIndexFileInfo("f2", "000", "000"), new BloomIndexFileInfo("f3", "001", "003"), - new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010"))); + JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList( + new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), + new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); - JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList( - new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), - new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); + List>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( + partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); - List>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( - partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); + assertEquals(10, comparisonKeyList.size()); + Map> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy( + t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); - assertEquals(10, comparisonKeyList.size()); - Map> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy( - t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); - - assertEquals(4, recordKeyToFileComps.size()); - assertEquals(Arrays.asList("f1", "f3", "f4"), recordKeyToFileComps.get("002")); - assertEquals(Arrays.asList("f1", "f3", "f4"), recordKeyToFileComps.get("003")); - assertEquals(Arrays.asList("f1", "f4"), recordKeyToFileComps.get("004")); - assertEquals(Arrays.asList("f1", "f4"), recordKeyToFileComps.get("005")); + assertEquals(4, recordKeyToFileComps.size()); + assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002"))); + assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("003"))); + assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("004"))); + assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("005"))); + } } @Test @@ -315,68 +315,76 @@ public class TestHoodieBloomIndex { @Test public void testTagLocation() throws Exception { - // We have some records to be tagged (two different partitions) + for (Boolean rangePruning : new boolean[]{false, true}) { + Map props = new HashMap<>(); + props.put("hoodie.bloom.index.prune.by" + ".ranges", rangePruning.toString()); + // We have some records to be tagged (two different partitions) - String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; - String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; - String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; - String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," - + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; - TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); - HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), - rowChange1); - TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); - HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), - rowChange2); - TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); - HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), - rowChange3); - TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); - HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), - rowChange4); - JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); + String rowKey1 = UUID.randomUUID().toString(); + String rowKey2 = UUID.randomUUID().toString(); + String rowKey3 = UUID.randomUUID().toString(); + String rowKey4 = UUID.randomUUID().toString(); + String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + String recordStr4 = "{\"_row_key\":\"" + rowKey4 + "\"," + + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; + TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); + HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), + rowChange1); + TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); + HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), + rowChange2); + TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); + HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), + rowChange3); + TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); + HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), + rowChange4); + JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); - // Also create the metadata and config - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + // Also create the metadata and config + HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withProps(props).build(); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); - // Let's tag - HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); - JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); + // Let's tag + HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); + JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); - // Should not find any files - for (HoodieRecord record : taggedRecordRDD.collect()) { - assertTrue(!record.isCurrentLocationKnown()); - } + // Should not find any files + for (HoodieRecord record : taggedRecordRDD.collect()) { + assertFalse(record.isCurrentLocationKnown()); + } - // We create three parquet file, each having one record. (two different partitions) - String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true); - String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true); - String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); + // We create three parquet file, each having one record. (two different partitions) + String filename1 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true); + String filename2 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true); + String filename3 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); - // We do the tag again - metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - table = HoodieTable.getHoodieTable(metadata, config, jsc); + // We do the tag again + metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + table = HoodieTable.getHoodieTable(metadata, config, jsc); - taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); + taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); - // Check results - for (HoodieRecord record : taggedRecordRDD.collect()) { - if (record.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1))); - } else if (record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); - } else if (record.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { - assertTrue(!record.isCurrentLocationKnown()); - } else if (record.getRecordKey().equals("4eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); + // Check results + for (HoodieRecord record : taggedRecordRDD.collect()) { + if (record.getRecordKey().equals(rowKey1)) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1))); + } else if (record.getRecordKey().equals(rowKey2)) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); + } else if (record.getRecordKey().equals(rowKey3)) { + assertTrue(!record.isCurrentLocationKnown()); + } else if (record.getRecordKey().equals(rowKey4)) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); + } } } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java index 8f222b37f..0228c2b3b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java @@ -211,10 +211,10 @@ public class TestHoodieGlobalBloomIndex { t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); - assertEquals(Arrays.asList("f4", "f1", "f3"), recordKeyToFileComps.get("002")); - assertEquals(Arrays.asList("f4", "f1", "f3"), recordKeyToFileComps.get("003")); - assertEquals(Arrays.asList("f4", "f1"), recordKeyToFileComps.get("004")); - assertEquals(Arrays.asList("f4", "f1"), recordKeyToFileComps.get("005")); + assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002"))); + assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("003"))); + assertEquals(new HashSet<>(Arrays.asList("f4", "f1")), new HashSet<>(recordKeyToFileComps.get("004"))); + assertEquals(new HashSet<>(Arrays.asList("f4", "f1")), new HashSet<>(recordKeyToFileComps.get("005"))); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestKeyRangeLookupTree.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestKeyRangeLookupTree.java new file mode 100644 index 000000000..9bfd87551 --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestKeyRangeLookupTree.java @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.index.bloom; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import org.junit.Test; + +/** + * Tests {@link KeyRangeLookupTree} + */ +public class TestKeyRangeLookupTree { + + private static final Random RANDOM = new Random(); + private KeyRangeLookupTree keyRangeLookupTree; + private Map> expectedMatches; + + public TestKeyRangeLookupTree() { + keyRangeLookupTree = new KeyRangeLookupTree(); + expectedMatches = new HashMap<>(); + } + + /** + * Tests for single node in the tree for different inputs. + */ + @Test + public void testFileGroupLookUpOneEntry() { + KeyRangeNode toInsert = new KeyRangeNode(Long.toString(300), Long.toString(450), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + testRangeOfInputs(290, 305); + testRangeOfInputs(390, 400); + testRangeOfInputs(445, 455); + testRangeOfInputs(600, 605); + } + + /** + * Tests for many entries in the tree with same start value and different end values + */ + @Test + public void testFileGroupLookUpManyEntriesWithSameStartValue() { + String startKey = Long.toString(120); + long endKey = 250; + KeyRangeNode toInsert = new KeyRangeNode(startKey, Long.toString(endKey), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + for (int i = 0; i < 10; i++) { + endKey += 1 + RANDOM.nextInt(100); + toInsert = new KeyRangeNode(startKey, Long.toString(endKey), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + } + testRangeOfInputs(110, endKey + 5); + } + + /** + * Tests for many duplicte entries in the tree + */ + @Test + public void testFileGroupLookUpManyDulicateEntries() { + KeyRangeNode toInsert = new KeyRangeNode(Long.toString(1200), Long.toString(2000), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + for (int i = 0; i < 10; i++) { + toInsert = new KeyRangeNode(Long.toString(1200), Long.toString(2000), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + } + testRangeOfInputs(1050, 1100); + testRangeOfInputs(1500, 1600); + testRangeOfInputs(1990, 2100); + } + + // Tests helpers + + /** + * Tests for curated entries in look up tree. + */ + @Test + public void testFileGroupLookUp() { + + // testing with hand curated inputs + KeyRangeNode toInsert = new KeyRangeNode(Long.toString(500), Long.toString(600), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(750), Long.toString(950), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(120), Long.toString(620), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(550), Long.toString(775), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(725), Long.toString(850), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(750), Long.toString(825), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(750), Long.toString(990), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(800), Long.toString(820), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(200), Long.toString(550), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(520), Long.toString(600), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + toInsert = new KeyRangeNode(Long.toString(120), Long.toString(620), UUID.randomUUID().toString()); + updateExpectedMatchesToTest(toInsert); + keyRangeLookupTree.insert(toInsert); + testRangeOfInputs(110, 999); + } + + /** + * Method to test the look up tree for different range of input keys. + * + * @param start starting value of the look up key + * @param end end value of the look up tree + */ + private void testRangeOfInputs(long start, long end) { + for (long i = start; i <= end; i++) { + String iStr = Long.toString(i); + if (!expectedMatches.containsKey(iStr)) { + assertEquals(Collections.EMPTY_SET, keyRangeLookupTree.getMatchingIndexFiles(iStr)); + } else { + assertTrue(expectedMatches.get(iStr).equals(keyRangeLookupTree.getMatchingIndexFiles(iStr))); + } + } + } + + /** + * Updates the expected matches for a given {@link KeyRangeNode} + * + * @param toInsert the {@link KeyRangeNode} to be inserted + */ + private void updateExpectedMatchesToTest(KeyRangeNode toInsert) { + long startKey = Long.parseLong(toInsert.getMinRecordKey()); + long endKey = Long.parseLong(toInsert.getMaxRecordKey()); + for (long i = startKey; i <= endKey; i++) { + String iStr = Long.toString(i); + if (!expectedMatches.containsKey(iStr)) { + expectedMatches.put(iStr, new HashSet<>()); + } + expectedMatches.get(iStr).add(toInsert.getFileNameList().get(0)); + } + } + +}