diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index f34750973..3e4772a7d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -45,13 +45,12 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public static final String DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES = "true"; public static final String BLOOM_INDEX_USE_CACHING_PROP = "hoodie.bloom.index.use.caching"; public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true"; + public static final String BLOOM_INDEX_TREE_BASED_FILTER_PROP = "hoodie.bloom.index.use.treebased.filter"; + public static final String DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER = "true"; public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage" + ".level"; public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; - // ***** Bucketed Index Configs ***** - public static final String BUCKETED_INDEX_NUM_BUCKETS_PROP = "hoodie.index.bucketed.numbuckets"; - private HoodieIndexConfig(Properties props) { super(props); } @@ -114,12 +113,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } - public Builder numBucketsPerPartition(int numBuckets) { - props.setProperty(BUCKETED_INDEX_NUM_BUCKETS_PROP, String.valueOf(numBuckets)); + public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) { + props.setProperty(BLOOM_INDEX_TREE_BASED_FILTER_PROP, String.valueOf(useTreeFilter)); return this; } - public Builder withBloomIndexInputStorageLevel(String level) { props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level); return this; @@ -141,6 +139,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL), BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL); + setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP), + BLOOM_INDEX_TREE_BASED_FILTER_PROP, DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP)); return config; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 115dd51b3..ab6570f31 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -324,8 +324,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING_PROP)); } - public int getNumBucketsPerPartition() { - return Integer.parseInt(props.getProperty(HoodieIndexConfig.BUCKETED_INDEX_NUM_BUCKETS_PROP)); + public boolean useBloomIndexTreebasedFilter() { + return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER_PROP)); } public StorageLevel getBloomIndexInputStorageLevel() { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index 5e60fade4..9ac8db0b8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -38,7 +38,6 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.MetadataNotFoundException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -304,9 +303,9 @@ public class HoodieBloomIndex extends HoodieIndex JavaPairRDD> explodeRecordRDDWithFileComparisons( final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD) { - IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges() + IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) - : new SimpleIndexFileFilter(partitionToFileIndexInfo); + : new ListBasedIndexFileFilter(partitionToFileIndexInfo); return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java index 8f0f5a592..a313d2d91 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java @@ -21,6 +21,7 @@ package com.uber.hoodie.index.bloom; import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.HoodieTimer; import com.uber.hoodie.common.util.ParquetUtils; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIndexException; @@ -65,11 +66,12 @@ public class HoodieBloomIndexCheckFunction implements try { // Load all rowKeys from the file, to double-confirm if (!candidateRecordKeys.isEmpty()) { + HoodieTimer timer = new HoodieTimer().startTimer(); Set fileRowKeys = ParquetUtils.filterParquetRowKeys(configuration, filePath, new HashSet<>(candidateRecordKeys)); foundRecordKeys.addAll(fileRowKeys); - logger.info("After checking with row keys, we have " + foundRecordKeys.size() - + " results, for file " + filePath + " => " + foundRecordKeys); + logger.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath, + timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size())); if (logger.isDebugEnabled()) { logger.debug("Keys matching for file " + filePath + " => " + foundRecordKeys); } @@ -98,6 +100,8 @@ public class HoodieBloomIndexCheckFunction implements private String currentPartitionPath; + private long totalKeysChecked; + LazyKeyCheckIterator( Iterator>> filePartitionRecordKeyTripletItr) { super(filePartitionRecordKeyTripletItr); @@ -105,6 +109,7 @@ public class HoodieBloomIndexCheckFunction implements candidateRecordKeys = new ArrayList<>(); bloomFilter = null; currentPartitionPath = null; + totalKeysChecked = 0; } @Override @@ -114,16 +119,42 @@ public class HoodieBloomIndexCheckFunction implements private void initState(String fileName, String partitionPath) throws HoodieIndexException { try { Path filePath = new Path(basePath + "/" + partitionPath + "/" + fileName); - bloomFilter = ParquetUtils - .readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath); + HoodieTimer timer = new HoodieTimer().startTimer(); + bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath); + logger.info(String.format("Read bloom filter from %s/%s in %d ms", partitionPath, fileName, timer.endTimer())); candidateRecordKeys = new ArrayList<>(); currentFile = fileName; currentPartitionPath = partitionPath; + totalKeysChecked = 0; } catch (Exception e) { throw new HoodieIndexException("Error checking candidate keys against file.", e); } } + // check record key against bloom filter of current file & add to possible keys if needed + private void checkAndAddCandidates(String recordKey) { + if (bloomFilter.mightContain(recordKey)) { + if (logger.isDebugEnabled()) { + logger.debug("Record key " + recordKey + " matches bloom filter in file " + currentPartitionPath + + "/" + currentFile); + } + candidateRecordKeys.add(recordKey); + } + totalKeysChecked++; + } + + private List checkAgainstCurrentFile() { + Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile); + if (logger.isDebugEnabled()) { + logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys); + } + List matchingKeys = checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath); + logger.info(String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", + totalKeysChecked, candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), + matchingKeys.size())); + return matchingKeys; + } + @Override protected List computeNext() { @@ -131,7 +162,6 @@ public class HoodieBloomIndexCheckFunction implements try { // process one file in each go. while (inputItr.hasNext()) { - Tuple2> currentTuple = inputItr.next(); String fileName = currentTuple._2._1; String partitionPath = currentTuple._2._2.getPartitionPath(); @@ -142,53 +172,22 @@ public class HoodieBloomIndexCheckFunction implements initState(fileName, partitionPath); } - // if continue on current file) + // if continue on current file if (fileName.equals(currentFile)) { - // check record key against bloom filter of current file & add to possible keys if - // needed - if (bloomFilter.mightContain(recordKey)) { - if (logger.isDebugEnabled()) { - logger.debug("#1 Adding " + recordKey + " as candidate for file " + fileName); - } - candidateRecordKeys.add(recordKey); - } + checkAndAddCandidates(recordKey); } else { // do the actual checking of file & break out - Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile); - logger.info( - "#1 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys - .size() + " for " + filePath); - if (logger.isDebugEnabled()) { - logger - .debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys); - } - ret.add(new KeyLookupResult(currentFile, - checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath))); - + ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile())); initState(fileName, partitionPath); - if (bloomFilter.mightContain(recordKey)) { - if (logger.isDebugEnabled()) { - logger.debug("#2 Adding " + recordKey + " as candidate for file " + fileName); - } - candidateRecordKeys.add(recordKey); - } + checkAndAddCandidates(recordKey); break; } } // handle case, where we ran out of input, close pending work, update return val if (!inputItr.hasNext()) { - Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile); - logger.info( - "#2 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys - .size() + " for " + filePath); - if (logger.isDebugEnabled()) { - logger.debug("#The candidate row keys for " + filePath + " => " + candidateRecordKeys); - } - ret.add(new KeyLookupResult(currentFile, - checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, filePath))); + ret.add(new KeyLookupResult(currentFile, checkAgainstCurrentFile())); } - } catch (Throwable e) { if (e instanceof HoodieException) { throw e; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java index a366d9135..05a5aa777 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java @@ -26,11 +26,12 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; - import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; - import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; @@ -85,7 +86,7 @@ public class HoodieGlobalBloomIndex extends Hoodi IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo) - : new SimpleGlobalIndexFileFilter(partitionToFileIndexInfo); + : new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo); return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> { String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java index 0bb920701..1bcab22a7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java @@ -18,6 +18,7 @@ package com.uber.hoodie.index.bloom; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -40,8 +41,8 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s */ IntervalTreeBasedGlobalIndexFileFilter(final Map> partitionToFileIndexInfo) { - List allIndexFiles = partitionToFileIndexInfo.values().stream().flatMap( - indexFiles -> indexFiles.stream()).collect(Collectors.toList()); + List allIndexFiles = partitionToFileIndexInfo.values().stream().flatMap(Collection::stream) + .collect(Collectors.toList()); // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed // which could result in N search time instead of NlogN. @@ -60,9 +61,7 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { public Set getMatchingFiles(String partitionPath, String recordKey) { Set toReturn = new HashSet<>(); toReturn.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey)); - filesWithNoRanges.forEach(indexFile -> { - toReturn.add(indexFile); - }); + toReturn.addAll(filesWithNoRanges); return toReturn; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java index 25c4f158e..deb18da76 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java @@ -32,7 +32,7 @@ import java.util.Set; class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { private final Map partitionToFileIndexLookUpTree = new HashMap<>(); - private final Map> filesWithNoRanges = new HashMap<>(); + private final Map> partitionToFilesWithNoRanges = new HashMap<>(); /** * Instantiates {@link IntervalTreeBasedIndexFileFilter} @@ -51,10 +51,10 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(), indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileName())); } else { - if (!filesWithNoRanges.containsKey(partition)) { - filesWithNoRanges.put(partition, new HashSet<>()); + if (!partitionToFilesWithNoRanges.containsKey(partition)) { + partitionToFilesWithNoRanges.put(partition, new HashSet<>()); } - filesWithNoRanges.get(partition).add(indexFileInfo.getFileName()); + partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileName()); } }); partitionToFileIndexLookUpTree.put(partition, lookUpTree); @@ -64,18 +64,12 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { @Override public Set getMatchingFiles(String partitionPath, String recordKey) { Set toReturn = new HashSet<>(); - if (partitionToFileIndexLookUpTree - .containsKey(partitionPath)) { // could be null, if there are no files in a given partition yet or if all - // index files has no ranges - partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey) - .forEach(matchingFileName -> { - toReturn.add(matchingFileName); - }); + // could be null, if there are no files in a given partition yet or if all index files have no ranges + if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) { + toReturn.addAll(partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey)); } - if (filesWithNoRanges.containsKey(partitionPath)) { - filesWithNoRanges.get(partitionPath).forEach(fileName -> { - toReturn.add(fileName); - }); + if (partitionToFilesWithNoRanges.containsKey(partitionPath)) { + toReturn.addAll(partitionToFilesWithNoRanges.get(partitionPath)); } return toReturn; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleGlobalIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java similarity index 90% rename from hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleGlobalIndexFileFilter.java rename to hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java index baef30f06..358be4c11 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleGlobalIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java @@ -23,14 +23,14 @@ import java.util.List; import java.util.Map; import java.util.Set; -class SimpleGlobalIndexFileFilter extends SimpleIndexFileFilter { +class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter { /** - * Instantiates {@link SimpleGlobalIndexFileFilter} + * Instantiates {@link ListBasedGlobalIndexFileFilter} * * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo} */ - SimpleGlobalIndexFileFilter( + ListBasedGlobalIndexFileFilter( Map> partitionToFileIndexInfo) { super(partitionToFileIndexInfo); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleIndexFileFilter.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java similarity index 91% rename from hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleIndexFileFilter.java rename to hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java index 3eb0207e0..eb763fb9c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/SimpleIndexFileFilter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java @@ -27,16 +27,16 @@ import java.util.Set; * Simple implementation of {@link IndexFileFilter}. Sequentially goes through every index file in a given partition to * search for potential index files to be searched for a given record key. */ -class SimpleIndexFileFilter implements IndexFileFilter { +class ListBasedIndexFileFilter implements IndexFileFilter { final Map> partitionToFileIndexInfo; /** - * Instantiates {@link SimpleIndexFileFilter} + * Instantiates {@link ListBasedIndexFileFilter} * * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo} */ - SimpleIndexFileFilter(final Map> partitionToFileIndexInfo) { + ListBasedIndexFileFilter(final Map> partitionToFileIndexInfo) { this.partitionToFileIndexInfo = partitionToFileIndexInfo; }