1
0

Improving Tag location using interval trees for index files

Adding interface for index look up

Adding index filtering implementations for global bloom index too
This commit is contained in:
Sivabalan Narayanan
2018-11-28 11:00:02 -08:00
committed by vinoth chandar
parent 461ce18bd1
commit 7129dc5bb7
14 changed files with 953 additions and 187 deletions

View File

@@ -38,6 +38,7 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -52,8 +53,7 @@ import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
/**
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in
* its metadata.
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
*/
public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
@@ -134,8 +134,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
}
/**
* Lookup the location for each record key and return the pair<record_key,location> for all record
* keys already present and drop the record keys if not present
* Lookup the location for each record key and return the pair<record_key,location> for all record keys already
* present and drop the record keys if not present
*/
private JavaPairRDD<String, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext
@@ -159,14 +159,11 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
}
/**
* The index lookup can be skewed in three dimensions : #files, #partitions, #records
* <p>
* To be able to smoothly handle skews, we need to compute how to split each partitions into
* subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to <
* 2GB.
* <p>
* If {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is specified
* as a NON-zero number, then that is used explicitly.
* The index lookup can be skewed in three dimensions : #files, #partitions, #records <p> To be able to smoothly
* handle skews, we need to compute how to split each partitions into subpartitions. We do it here, in a way that
* keeps the amount of each Spark join partition to < 2GB. <p> If
* {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP}
* is specified as a NON-zero number, then that is used explicitly.
*/
private int autoComputeParallelism(final Map<String, Long> recordsPerPartition,
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
@@ -206,14 +203,10 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
}
/**
* Its crucial to pick the right parallelism.
* <p>
* totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism :
* typically number of input file splits
* <p>
* We pick the max such that, we are always safe, but go higher if say a there are a lot of input
* files. (otherwise, we will fallback to number of partitions in input and end up with slow
* performance)
* Its crucial to pick the right parallelism. <p> totalSubPartitions : this is deemed safe limit, to be nice with
* Spark. inputParallelism : typically number of input file splits <p> We pick the max such that, we are always safe,
* but go higher if say a there are a lot of input files. (otherwise, we will fallback to number of partitions in
* input and end up with slow performance)
*/
private int determineParallelism(int inputParallelism, int totalSubPartitions) {
// If bloom index parallelism is set, use it to to check against the input parallelism and
@@ -297,58 +290,42 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
return true;
}
/**
* if we dont have key ranges, then also we need to compare against the file. no other choice if
* we do, then only compare the file if the record key falls in range.
*/
boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) {
return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey);
}
/**
* For each incoming record, produce N output records, 1 each for each file against which the
* record's key needs to be checked. For datasets, where the keys have a definite insert order
* (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range
* pruning.
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
*
* Sub-partition to ensure the records can be looked up against files & also prune
* file<=>record comparisons based on recordKey
* ranges in the index info.
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
@VisibleForTesting
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
: new SimpleIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
List<BloomIndexFileInfo> indexInfos = partitionToFileIndexInfo.get(partitionPath);
List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
recordComparisons.add(
new Tuple2<>(String.format("%s#%s", indexInfo.getFileName(), recordKey),
new Tuple2<>(indexInfo.getFileName(),
new HoodieKey(recordKey, partitionPath))));
}
}
}
indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
recordComparisons.add(
new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
new Tuple2<>(matchingFile,
new HoodieKey(recordKey, partitionPath))));
});
return recordComparisons;
}).flatMapToPair(List::iterator);
}
/**
* Find out <RowKey, filename> pair. All workload grouped by file-level.
* <p>
* Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition such
* that each RDD partition is a file, then for each file, we do (1) load bloom filter, (2) load
* rowKeys, (3) Tag rowKey
* <p>
* Make sure the parallelism is atleast the groupby parallelism for tagging location
* Find out <RowKey, filename> pair. All workload grouped by file-level. <p> Join PairRDD(PartitionPath, RecordKey)
* and PairRDD(PartitionPath, File) & then repartition such that each RDD partition is a file, then for each file, we
* do (1) load bloom filter, (2) load rowKeys, (3) Tag rowKey <p> Make sure the parallelism is atleast the groupby
* parallelism for tagging location
*/
@VisibleForTesting
JavaPairRDD<String, String> findMatchingFilesForRecordKeys(

View File

@@ -43,7 +43,7 @@ import scala.Tuple2;
*/
public class HoodieBloomIndexCheckFunction implements
Function2<Integer, Iterator<Tuple2<String, Tuple2<String, HoodieKey>>>,
Iterator<List<IndexLookupResult>>> {
Iterator<List<KeyLookupResult>>> {
private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
@@ -81,14 +81,14 @@ public class HoodieBloomIndexCheckFunction implements
}
@Override
public Iterator<List<IndexLookupResult>> call(Integer partition,
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr)
public Iterator<List<KeyLookupResult>> call(Integer partition,
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> fileParitionRecordKeyTripletItr)
throws Exception {
return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
}
class LazyKeyCheckIterator extends
LazyIterableIterator<Tuple2<String, Tuple2<String, HoodieKey>>, List<IndexLookupResult>> {
LazyIterableIterator<Tuple2<String, Tuple2<String, HoodieKey>>, List<KeyLookupResult>> {
private List<String> candidateRecordKeys;
@@ -125,9 +125,9 @@ public class HoodieBloomIndexCheckFunction implements
}
@Override
protected List<IndexLookupResult> computeNext() {
protected List<KeyLookupResult> computeNext() {
List<IndexLookupResult> ret = new ArrayList<>();
List<KeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
while (inputItr.hasNext()) {
@@ -162,7 +162,7 @@ public class HoodieBloomIndexCheckFunction implements
logger
.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys);
}
ret.add(new IndexLookupResult(currentFile,
ret.add(new KeyLookupResult(currentFile,
checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath)));
initState(fileName, partitionPath);
@@ -185,7 +185,7 @@ public class HoodieBloomIndexCheckFunction implements
if (logger.isDebugEnabled()) {
logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys);
}
ret.add(new IndexLookupResult(currentFile,
ret.add(new KeyLookupResult(currentFile,
checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath)));
}

View File

@@ -28,18 +28,16 @@ import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.*;
import java.util.Map.Entry;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
* This filter will only work with hoodie dataset since it will only load partitions
* with .hoodie_partition_metadata file in it.
* This filter will only work with hoodie dataset since it will only load partitions with .hoodie_partition_metadata
* file in it.
*/
public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends HoodieBloomIndex<T> {
@@ -53,7 +51,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
@Override
@VisibleForTesting
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
try {
List<String> allPartitionPaths = FSUtils
@@ -66,45 +64,39 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
}
/**
* For each incoming record, produce N output records, 1 each for each file against which the
* record's key needs to be checked. For datasets, where the keys have a definite insert order
* (e.g: timestamp as prefix), the number of files to be compared gets cut down a lot from range
* pruning.
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
* to be compared gets cut down a lot from range pruning.
*
* Sub-partition to ensure the records can be looked up against files & also prune
* file<=>record comparisons based on recordKey
* ranges in the index info.
* the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will be ignored
* since the search scope should be bigger than that
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info. the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will
* be ignored since the search scope should be bigger than that
*/
@Override
@VisibleForTesting
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
List<Tuple2<String, BloomIndexFileInfo>> indexInfos =
partitionToFileIndexInfo.entrySet().stream()
.flatMap(e1 -> e1.getValue().stream()
.map(e2 -> new Tuple2<>(e1.getKey(), e2)))
.collect(Collectors.toList());
Map<String, String> indexToPartitionMap = new HashMap<>();
for (Entry<String, List<BloomIndexFileInfo>> entry : partitionToFileIndexInfo.entrySet()) {
entry.getValue().forEach(indexFile -> indexToPartitionMap.put(indexFile.getFileName(), entry.getKey()));
}
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
: new SimpleGlobalIndexFileFilter(partitionToFileIndexInfo);
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
// for each candidate file in partition, that needs to be compared.
for (Tuple2<String, BloomIndexFileInfo> indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo._2(), recordKey)) {
recordComparisons.add(
new Tuple2<>(String.format("%s#%s", indexInfo._2().getFileName(), recordKey),
new Tuple2<>(indexInfo._2().getFileName(),
new HoodieKey(recordKey, indexInfo._1()))));
}
}
}
indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
recordComparisons.add(
new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
new Tuple2<>(matchingFile,
new HoodieKey(recordKey, indexToPartitionMap.get(matchingFile)))));
});
return recordComparisons;
}).flatMapToPair(List::iterator);
}
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import java.io.Serializable;
import java.util.Set;
/**
* IndexFile filter to assist in look up of a record key.
*/
public interface IndexFileFilter extends Serializable {
/**
* Fetches all matching files for a given record key and partition.
*
* @param partitionPath the partition path of interest
* @param recordKey the record key to be looked up
* @return the {@link Set} of matching file names where the record could potentially be present.
*/
Set<String> getMatchingFiles(String partitionPath, String recordKey);
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Interval Tree based index look up for Global Index. Builds an {@link KeyRangeLookupTree} for all index files (across
* all partitions) and uses it to search for matching index files for any given recordKey that needs to be looked up.
*/
class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter {
private final KeyRangeLookupTree indexLookUpTree = new KeyRangeLookupTree();
private final Set<String> filesWithNoRanges = new HashSet<>();
/**
* Instantiates {@link IntervalTreeBasedGlobalIndexFileFilter}
*
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s
*/
IntervalTreeBasedGlobalIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
List<BloomIndexFileInfo> allIndexFiles = partitionToFileIndexInfo.values().stream().flatMap(
indexFiles -> indexFiles.stream()).collect(Collectors.toList());
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed
// which could result in N search time instead of NlogN.
Collections.shuffle(allIndexFiles);
allIndexFiles.forEach(indexFile -> {
if (indexFile.hasKeyRanges()) {
indexLookUpTree.insert(new KeyRangeNode(indexFile.getMinRecordKey(),
indexFile.getMaxRecordKey(), indexFile.getFileName()));
} else {
filesWithNoRanges.add(indexFile.getFileName());
}
});
}
@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
toReturn.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey));
filesWithNoRanges.forEach(indexFile -> {
toReturn.add(indexFile);
});
return toReturn;
}
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Interval Tree based index look up. Builds an {@link KeyRangeLookupTree} for every partition and uses it to search for
* matching index files for any given recordKey that needs to be looked up.
*/
class IntervalTreeBasedIndexFileFilter implements IndexFileFilter {
private final Map<String, KeyRangeLookupTree> partitionToFileIndexLookUpTree = new HashMap<>();
private final Map<String, Set<String>> filesWithNoRanges = new HashMap<>();
/**
* Instantiates {@link IntervalTreeBasedIndexFileFilter}
*
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s
*/
IntervalTreeBasedIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
partitionToFileIndexInfo.forEach((partition, bloomIndexFiles) -> {
// Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time.
// So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be
// skewed which could result in N search time instead of logN.
Collections.shuffle(bloomIndexFiles);
KeyRangeLookupTree lookUpTree = new KeyRangeLookupTree();
bloomIndexFiles.forEach(indexFileInfo -> {
if (indexFileInfo.hasKeyRanges()) {
lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(),
indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileName()));
} else {
if (!filesWithNoRanges.containsKey(partition)) {
filesWithNoRanges.put(partition, new HashSet<>());
}
filesWithNoRanges.get(partition).add(indexFileInfo.getFileName());
}
});
partitionToFileIndexLookUpTree.put(partition, lookUpTree);
});
}
@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
if (partitionToFileIndexLookUpTree
.containsKey(partitionPath)) { // could be null, if there are no files in a given partition yet or if all
// index files has no ranges
partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey)
.forEach(matchingFileName -> {
toReturn.add(matchingFileName);
});
}
if (filesWithNoRanges.containsKey(partitionPath)) {
filesWithNoRanges.get(partitionPath).forEach(fileName -> {
toReturn.add(fileName);
});
}
return toReturn;
}
}

View File

@@ -21,16 +21,14 @@ package com.uber.hoodie.index.bloom;
import java.util.List;
/**
* Encapsulates the result from an index lookup
* Encapsulates the result from a key lookup
*/
public class IndexLookupResult {
public class KeyLookupResult {
private String fileName;
private final String fileName;
private final List<String> matchingRecordKeys;
private List<String> matchingRecordKeys;
public IndexLookupResult(String fileName, List<String> matchingRecordKeys) {
public KeyLookupResult(String fileName, List<String> matchingRecordKeys) {
this.fileName = fileName;
this.matchingRecordKeys = matchingRecordKeys;
}

View File

@@ -0,0 +1,156 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
/**
* Look up tree implemented as interval trees to search for any given key in (N logN) time complexity.
*/
class KeyRangeLookupTree implements Serializable {
private KeyRangeNode root;
/**
* @return the root of the tree. Could be {@code null}
*/
public KeyRangeNode getRoot() {
return root;
}
/**
* Inserts a new {@link KeyRangeNode} to this look up tree.
*
* @param newNode the new {@link KeyRangeNode} to be inserted
*/
void insert(KeyRangeNode newNode) {
root = insert(getRoot(), newNode);
}
/**
* Inserts a new {@link KeyRangeNode} to this look up tree.
*
* If no root exists, make {@code newNode} as the root and return the new root.
*
* If current root and newNode matches with min record key and max record key,
* merge two nodes. In other words, add files from {@code newNode} to current root.
* Return current root.
*
* If current root is < newNode
* if current root has no right sub tree
* update current root's right sub tree max and min
* set newNode as right sub tree
* else
* update root's right sub tree min and max with newNode's min and max record key as applicable
* recursively call insert() with root's right subtree as new root
*
* else // current root is >= newNode
* if current root has no left sub tree
* update current root's left sub tree max and min
* set newNode as left sub tree
* else
* update root's left sub tree min and max with newNode's min and max record key as applicable
* recursively call insert() with root's left subtree as new root
*
* @param root refers to the current root of the look up tree
* @param newNode newNode the new {@link KeyRangeNode} to be inserted
*/
private KeyRangeNode insert(KeyRangeNode root, KeyRangeNode newNode) {
if (root == null) {
root = newNode;
return root;
}
if (root.compareTo(newNode) == 0) {
root.addFiles(newNode.getFileNameList());
return root;
}
if (root.compareTo(newNode) < 0) {
if (root.getRight() == null) {
root.setRightSubTreeMax(newNode.getMaxRecordKey());
root.setRightSubTreeMin(newNode.getMinRecordKey());
root.setRight(newNode);
} else {
if (root.getRightSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
root.setRightSubTreeMax(newNode.getMaxRecordKey());
}
if (root.getRightSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
root.setRightSubTreeMin(newNode.getMinRecordKey());
}
insert(root.getRight(), newNode);
}
} else {
if (root.getLeft() == null) {
root.setLeftSubTreeMax(newNode.getMaxRecordKey());
root.setLeftSubTreeMin(newNode.getMinRecordKey());
root.setLeft(newNode);
} else {
if (root.getLeftSubTreeMax().compareTo(newNode.getMaxRecordKey()) < 0) {
root.setLeftSubTreeMax(newNode.getMaxRecordKey());
}
if (root.getLeftSubTreeMin().compareTo(newNode.getMinRecordKey()) > 0) {
root.setLeftSubTreeMin(newNode.getMinRecordKey());
}
insert(root.getLeft(), newNode);
}
}
return root;
}
/**
* Fetches all the matching index files where the key could possibly be present.
*
* @param lookupKey the key to be searched for
* @return the {@link Set} of matching index file names
*/
Set<String> getMatchingIndexFiles(String lookupKey) {
Set<String> matchingFileNameSet = new HashSet<>();
getMatchingIndexFiles(getRoot(), lookupKey, matchingFileNameSet);
return matchingFileNameSet;
}
/**
* Fetches all the matching index files where the key could possibly be present.
*
* @param root refers to the current root of the look up tree
* @param lookupKey the key to be searched for
*/
private void getMatchingIndexFiles(KeyRangeNode root, String lookupKey, Set<String> matchingFileNameSet) {
if (root == null) {
return;
}
if (root.getMinRecordKey().compareTo(lookupKey) <= 0 && lookupKey.compareTo(root.getMaxRecordKey()) <= 0) {
matchingFileNameSet.addAll(root.getFileNameList());
}
if (root.getLeftSubTreeMax() != null && root.getLeftSubTreeMin().compareTo(lookupKey) <= 0
&& lookupKey.compareTo(root.getLeftSubTreeMax()) <= 0) {
getMatchingIndexFiles(root.getLeft(), lookupKey, matchingFileNameSet);
}
if (root.getRightSubTreeMax() != null && root.getRightSubTreeMin().compareTo(lookupKey) <= 0
&& lookupKey.compareTo(root.getRightSubTreeMax()) <= 0) {
getMatchingIndexFiles(root.getRight(), lookupKey, matchingFileNameSet);
}
}
}

View File

@@ -0,0 +1,153 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* Represents a node in the {@link KeyRangeLookupTree}. Holds information pertaining to a single index file, viz file
* name, min record key and max record key.
*/
class KeyRangeNode implements Comparable<KeyRangeNode>, Serializable {
private final List<String> fileNameList = new ArrayList<>();
private final String minRecordKey;
private final String maxRecordKey;
private String rightSubTreeMax = null;
private String leftSubTreeMax = null;
private String rightSubTreeMin = null;
private String leftSubTreeMin = null;
private KeyRangeNode left = null;
private KeyRangeNode right = null;
/**
* Instantiates a new {@link KeyRangeNode}
*
* @param minRecordKey min record key of the index file
* @param maxRecordKey max record key of the index file
* @param fileName file name of the index file
*/
KeyRangeNode(String minRecordKey, String maxRecordKey, String fileName) {
this.fileNameList.add(fileName);
this.minRecordKey = minRecordKey;
this.maxRecordKey = maxRecordKey;
}
/**
* Adds a new file name list to existing list of file names.
*
* @param newFiles {@link List} of file names to be added
*/
void addFiles(List<String> newFiles) {
this.fileNameList.addAll(newFiles);
}
@Override
public String toString() {
return "KeyRangeNode{"
+ "minRecordKey='" + minRecordKey + '\''
+ ", maxRecordKey='" + maxRecordKey + '\''
+ ", fileNameList=" + fileNameList
+ ", rightSubTreeMax='" + rightSubTreeMax + '\''
+ ", leftSubTreeMax='" + leftSubTreeMax + '\''
+ ", rightSubTreeMin='" + rightSubTreeMin + '\''
+ ", leftSubTreeMin='" + leftSubTreeMin + '\''
+ '}';
}
/**
* Compares the min record key of two nodes, followed by max record key.
*
* @param that the {@link KeyRangeNode} to be compared with
* @return the result of comparison. 0 if both min and max are equal in both. 1 if this {@link KeyRangeNode} is
* greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this {@link
* KeyRangeNode}
*/
@Override
public int compareTo(KeyRangeNode that) {
int compareValue = minRecordKey.compareTo(that.minRecordKey);
if (compareValue == 0) {
return maxRecordKey.compareTo(that.maxRecordKey);
} else {
return compareValue;
}
}
public List<String> getFileNameList() {
return fileNameList;
}
public String getMinRecordKey() {
return minRecordKey;
}
public String getMaxRecordKey() {
return maxRecordKey;
}
public String getRightSubTreeMin() {
return rightSubTreeMin;
}
public void setRightSubTreeMin(String rightSubTreeMin) {
this.rightSubTreeMin = rightSubTreeMin;
}
public String getLeftSubTreeMin() {
return leftSubTreeMin;
}
public void setLeftSubTreeMin(String leftSubTreeMin) {
this.leftSubTreeMin = leftSubTreeMin;
}
public String getRightSubTreeMax() {
return rightSubTreeMax;
}
public void setRightSubTreeMax(String rightSubTreeMax) {
this.rightSubTreeMax = rightSubTreeMax;
}
public String getLeftSubTreeMax() {
return leftSubTreeMax;
}
public void setLeftSubTreeMax(String leftSubTreeMax) {
this.leftSubTreeMax = leftSubTreeMax;
}
public KeyRangeNode getLeft() {
return left;
}
public void setLeft(KeyRangeNode left) {
this.left = left;
}
public KeyRangeNode getRight() {
return right;
}
public void setRight(KeyRangeNode right) {
this.right = right;
}
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
class SimpleGlobalIndexFileFilter extends SimpleIndexFileFilter {
/**
* Instantiates {@link SimpleGlobalIndexFileFilter}
*
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}
*/
SimpleGlobalIndexFileFilter(
Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
super(partitionToFileIndexInfo);
}
@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
Set<String> toReturn = new HashSet<>();
partitionToFileIndexInfo.values().forEach(indexInfos -> {
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
toReturn.add(indexInfo.getFileName());
}
}
}
});
return toReturn;
}
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Simple implementation of {@link IndexFileFilter}. Sequentially goes through every index file in a given partition to
* search for potential index files to be searched for a given record key.
*/
class SimpleIndexFileFilter implements IndexFileFilter {
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo;
/**
* Instantiates {@link SimpleIndexFileFilter}
*
* @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}
*/
SimpleIndexFileFilter(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo) {
this.partitionToFileIndexInfo = partitionToFileIndexInfo;
}
@Override
public Set<String> getMatchingFiles(String partitionPath, String recordKey) {
List<BloomIndexFileInfo> indexInfos = partitionToFileIndexInfo.get(partitionPath);
Set<String> toReturn = new HashSet<>();
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
toReturn.add(indexInfo.getFileName());
}
}
}
return toReturn;
}
/**
* if we dont have key ranges, then also we need to compare against the file. no other choice if we do, then only
* compare the file if the record key falls in range.
*/
protected boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) {
return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey);
}
}