From 86209640f76d64767a24c344ec0e2b22c2a381e6 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Fri, 14 Jul 2017 09:29:16 -0700 Subject: [PATCH] Adding range based pruning to bloom index - keys compared lexicographically using String::compareTo - Range metadata additionally written into parquet file footers - Trim fat & few optimizations to speed up indexing - Add param to control whether input shall be cached, to speed up lookup - Add param to turn on/off range pruning - Auto compute of parallelism now simply factors in amount of comparisons done - More accurate parallelism computation when range pruning is on - tests added & hardened, docs updated --- docs/configurations.md | 10 +- .../com/uber/hoodie/HoodieReadClient.java | 3 +- .../uber/hoodie/config/HoodieIndexConfig.java | 18 + .../uber/hoodie/config/HoodieWriteConfig.java | 8 + .../uber/hoodie/index/HoodieBloomIndex.java | 347 ---------------- .../com/uber/hoodie/index/HoodieIndex.java | 6 +- .../index/bloom/BloomIndexFileInfo.java | 99 +++++ .../hoodie/index/bloom/HoodieBloomIndex.java | 380 ++++++++++++++++++ .../HoodieBloomIndexCheckFunction.java | 14 +- .../index/{ => bloom}/IndexLookupResult.java | 14 +- .../index/{ => bucketed}/BucketedIndex.java | 4 +- .../hoodie/index/{ => hbase}/HBaseIndex.java | 15 +- .../com/uber/hoodie/TestHoodieClient.java | 3 +- .../com/uber/hoodie/TestMergeOnReadTable.java | 22 +- .../hoodie/common/HoodieClientTestUtils.java | 11 + .../uber/hoodie/index/TestHoodieIndex.java | 3 + .../{ => bloom}/TestHoodieBloomIndex.java | 181 ++++++--- .../uber/hoodie/io/TestHoodieCompactor.java | 11 +- .../hoodie/table/TestCopyOnWriteTable.java | 3 +- .../hoodie/avro/HoodieAvroWriteSupport.java | 25 +- .../hoodie/common/model/HoodieDataFile.java | 3 +- .../hoodie/common/model/HoodieLogFile.java | 3 +- .../uber/hoodie/common/util/ParquetUtils.java | 40 +- .../exception/HoodieIndexException.java | 2 - .../exception/MetadataNotFoundException.java | 32 ++ 25 files changed, 784 insertions(+), 
473 deletions(-) delete mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java rename hoodie-client/src/main/java/com/uber/hoodie/index/{ => bloom}/HoodieBloomIndexCheckFunction.java (95%) rename hoodie-client/src/main/java/com/uber/hoodie/index/{ => bloom}/IndexLookupResult.java (70%) rename hoodie-client/src/main/java/com/uber/hoodie/index/{ => bucketed}/BucketedIndex.java (97%) rename hoodie-client/src/main/java/com/uber/hoodie/index/{ => hbase}/HBaseIndex.java (96%) rename hoodie-client/src/test/java/com/uber/hoodie/index/{ => bloom}/TestHoodieBloomIndex.java (76%) create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/exception/MetadataNotFoundException.java diff --git a/docs/configurations.md b/docs/configurations.md index 23c8b8795..0cacdec04 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -29,9 +29,15 @@ summary: "Here we list all possible configurations and what they mean" - [withIndexType](#withIndexType) (indexType = BLOOM)
Type of index to use. Default is Bloom filter. Possible options are [BLOOM | HBASE | INMEMORY]. Bloom filters removes the dependency on a external system and is stored in the footer of the Parquet Data Files - [bloomFilterNumEntries](#bloomFilterNumEntries) (60000)
- Only application if index type is BLOOM.
This is the number of entries to be stored in the bloom filter. We assume the maxParquetFileSize is 128MB and averageRecordSize is 1024B and hence we approx a total of 130K records in a file. The default (60000) is roughly half of this approximation. [#70](https://github.com/uber/hoodie/issues/70) tracks computing this dynamically. Warning: Setting this very low, will generate a lot of false positives and index lookup will have to scan a lot more files than it has to and Setting this to a very high number will increase the size every data file linearly (roughly 4KB for every 50000 entries).
+ Only applies if index type is BLOOM.
This is the number of entries to be stored in the bloom filter. We assume the maxParquetFileSize is 128MB and averageRecordSize is 1024B and hence we approx a total of 130K records in a file. The default (60000) is roughly half of this approximation. [#70](https://github.com/uber/hoodie/issues/70) tracks computing this dynamically. Warning: Setting this very low will generate a lot of false positives and index lookup will have to scan a lot more files than it has to, and setting this to a very high number will increase the size of every data file linearly (roughly 4KB for every 50000 entries).
- [bloomFilterFPP](#bloomFilterFPP) (0.000000001)
- Only application if index type is BLOOM.
Error rate allowed given the number of entries. This is used to calculate how many bits should be assigned for the bloom filter and the number of hash functions. This is usually set very low (default: 0.000000001), we like to tradeoff disk space for lower false positives
+ Only applies if index type is BLOOM.
Error rate allowed given the number of entries. This is used to calculate how many bits should be assigned for the bloom filter and the number of hash functions. This is usually set very low (default: 0.000000001); we like to trade off disk space for lower false positives
+ - [bloomIndexPruneByRanges](#bloomIndexPruneByRanges) (true)
+ Only applies if index type is BLOOM.
When true, range information from files is leveraged to speed up index lookups. Particularly helpful if the key has a monotonically increasing prefix, such as a timestamp.
+ - [bloomIndexUseCaching](#bloomIndexUseCaching) (true)
+ Only applies if index type is BLOOM.
When true, the input RDD will be cached to speed up index lookup by reducing IO for computing parallelism or affected partitions
+ - [bloomIndexParallelism](#bloomIndexParallelism) (0)
+ Only applies if index type is BLOOM.
This is the amount of parallelism for index lookup, which involves a Spark Shuffle. By default, this is auto computed based on input workload characteristics
- [hbaseZkQuorum](#hbaseZkQuorum) (zkString)
Only application if index type is HBASE. HBase ZK Quorum url to connect to. - [hbaseZkPort](#hbaseZkPort) (port)
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index 1123b2d1f..b57bd3285 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -30,7 +30,7 @@ import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieException; -import com.uber.hoodie.index.HoodieBloomIndex; +import com.uber.hoodie.index.bloom.HoodieBloomIndex; import com.uber.hoodie.table.HoodieTable; @@ -50,7 +50,6 @@ import org.apache.spark.sql.types.StructType; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index e9026e97d..9a5fefbbe 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -42,6 +42,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism"; // Disable explicit bloom index parallelism setting by default - hoodie auto computes public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0"; + public static final String BLOOM_INDEX_PRUNE_BY_RANGES_PROP = "hoodie.bloom.index.prune.by.ranges"; + public static final String DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES = "true"; + public static final String BLOOM_INDEX_USE_CACHING_PROP = "hoodie.bloom.index.use.caching"; + public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true"; // ***** HBase Index 
Configs ***** public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum"; @@ -112,6 +116,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder bloomIndexPruneByRanges(boolean pruneRanges) { + props.setProperty(BLOOM_INDEX_PRUNE_BY_RANGES_PROP, String.valueOf(pruneRanges)); + return this; + } + + public Builder bloomIndexUseCaching(boolean useCaching) { + props.setProperty(BLOOM_INDEX_USE_CACHING_PROP, String.valueOf(useCaching)); + return this; + } + public Builder numBucketsPerPartition(int numBuckets) { props.setProperty(BUCKETED_INDEX_NUM_BUCKETS_PROP, String.valueOf(numBuckets)); return this; @@ -127,6 +141,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_PARALLELISM_PROP), BLOOM_INDEX_PARALLELISM_PROP, DEFAULT_BLOOM_INDEX_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_PRUNE_BY_RANGES_PROP), + BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES); + setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP), + BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP)); return config; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 6133952d3..08e6a89ab 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -203,6 +203,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP)); } + public boolean getBloomIndexPruneByRanges() { + 
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PRUNE_BY_RANGES_PROP)); + } + + public boolean getBloomIndexUseCaching() { + return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING_PROP)); + } + public int getNumBucketsPerPartition() { return Integer.parseInt(props.getProperty(HoodieIndexConfig.BUCKETED_INDEX_NUM_BUCKETS_PROP)); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java deleted file mode 100644 index ef213810c..000000000 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieBloomIndex.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.uber.hoodie.index; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; - -import com.uber.hoodie.WriteStatus; -import com.uber.hoodie.common.model.HoodieDataFile; -import com.uber.hoodie.common.model.HoodieKey; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordLocation; -import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.table.timeline.HoodieInstant; -import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.table.HoodieTable; - -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; - -import scala.Tuple2; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in - * its metadata. - */ -public class HoodieBloomIndex extends HoodieIndex { - - private static Logger logger = LogManager.getLogger(HoodieBloomIndex.class); - - // we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476) - private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024; - // this is how much a triplet of (partitionPath, fileId, recordKey) costs. 
- private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300; - private static int MAX_ITEMS_PER_JOIN_PARTITION = SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET; - - public HoodieBloomIndex(HoodieWriteConfig config, JavaSparkContext jsc) { - super(config, jsc); - } - - @Override - public JavaRDD> tagLocation(JavaRDD> recordRDD, final HoodieTable hoodieTable) { - - // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey) - JavaPairRDD partitionRecordKeyPairRDD = recordRDD - .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())); - - // Lookup indexes for all the partition/recordkey pair - JavaPairRDD rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, hoodieTable); - - // Cache the result, for subsequent stages. - rowKeyFilenamePairRDD.cache(); - long totalTaggedRecords = rowKeyFilenamePairRDD.count(); - logger.info("Number of update records (ones tagged with a fileID): " + totalTaggedRecords); - - - // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys - // Cost: 4 sec. 
- return tagLocationBacktoRecords(rowKeyFilenamePairRDD, recordRDD); - } - - public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTable table) { - JavaPairRDD partitionRecordKeyPairRDD = - hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); - - // Lookup indexes for all the partition/recordkey pair - JavaPairRDD rowKeyFilenamePairRDD = - lookupIndex(partitionRecordKeyPairRDD, table); - - JavaPairRDD rowKeyHoodieKeyPairRDD = - hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key)); - - return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD) - .mapToPair(keyPathTuple -> { - Optional recordLocationPath; - if (keyPathTuple._2._2.isPresent()) { - String fileName = keyPathTuple._2._2.get(); - String partitionPath = keyPathTuple._2._1.getPartitionPath(); - recordLocationPath = Optional.of(new Path( - new Path(table.getMetaClient().getBasePath(), partitionPath), - fileName).toUri().getPath()); - } else { - recordLocationPath = Optional.absent(); - } - return new Tuple2<>(keyPathTuple._2._1, recordLocationPath); - }); - } - - /** - * Lookup the location for each record key and return the pair for all - * record keys already present and drop the record keys if not present - */ - private JavaPairRDD lookupIndex( - JavaPairRDD partitionRecordKeyPairRDD, final HoodieTable hoodieTable) { - // Obtain records per partition, in the incoming records - Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); - List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); - - // Step 2: Load all involved files as pairs - JavaPairRDD partitionFilePairRDD = - loadInvolvedFiles(affectedPartitionPathList, hoodieTable); - Map filesPerPartition = partitionFilePairRDD.countByKey(); - - // Compute total subpartitions, to split partitions into. 
- Map subpartitionCountMap = - computeSubPartitions(recordsPerPartition, filesPerPartition); - - // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, that contains it. - return findMatchingFilesForRecordKeys(partitionFilePairRDD, partitionRecordKeyPairRDD, - subpartitionCountMap); - } - - /** - * The index lookup can be skewed in three dimensions : #files, #partitions, #records - * - * To be able to smoothly handle skews, we need to compute how to split each partitions into - * subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to - * < 2GB. - */ - private Map computeSubPartitions(Map recordsPerPartition, Map filesPerPartition) { - Map subpartitionCountMap = new HashMap<>(); - long totalRecords = 0; - long totalFiles = 0; - - for (String partitionPath : recordsPerPartition.keySet()) { - long numRecords = recordsPerPartition.get(partitionPath); - long numFiles = filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 1L; - subpartitionCountMap.put(partitionPath, ((numFiles * numRecords) / MAX_ITEMS_PER_JOIN_PARTITION) + 1); - - totalFiles += filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 0L; - totalRecords += numRecords; - } - logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles + ", TotalAffectedPartitions:" + recordsPerPartition.size()); - logger.info("Sub Partition Counts : " + subpartitionCountMap); - return subpartitionCountMap; - } - - /** - * Load the input records as in memory. - */ - @VisibleForTesting - Map> getPartitionToRowKeys(JavaRDD> recordRDD) { - // Have to wrap the map into a hashmap becuase of the need to braoadcast (see: http://php.sabscape.com/blog/?p=671) - return recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())) - .groupByKey().collectAsMap(); - } - - /** - * Load all involved files as pair RDD. 
- */ - @VisibleForTesting - JavaPairRDD loadInvolvedFiles(List partitions, - final HoodieTable hoodieTable) { - return jsc.parallelize(partitions, Math.max(partitions.size(), 1)) - .flatMapToPair(partitionPath -> { - java.util.Optional latestCommitTime = - hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); - List> list = new ArrayList<>(); - if (latestCommitTime.isPresent()) { - List filteredFiles = - hoodieTable.getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath, - latestCommitTime.get().getTimestamp()).collect(Collectors.toList()); - for (HoodieDataFile file : filteredFiles) { - list.add(new Tuple2<>(partitionPath, file.getFileName())); - } - } - return list.iterator(); - }); - } - - - @Override - public boolean rollbackCommit(String commitTime) { - // Nope, don't need to do anything. - return true; - } - - - /** - * When we subpartition records going into a partition, we still need to check them against all - * the files within the partition. Thus, we need to explode the (partition, file) pairs to - * (partition_subpartnum, file), so we can later join. - */ - private JavaPairRDD explodePartitionFilePairRDD(JavaPairRDD partitionFilePairRDD, - final Map subpartitionCountMap) { - return partitionFilePairRDD - .map(partitionFilePair -> { - List> explodedPartitionFilePairs = new ArrayList<>(); - for (long l = 0; l < subpartitionCountMap.get(partitionFilePair._1); l++) { - explodedPartitionFilePairs.add(new Tuple2<>( - String.format("%s#%d", partitionFilePair._1, l), - partitionFilePair._2)); - } - return explodedPartitionFilePairs; - }) - .flatMapToPair(exploded -> exploded.iterator()); - } - - /** - * To handle tons of incoming records to a partition, we need to split them into groups or - * create subpartitions. Here, we do a simple hash mod splitting, based on computed sub - * partitions. 
- */ - private JavaPairRDD splitPartitionRecordKeysPairRDD(JavaPairRDD partitionRecordKeyPairRDD, - final Map subpartitionCountMap) { - return partitionRecordKeyPairRDD - .mapToPair(partitionRecordKeyPair -> { - long subpart = Math.abs(partitionRecordKeyPair._2.hashCode()) % subpartitionCountMap.get(partitionRecordKeyPair._1); - return new Tuple2<>( - String.format("%s#%d", partitionRecordKeyPair._1, subpart), - partitionRecordKeyPair._2); - }); - } - - - /** - * Its crucial to pick the right parallelism. - * - * totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism : - * typically number of input files. - * - * We pick the max such that, we are always safe, but go higher if say a there are a lot of - * input files. (otherwise, we will fallback to number of partitions in input and end up with - * slow performance) - */ - private int determineParallelism(int inputParallelism, final Map subpartitionCountMap) { - // size the join parallelism to max(total number of sub partitions, total number of files). - int totalSubparts = 0; - for (long subparts : subpartitionCountMap.values()) { - totalSubparts += (int) subparts; - } - // If bloom index parallelism is set, use it to to check against the input parallelism and take the max - int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); - int joinParallelism = Math.max(totalSubparts, indexParallelism); - logger.info("InputParallelism: ${" + inputParallelism + "}, " + - "IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, " + - "TotalSubParts: ${" + totalSubparts + "}, " + - "Join Parallelism set to : " + joinParallelism); - return joinParallelism; - } - - - /** - * Find out pair. All workload grouped by file-level. 
- * - * // Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition - * such that // each RDD partition is a file, then for each file, we do (1) load bloom filter, - * (2) load rowKeys, (3) Tag rowKey // Make sure the parallelism is atleast the groupby - * parallelism for tagging location - */ - private JavaPairRDD findMatchingFilesForRecordKeys(JavaPairRDD partitionFilePairRDD, - JavaPairRDD partitionRecordKeyPairRDD, - final Map subpartitionCountMap) { - - // prepare the two RDDs and their join parallelism - JavaPairRDD subpartitionFilePairRDD = explodePartitionFilePairRDD(partitionFilePairRDD, subpartitionCountMap); - JavaPairRDD subpartitionRecordKeyPairRDD = splitPartitionRecordKeysPairRDD(partitionRecordKeyPairRDD, - subpartitionCountMap); - int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), subpartitionCountMap); - - // Perform a join, to bring all the files in each subpartition ,together with the record keys to be tested against them - JavaPairRDD> joinedTripletRDD = subpartitionFilePairRDD - .join(subpartitionRecordKeyPairRDD, joinParallelism); - - // sort further based on filename, such that all checking for the file can happen within a single partition, on-the-fly - JavaPairRDD> fileSortedTripletRDD = joinedTripletRDD - /** - * Incoming triplet is (partitionPath_subpart) => (file, recordKey) - */ - .mapToPair(joinedTriplet -> { - String partitionPath = joinedTriplet._1.split("#")[0]; // throw away the subpart - String fileName = joinedTriplet._2._1; - String recordKey = joinedTriplet._2._2; - - // make a sort key as #, to handle skews - return new Tuple2<>(String.format("%s#%s", fileName, recordKey), - new Tuple2<>(fileName, new HoodieKey(recordKey, partitionPath))); - }).sortByKey(true, joinParallelism); - - return fileSortedTripletRDD - .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true) - .flatMap(indexLookupResults -> 
indexLookupResults.iterator()) - .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0) - .flatMapToPair(lookupResult -> { - List> vals = new ArrayList<>(); - for (String recordKey : lookupResult.getMatchingRecordKeys()) { - vals.add(new Tuple2<>(recordKey, lookupResult.getFileName())); - } - return vals.iterator(); - }); - } - - /** - * Tag the back to the original HoodieRecord RDD. - */ - private JavaRDD> tagLocationBacktoRecords(JavaPairRDD rowKeyFilenamePairRDD, - JavaRDD> recordRDD) { - JavaPairRDD> rowKeyRecordPairRDD = recordRDD - .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record)); - - // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join. - return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map( - v1 -> { - HoodieRecord record = v1._1(); - if (v1._2().isPresent()) { - String filename = v1._2().get(); - if (filename != null && !filename.isEmpty()) { - record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename), - FSUtils.getFileId(filename))); - } - } - return record; - } - ); - } - - @Override - public JavaRDD updateLocation(JavaRDD writeStatusRDD, HoodieTable hoodieTable) { - return writeStatusRDD; - } -} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java index 0c1bc52bd..b3f260a44 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java @@ -17,8 +17,7 @@ package com.uber.hoodie.index; import com.google.common.base.Optional; -import com.uber.hoodie.common.table.HoodieTableMetaClient; -import com.uber.hoodie.common.table.HoodieTimeline; + import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieKey; @@ -26,6 +25,9 @@ import 
com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.index.bloom.HoodieBloomIndex; +import com.uber.hoodie.index.bucketed.BucketedIndex; +import com.uber.hoodie.index.hbase.HBaseIndex; import com.uber.hoodie.table.HoodieTable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java new file mode 100644 index 000000000..abe482094 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.index.bloom; + +import com.google.common.base.Objects; + +import java.io.Serializable; + +/** + * Metadata about a given file, useful for index lookup + */ +public class BloomIndexFileInfo implements Serializable { + + private final String fileName; + + private final String minRecordKey; + + private final String maxRecordKey; + + public BloomIndexFileInfo(String fileName, String minRecordKey, String maxRecordKey) { + this.fileName = fileName; + this.minRecordKey = minRecordKey; + this.maxRecordKey = maxRecordKey; + } + + public BloomIndexFileInfo(String fileName) { + this.fileName = fileName; + this.minRecordKey = null; + this.maxRecordKey = null; + } + + public String getFileName() { + return fileName; + } + + public String getMinRecordKey() { + return minRecordKey; + } + + public String getMaxRecordKey() { + return maxRecordKey; + } + + public boolean hasKeyRanges() { + return minRecordKey != null && maxRecordKey != null; + } + + /** + * Does the given key fall within the range (inclusive) + * @param recordKey + * @return + */ + public boolean isKeyInRange(String recordKey) { + return minRecordKey.compareTo(recordKey) <= 0 && + maxRecordKey.compareTo(recordKey) >= 0; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BloomIndexFileInfo that = (BloomIndexFileInfo) o; + return Objects.equal(that.fileName, fileName) && + Objects.equal(that.minRecordKey, minRecordKey) && + Objects.equal(that.maxRecordKey, maxRecordKey); + + } + + @Override + public int hashCode() { + return Objects.hashCode(fileName, minRecordKey, maxRecordKey); + } + + public String toString() { + final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {"); + sb.append(" fileName=").append(fileName); + sb.append(" minRecordKey=").append(minRecordKey); + sb.append(" maxRecordKey=").append(maxRecordKey); + sb.append('}'); + return sb.toString(); + } +} diff 
/*
 * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package com.uber.hoodie.index.bloom;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;

import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

/**
 * Indexing mechanism based on bloom filters. Each parquet file carries its row_key bloom filter
 * (and, optionally, its min/max record-key range) in the file footer metadata; lookups consult
 * those filters instead of scanning record data.
 *
 * @param <T> payload type of the records being indexed
 */
public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {

    private static final Logger logger = LogManager.getLogger(HoodieBloomIndex.class);

    // we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476)
    private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024;
    // this is how much a triplet of (partitionPath, fileId, recordKey) costs.
    private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300;
    // derived cap on how many such triplets one shuffle partition may hold
    private static final int MAX_ITEMS_PER_SHUFFLE_PARTITION =
            SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET;

    public HoodieBloomIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
        super(config, jsc);
    }

    /**
     * Tags each incoming record with the location (fileId) of the file already containing its key,
     * if any. Records whose keys are not found remain untagged (i.e. they are inserts).
     *
     * @param recordRDD   incoming records to tag
     * @param hoodieTable table against which the index lookup is performed
     * @return the same records, with current location set on the ones that are updates
     */
    @Override
    public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
                                                final HoodieTable<T> hoodieTable) {

        // Step 1: cache the input record RDD, since it is iterated multiple times below
        if (config.getBloomIndexUseCaching()) {
            recordRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        }

        // Step 2: Extract out a thinner pair RDD of (partitionPath, recordKey)
        JavaPairRDD<String, String> partitionRecordKeyPairRDD = recordRDD
                .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));

        // Step 3: Lookup indexes for all the partition/recordKey pairs
        JavaPairRDD<String, String> rowKeyFilenamePairRDD =
                lookupIndex(partitionRecordKeyPairRDD, hoodieTable);

        // Cache the result, for the subsequent stages (count + join).
        if (config.getBloomIndexUseCaching()) {
            rowKeyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
        }
        if (logger.isDebugEnabled()) {
            long totalTaggedRecords = rowKeyFilenamePairRDD.count();
            logger.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
        }

        // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
        JavaRDD<HoodieRecord<T>> taggedRecordRDD =
                tagLocationBacktoRecords(rowKeyFilenamePairRDD, recordRDD);

        if (config.getBloomIndexUseCaching()) {
            recordRDD.unpersist(); // unpersist the input Record RDD
            rowKeyFilenamePairRDD.unpersist();
        }

        return taggedRecordRDD;
    }

    /**
     * Resolves, for each given key, the full path of the data file that contains it (absent if the
     * key is not present in the table).
     *
     * @param hoodieKeys keys to look up
     * @param table      table against which the lookup is performed
     * @return pair RDD of (key, optional full file path)
     */
    public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
            JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> table) {
        JavaPairRDD<String, String> partitionRecordKeyPairRDD =
                hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));

        // Lookup indexes for all the partition/recordKey pairs
        JavaPairRDD<String, String> rowKeyFilenamePairRDD =
                lookupIndex(partitionRecordKeyPairRDD, table);

        JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD =
                hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));

        // left outer join, so keys with no matching file still appear (with an absent path)
        return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD)
                .mapToPair(keyPathTuple -> {
                    Optional<String> recordLocationPath;
                    if (keyPathTuple._2._2.isPresent()) {
                        String fileName = keyPathTuple._2._2.get();
                        String partitionPath = keyPathTuple._2._1.getPartitionPath();
                        recordLocationPath = Optional.of(new Path(
                                new Path(table.getMetaClient().getBasePath(), partitionPath),
                                fileName).toUri().getPath());
                    } else {
                        recordLocationPath = Optional.absent();
                    }
                    return new Tuple2<>(keyPathTuple._2._1, recordLocationPath);
                });
    }

    /**
     * Lookup the location for each record key and return the (recordKey, fileName) pair for all
     * record keys already present; record keys not present are dropped.
     */
    private JavaPairRDD<String, String> lookupIndex(
            JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieTable<T> hoodieTable) {
        // Obtain records per partition, in the incoming records
        Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
        List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

        // Step 2: Load all involved files as (partition, fileInfo) pairs
        List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
                loadInvolvedFiles(affectedPartitionPathList, hoodieTable);
        final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream()
                .collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));

        // Step 3: Obtain an RDD, for each incoming record that already exists, with the file id that contains it.
        int parallelism =
                autoComputeParallelism(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
        return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, parallelism);
    }

    /**
     * The index lookup can be skewed in three dimensions : #files, #partitions, #records
     *
     * To be able to smoothly handle skews, we need to compute how to split each partition into
     * subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to
     * &lt; 2GB.
     *
     * If {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is specified
     * as a NON-zero number, then that is used explicitly (see {@link #determineParallelism}).
     */
    private int autoComputeParallelism(final Map<String, Long> recordsPerPartition,
                                       final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
                                       JavaPairRDD<String, String> partitionRecordKeyPairRDD) {

        long totalComparisons = 0;
        if (config.getBloomIndexPruneByRanges()) {
            // we will just try exploding the input and then count to determine comparisons
            totalComparisons =
                    explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD).count();
        } else {
            // if not pruning by ranges, then each file in a partition needs to be compared against all
            // records for that partition.
            Map<String, Long> filesPerPartition = partitionToFileInfo.entrySet().stream()
                    .collect(toMap(Map.Entry::getKey, e -> Long.valueOf(e.getValue().size())));
            long totalFiles = 0, totalRecords = 0;
            for (String partitionPath : recordsPerPartition.keySet()) {
                long numRecords = recordsPerPartition.get(partitionPath);
                // assume at least one comparison per record, even for brand new partitions
                long numFiles = filesPerPartition.getOrDefault(partitionPath, 1L);

                totalComparisons += numFiles * numRecords;
                totalFiles += filesPerPartition.getOrDefault(partitionPath, 0L);
                totalRecords += numRecords;
            }
            logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles
                    + ", TotalAffectedPartitions:" + recordsPerPartition.size());
        }

        // each partition will have an item per comparison.
        int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1);
        logger.info("Auto computed parallelism :" + parallelism
                + ", totalComparisons: " + totalComparisons);
        return parallelism;
    }

    /**
     * It's crucial to pick the right parallelism.
     *
     * totalSubPartitions : this is the deemed safe limit, to be nice with Spark.
     * inputParallelism : typically number of input file splits
     *
     * We pick the max such that, we are always safe, but go higher if say there are a lot of
     * input files. (otherwise, we will fall back to number of partitions in input and end up with
     * slow performance)
     */
    private int determineParallelism(int inputParallelism, int totalSubPartitions) {
        // If bloom index parallelism is set, use it to check against the input parallelism and take the max
        int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
        int joinParallelism = Math.max(totalSubPartitions, indexParallelism);
        logger.info("InputParallelism: ${" + inputParallelism + "}, "
                + "IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, "
                + "TotalSubParts: ${" + totalSubPartitions + "}, "
                + "Join Parallelism set to : " + joinParallelism);
        return joinParallelism;
    }

    /**
     * Load all involved files as (partition, {@link BloomIndexFileInfo}) pairs. When range pruning
     * is enabled, also reads min/max record keys out of each file's footer metadata.
     */
    @VisibleForTesting
    List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions,
                                                               final HoodieTable<T> hoodieTable) {
        // Obtain the latest data files from all the partitions.
        List<Tuple2<String, HoodieDataFile>> dataFilesList =
                jsc.parallelize(partitions, Math.max(partitions.size(), 1))
                        .flatMapToPair(partitionPath -> {
                            java.util.Optional<HoodieInstant> latestCommitTime =
                                    hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant();
                            List<Tuple2<String, HoodieDataFile>> filteredFiles = new ArrayList<>();
                            if (latestCommitTime.isPresent()) {
                                filteredFiles = hoodieTable.getROFileSystemView()
                                        .getLatestDataFilesBeforeOrOn(partitionPath,
                                                latestCommitTime.get().getTimestamp())
                                        .map(f -> new Tuple2<>(partitionPath, f))
                                        .collect(toList());
                            }
                            return filteredFiles.iterator();
                        }).collect();

        if (config.getBloomIndexPruneByRanges()) {
            // also obtain file ranges, if range pruning is enabled
            return jsc.parallelize(dataFilesList, Math.max(dataFilesList.size(), 1))
                    .mapToPair(ft -> {
                        try {
                            String[] minMaxKeys =
                                    ParquetUtils.readMinMaxRecordKeys(ft._2().getFileStatus().getPath());
                            return new Tuple2<>(ft._1(),
                                    new BloomIndexFileInfo(ft._2().getFileName(), minMaxKeys[0], minMaxKeys[1]));
                        } catch (MetadataNotFoundException me) {
                            // older files may lack range metadata; fall back to a range-less entry
                            logger.warn("Unable to find range metadata in file :" + ft._2());
                            return new Tuple2<>(ft._1(), new BloomIndexFileInfo(ft._2().getFileName()));
                        }
                    }).collect();
        } else {
            return dataFilesList.stream()
                    .map(ft -> new Tuple2<>(ft._1(), new BloomIndexFileInfo(ft._2().getFileName())))
                    .collect(toList());
        }
    }

    @Override
    public boolean rollbackCommit(String commitTime) {
        // Nope, don't need to do anything.
        return true;
    }

    /**
     * If we don't have key ranges, then we also need to compare against the file - no other choice.
     * If we do, then only compare the file if the record key falls in range.
     *
     * @param indexInfo file info, possibly carrying a min/max key range
     * @param recordKey key being looked up
     * @return true if the record must be checked against this file
     */
    private boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) {
        return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey);
    }

    /**
     * For each incoming record, produce N output records, 1 for each file against which the record's
     * key needs to be checked. For datasets where the keys have a definite insert order (e.g:
     * timestamp as prefix), the number of files to be compared gets cut down a lot from range
     * pruning.
     *
     * Sub-partitions to ensure the records can be looked up against files &amp; also prunes
     * file&lt;=&gt;record comparisons based on recordKey ranges in the index info.
     *
     * @param partitionToFileIndexInfo  map of partition path to files (with optional key ranges)
     * @param partitionRecordKeyPairRDD incoming (partitionPath, recordKey) pairs
     * @return pair RDD keyed by "fileName#recordKey" for file-local sorting downstream
     */
    @VisibleForTesting
    JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
            final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
            JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
        return partitionRecordKeyPairRDD
                .map(partitionRecordKeyPair -> {
                    String recordKey = partitionRecordKeyPair._2();
                    String partitionPath = partitionRecordKeyPair._1();

                    List<BloomIndexFileInfo> indexInfos = partitionToFileIndexInfo.get(partitionPath);
                    List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
                    if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
                        // for each candidate file in partition, that needs to be compared.
                        for (BloomIndexFileInfo indexInfo : indexInfos) {
                            if (shouldCompareWithFile(indexInfo, recordKey)) {
                                recordComparisons.add(
                                        new Tuple2<>(String.format("%s#%s", indexInfo.getFileName(), recordKey),
                                                new Tuple2<>(indexInfo.getFileName(),
                                                        new HoodieKey(recordKey, partitionPath))));
                            }
                        }
                    }
                    return recordComparisons;
                })
                .flatMapToPair(t -> t.iterator());
    }

    /**
     * Find out the (recordKey, fileName) pairs. All workload is grouped by file-level.
     *
     * Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) &amp; then repartition
     * such that each RDD partition is a file, then for each file, we do
     * (1) load bloom filter,
     * (2) load rowKeys,
     * (3) Tag rowKey
     *
     * Make sure the parallelism is at least the groupby parallelism for tagging location
     */
    @VisibleForTesting
    JavaPairRDD<String, String> findMatchingFilesForRecordKeys(
            final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
            JavaPairRDD<String, String> partitionRecordKeyPairRDD,
            int totalSubpartitions) {

        int joinParallelism =
                determineParallelism(partitionRecordKeyPairRDD.partitions().size(), totalSubpartitions);

        JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD =
                explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD)
                        // sort further based on filename, such that all checking for the file can happen within
                        // a single partition, on-the-fly
                        .sortByKey(true, joinParallelism);

        return fileSortedTripletRDD
                .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true)
                .flatMap(indexLookupResults -> indexLookupResults.iterator())
                .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
                .flatMapToPair(lookupResult -> {
                    List<Tuple2<String, String>> vals = new ArrayList<>();
                    for (String recordKey : lookupResult.getMatchingRecordKeys()) {
                        vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
                    }
                    return vals.iterator();
                });
    }

    /**
     * Tag the (recordKey, fileName) pairs back onto the original HoodieRecord RDD.
     */
    private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
            JavaPairRDD<String, String> rowKeyFilenamePairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
        JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
                .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));

        // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
        // so we do a left outer join.
        return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(
                v1 -> {
                    HoodieRecord<T> record = v1._1();
                    if (v1._2().isPresent()) {
                        String filename = v1._2().get();
                        if (filename != null && !filename.isEmpty()) {
                            record.setCurrentLocation(new HoodieRecordLocation(
                                    FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)));
                        }
                    }
                    return record;
                }
        );
    }

    @Override
    public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
                                               HoodieTable<T> hoodieTable) {
        // bloom index state lives in the data files themselves; nothing extra to update.
        return writeStatusRDD;
    }
}
+ * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package com.uber.hoodie.index; +package com.uber.hoodie.index.bloom; import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.model.HoodieKey; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/IndexLookupResult.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexLookupResult.java similarity index 70% rename from hoodie-client/src/main/java/com/uber/hoodie/index/IndexLookupResult.java rename to hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexLookupResult.java index 7f9666d78..23a89b945 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/IndexLookupResult.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IndexLookupResult.java @@ -1,20 +1,22 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package com.uber.hoodie.index; +package com.uber.hoodie.index.bloom; import java.util.List; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/BucketedIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bucketed/BucketedIndex.java similarity index 97% rename from hoodie-client/src/main/java/com/uber/hoodie/index/BucketedIndex.java rename to hoodie-client/src/main/java/com/uber/hoodie/index/bucketed/BucketedIndex.java index 6152b0407..5dc697fa1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/BucketedIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bucketed/BucketedIndex.java @@ -16,7 +16,7 @@ * */ -package com.uber.hoodie.index; +package com.uber.hoodie.index.bucketed; import com.google.common.base.Optional; @@ -25,9 +25,9 @@ import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; -import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import org.apache.log4j.LogManager; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java similarity index 96% rename from hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java 
rename to hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java index 83ad5652e..03ac1438f 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java @@ -1,20 +1,22 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
+ * + * */ -package com.uber.hoodie.index; +package com.uber.hoodie.index.hbase; import com.google.common.base.Optional; import com.uber.hoodie.common.table.HoodieTimeline; @@ -29,6 +31,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException; import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index ef013622e..fadb9255f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -91,8 +91,7 @@ public class TestHoodieClient implements Serializable { @Before public void init() throws IOException { // Initialize a local spark env - SparkConf sparkConf = new SparkConf().setAppName("TestHoodieClient").setMaster("local[4]"); - jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf)); + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieClient")); //SQLContext stuff sqlContext = new SQLContext(jsc); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java index 63ac22f3d..fcc281043 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java @@ -16,6 +16,7 @@ package com.uber.hoodie; +import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieMergeOnReadTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; import 
com.uber.hoodie.common.minicluster.HdfsTestService; @@ -30,32 +31,21 @@ import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.table.HoodieTable; -import org.apache.avro.Schema; + import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; @@ -68,8 +58,6 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -77,7 +65,6 @@ import java.util.Optional; import java.util.stream.Collectors; import 
java.util.stream.Stream; -import static com.uber.hoodie.common.HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat; import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -126,10 +113,7 @@ public class TestMergeOnReadTable { this.fs = FSUtils.getFs(); // Initialize a local spark env - SparkConf sparkConf = - new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .setAppName("TestHoodieCompactor").setMaster("local[1]"); - jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf)); + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeOnReadTable")); jsc.hadoopConfiguration().addResource(FSUtils.getFs().getConf()); // Create a temp folder as the base path diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java index 0f43910ab..5b0158565 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java @@ -16,12 +16,15 @@ package com.uber.hoodie.common; +import com.uber.hoodie.HoodieReadClient; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.util.FSUtils; +import org.apache.spark.SparkConf; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -79,4 +82,12 @@ public class HoodieClientTestUtils { new File(path).createNewFile(); new RandomAccessFile(path, "rw").setLength(length); } + + public static SparkConf getSparkConfForTest(String appName) { + SparkConf sparkConf = new SparkConf() + .setAppName(appName) + .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer") + .setMaster("local[4]"); + return HoodieReadClient.addHoodieSupport(sparkConf); + } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java index 07ebff5ad..de9c2d368 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieIndex.java @@ -19,6 +19,9 @@ package com.uber.hoodie.index; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.index.bloom.HoodieBloomIndex; +import com.uber.hoodie.index.hbase.HBaseIndex; + import org.junit.Test; import static org.junit.Assert.*; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java similarity index 76% rename from hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java rename to hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index 0cf66657c..1a49b5953 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -1,25 +1,29 @@ /* - * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package com.uber.hoodie.index; +package com.uber.hoodie.index.bloom; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.avro.HoodieAvroWriteSupport; import com.uber.hoodie.common.BloomFilter; @@ -30,14 +34,22 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.index.bloom.BloomIndexFileInfo; +import com.uber.hoodie.index.bloom.HoodieBloomIndex; +import com.uber.hoodie.index.bloom.HoodieBloomIndexCheckFunction; +import com.uber.hoodie.io.storage.HoodieParquetConfig; +import com.uber.hoodie.io.storage.HoodieParquetWriter; import com.uber.hoodie.table.HoodieTable; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.spark.SparkConf; import 
org.apache.spark.api.java.JavaPairRDD; @@ -47,6 +59,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; import scala.Tuple2; @@ -54,6 +67,8 @@ import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.*; @@ -61,6 +76,8 @@ public class TestHoodieBloomIndex { private JavaSparkContext jsc = null; private String basePath = null; private transient final FileSystem fs; + private String schemaStr; + private Schema schema; public TestHoodieBloomIndex() throws Exception { fs = FSUtils.getFs(); @@ -69,13 +86,15 @@ public class TestHoodieBloomIndex { @Before public void init() throws IOException { // Initialize a local spark env - SparkConf sparkConf = new SparkConf().setAppName("TestHoodieBloomIndex").setMaster("local[4]"); - jsc = new JavaSparkContext(sparkConf); + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieBloomIndex")); // Create a temp folder as the base path TemporaryFolder folder = new TemporaryFolder(); folder.create(); basePath = folder.getRoot().getAbsolutePath(); HoodieTestUtils.init(basePath); + // We have some records to be tagged (two different partitions) + schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); + schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); } @Test @@ -86,8 +105,6 @@ public class TestHoodieBloomIndex { String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; - String schemaStr = - IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); TestRawTripPayload rowChange1 = new 
TestRawTripPayload(recordStr1); HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); @@ -101,8 +118,10 @@ public class TestHoodieBloomIndex { // Load to memory HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); - HoodieBloomIndex index = new HoodieBloomIndex(config, jsc); - Map> map = index.getPartitionToRowKeys(recordRDD); + + Map> map = recordRDD + .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())) + .groupByKey().collectAsMap(); assertEquals(map.size(), 2); List list1 = Lists.newArrayList(map.get("2016/01/31")); List list2 = Lists.newArrayList(map.get("2015/01/31")); @@ -112,7 +131,9 @@ public class TestHoodieBloomIndex { @Test public void testLoadInvolvedFiles() throws IOException { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .build(); HoodieBloomIndex index = new HoodieBloomIndex(config, jsc); // Create some partitions, and put some files @@ -122,51 +143,105 @@ public class TestHoodieBloomIndex { new File(basePath + "/2016/01/21").mkdirs(); new File(basePath + "/2016/04/01").mkdirs(); new File(basePath + "/2015/03/12").mkdirs(); - new File(basePath + "/2016/04/01/2_0_20160401010101.parquet").createNewFile(); - new File(basePath + "/2015/03/12/1_0_20150312101010.parquet").createNewFile(); - new File(basePath + "/2015/03/12/3_0_20150312101010.parquet").createNewFile(); - new File(basePath + "/2015/03/12/4_0_20150312101010.parquet").createNewFile(); + + TestRawTripPayload rowChange1 = new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); + TestRawTripPayload 
rowChange2 = new TestRawTripPayload("{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); + TestRawTripPayload rowChange3 = new TestRawTripPayload("{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); + TestRawTripPayload rowChange4 = new TestRawTripPayload("{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + + + writeParquetFile("2016/04/01","2_0_20160401010101.parquet", Lists.newArrayList(), schema, null, false); + writeParquetFile("2015/03/12","1_0_20150312101010.parquet", Lists.newArrayList(), schema, null, false); + writeParquetFile("2015/03/12","3_0_20150312101010.parquet", Arrays.asList(record1), schema, null, false); + writeParquetFile("2015/03/12","4_0_20150312101010.parquet", Arrays.asList(record2, record3, record4), schema, null, false); + List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metadata, config); - JavaPairRDD rdd = index.loadInvolvedFiles(partitions, table); + List> filesList = index.loadInvolvedFiles(partitions, table); // Still 0, as no valid commit - assertEquals(rdd.count(), 0); + assertEquals(filesList.size(), 0); // Add some commits new File(basePath + "/.hoodie").mkdirs(); new File(basePath + "/.hoodie/20160401010101.commit").createNewFile(); new File(basePath + "/.hoodie/20150312101010.commit").createNewFile(); - metadata = new HoodieTableMetaClient(fs, basePath); - rdd = index.loadInvolvedFiles(partitions, table); - final List> filesList = 
rdd.collect(); + + filesList = index.loadInvolvedFiles(partitions, table); assertEquals(filesList.size(), 4); + // these files will not have the key ranges + assertNull(filesList.get(0)._2().getMaxRecordKey()); + assertNull(filesList.get(0)._2().getMinRecordKey()); + assertFalse(filesList.get(1)._2().hasKeyRanges()); + assertNotNull(filesList.get(2)._2().getMaxRecordKey()); + assertNotNull(filesList.get(2)._2().getMinRecordKey()); + assertTrue(filesList.get(3)._2().hasKeyRanges()); // no longer sorted, but should have same files. - Set actualFiles = new HashSet(){{ - add(filesList.get(0)._1 + "/" + filesList.get(0)._2); - add(filesList.get(1)._1 + "/" + filesList.get(1)._2); - add(filesList.get(2)._1 + "/" + filesList.get(2)._2); - add(filesList.get(3)._1 + "/" + filesList.get(3)._2); - }}; - Set expected = new HashSet() {{ - add("2016/04/01/2_0_20160401010101.parquet"); - add("2015/03/12/1_0_20150312101010.parquet"); - add("2015/03/12/3_0_20150312101010.parquet"); - add("2015/03/12/4_0_20150312101010.parquet"); - }}; - assertEquals(expected, actualFiles); + List> expected = Arrays.asList( + new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2_0_20160401010101.parquet")), + new Tuple2<>("2015/03/12",new BloomIndexFileInfo("1_0_20150312101010.parquet")), + new Tuple2<>("2015/03/12",new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")), + new Tuple2<>("2015/03/12",new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003")) + ); + assertEquals(expected, filesList); + } + + @Test + public void testRangePruning() { + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .build(); + HoodieBloomIndex index = new HoodieBloomIndex(config, jsc); + + + final Map> partitionToFileIndexInfo = new HashMap<>(); + partitionToFileIndexInfo.put("2017/10/22", Arrays.asList( + new BloomIndexFileInfo("f1"), + new BloomIndexFileInfo("f2", "000", "000"), + new BloomIndexFileInfo("f3", "001", "003"), + new BloomIndexFileInfo("f4", "002", 
"007"), + new BloomIndexFileInfo("f5", "009", "010") + )); + + JavaPairRDD partitionRecordKeyPairRDD = jsc + .parallelize(Arrays.asList( + new Tuple2<>("2017/10/22","003"), + new Tuple2<>("2017/10/22","002"), + new Tuple2<>("2017/10/22","005"), + new Tuple2<>("2017/10/22","004") + )) + .mapToPair(t -> t); + + + List>> comparisonKeyList = index + .explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD) + .collect(); + + assertEquals(10, comparisonKeyList.size()); + Map> recordKeyToFileComps = comparisonKeyList.stream() + .collect(Collectors.groupingBy( + t -> t._2()._2().getRecordKey(), + Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList() + ) + )); + + assertEquals(4, recordKeyToFileComps.size()); + assertEquals(Arrays.asList("f1", "f3", "f4"), recordKeyToFileComps.get("002")); + assertEquals(Arrays.asList("f1", "f3", "f4"), recordKeyToFileComps.get("003")); + assertEquals(Arrays.asList("f1", "f4"), recordKeyToFileComps.get("004")); + assertEquals(Arrays.asList("f1", "f4"), recordKeyToFileComps.get("005")); } @Test public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedException, ClassNotFoundException { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); - HoodieBloomIndex index = new HoodieBloomIndex(config, jsc); - String schemaStr = - IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); - Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); // Create some records to use String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; @@ -232,9 +307,6 @@ public class TestHoodieBloomIndex { @Test public void testTagLocation() throws Exception { // We have some records to be tagged (two different 
partitions) - String schemaStr = - IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); - Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; @@ -292,9 +364,6 @@ public class TestHoodieBloomIndex { @Test public void testCheckExists() throws Exception { // We have some records to be tagged (two different partitions) - String schemaStr = - IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); - Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; @@ -366,10 +435,6 @@ public class TestHoodieBloomIndex { String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; // We write record1 to a parquet file, using a bloom filter having both records - String schemaStr = - IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); - Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); - TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); @@ -407,24 +472,32 @@ public class TestHoodieBloomIndex { String fileId = UUID.randomUUID().toString(); String filename = FSUtils.makeDataFileName(commitTime, 1, fileId); 
+ return writeParquetFile(partitionPath, filename, records, schema, filter, createCommitTime); } private String writeParquetFile(String partitionPath, String filename, List records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { + + if (filter == null) { filter = new BloomFilter(10000, 0.0000001); } HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); - ParquetWriter writer = new ParquetWriter(new Path(basePath + "/" + partitionPath + "/" + filename), - writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE); - int seqId = 1; String commitTime = FSUtils.getCommitTime(filename); + HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, + ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, new Configuration()); + HoodieParquetWriter writer = new HoodieParquetWriter( + commitTime, + new Path(basePath + "/" + partitionPath + "/" + filename), + config, + schema); + int seqId = 1; for (HoodieRecord record : records) { GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++); HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename); - writer.write(avroRecord); + writer.writeAvro(record.getRecordKey(), avroRecord); filter.add(record.getRecordKey()); } writer.close(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 1150e2e07..1ff8c300f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -19,23 +19,22 @@ package com.uber.hoodie.io; import com.uber.hoodie.HoodieReadClient; 
import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieCompactionMetadata; -import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; -import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieCompactionConfig; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; -import com.uber.hoodie.index.HoodieBloomIndex; +import com.uber.hoodie.index.bloom.HoodieBloomIndex; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; @@ -52,9 +51,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -68,9 +65,7 @@ public class TestHoodieCompactor { @Before public void init() throws IOException { // Initialize a local spark env - SparkConf sparkConf = - new SparkConf().setAppName("TestHoodieCompactor").setMaster("local[4]"); - jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf)); + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieCompactor")); // Create a temp folder as the base path 
TemporaryFolder folder = new TemporaryFolder(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index 7c0d1e267..f01c4c801 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -69,8 +69,7 @@ public class TestCopyOnWriteTable { public void init() throws Exception { // Initialize a local spark env - SparkConf sparkConf = new SparkConf().setAppName("TestCopyOnWriteTable").setMaster("local[4]"); - jsc = new JavaSparkContext(sparkConf); + jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestCopyOnWriteTable")); // Create a temp folder as the base path TemporaryFolder folder = new TemporaryFolder(); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/avro/HoodieAvroWriteSupport.java b/hoodie-common/src/main/java/com/uber/hoodie/avro/HoodieAvroWriteSupport.java index 6fb7a9b5f..dd3318228 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/avro/HoodieAvroWriteSupport.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/avro/HoodieAvroWriteSupport.java @@ -30,24 +30,47 @@ import java.util.HashMap; */ public class HoodieAvroWriteSupport extends AvroWriteSupport { private BloomFilter bloomFilter; + private String minRecordKey; + private String maxRecordKey; + + public final static String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter"; + public final static String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key"; + public final static String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key"; + public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) { super(schema, avroSchema); this.bloomFilter = bloomFilter; } - @Override public WriteSupport.FinalizedWriteContext finalizeWrite() { + @Override + public WriteSupport.FinalizedWriteContext 
finalizeWrite() { HashMap extraMetaData = new HashMap<>(); if (bloomFilter != null) { extraMetaData .put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString()); + if (minRecordKey != null && maxRecordKey != null) { + extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); + extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); + } } return new WriteSupport.FinalizedWriteContext(extraMetaData); } public void add(String recordKey) { this.bloomFilter.add(recordKey); + if (minRecordKey != null) { + minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey; + } else { + minRecordKey = recordKey; + } + + if (maxRecordKey != null) { + maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey; + } else { + maxRecordKey = recordKey; + } } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieDataFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieDataFile.java index 55114aa12..adf2f09d0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieDataFile.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieDataFile.java @@ -21,9 +21,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import java.io.Serializable; import java.util.Comparator; -public class HoodieDataFile { +public class HoodieDataFile implements Serializable { private FileStatus fileStatus; public HoodieDataFile(FileStatus fileStatus) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java index fd4f7d806..d1cb636d0 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.io.Serializable; import java.util.Comparator; import java.util.Optional; @@ -32,7 +33,7 @@ import java.util.Optional; * * Also contains logic to roll-over the log file */ -public class HoodieLogFile { +public class HoodieLogFile implements Serializable { public static final String DELTA_EXTENSION = ".log"; private final Path path; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java index 933de8c53..541d60b83 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java @@ -20,8 +20,10 @@ import com.uber.hoodie.avro.HoodieAvroWriteSupport; import com.uber.hoodie.common.BloomFilter; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; -import com.uber.hoodie.exception.HoodieIndexException; +import com.uber.hoodie.exception.MetadataNotFoundException; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; @@ -120,22 +122,40 @@ public class ParquetUtils { } + private static List readParquetFooter(Path parquetFilePath, String... footerNames) { + List footerVals = new ArrayList<>(); + ParquetMetadata footer = readMetadata(parquetFilePath); + Map metadata = footer.getFileMetaData().getKeyValueMetaData(); + for (String footerName: footerNames) { + if (metadata.containsKey(footerName)) { + footerVals.add(metadata.get(footerName)); + } else { + throw new MetadataNotFoundException("Could not find index in Parquet footer. " + + "Looked for key " + footerName + " in " + parquetFilePath); + } + } + return footerVals; + } /** * Read out the bloom filter from the parquet file meta data. 
*/ public static BloomFilter readBloomFilterFromParquetMetadata(Path parquetFilePath) { - ParquetMetadata footer = readMetadata(parquetFilePath); - Map metadata = footer.getFileMetaData().getKeyValueMetaData(); - if (metadata.containsKey(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)) { - return new BloomFilter(metadata.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)); - } else { - throw new HoodieIndexException("Could not find index in Parquet footer. Looked for key " - + HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY + " in " - + parquetFilePath); - } + String footerVal = readParquetFooter(parquetFilePath, + HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY).get(0); + return new BloomFilter(footerVal); } + public static String[] readMinMaxRecordKeys(Path parquetFilePath) { + List minMaxKeys = readParquetFooter(parquetFilePath, HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, + HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER); + if (minMaxKeys.size() != 2) { + throw new HoodieException(String.format( + "Could not read min/max record key out of footer correctly from %s. read) : %s", + parquetFilePath, minMaxKeys)); + } + return new String[]{minMaxKeys.get(0), minMaxKeys.get(1)}; + } /** * diff --git a/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieIndexException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieIndexException.java index 3d030cb14..93da5b9d8 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieIndexException.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/HoodieIndexException.java @@ -16,8 +16,6 @@ package com.uber.hoodie.exception; -import java.io.IOException; - /** *

* Exception thrown for HoodieIndex related errors. diff --git a/hoodie-common/src/main/java/com/uber/hoodie/exception/MetadataNotFoundException.java b/hoodie-common/src/main/java/com/uber/hoodie/exception/MetadataNotFoundException.java new file mode 100644 index 000000000..8be9ff401 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/exception/MetadataNotFoundException.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.uber.hoodie.exception; + +/** + * Thrown when expected metadata is not found + */ +public class MetadataNotFoundException extends HoodieException { + public MetadataNotFoundException(String msg) { + super(msg); + } + + public MetadataNotFoundException(String msg, Throwable e) { + super(msg, e); + } +}