1
0

Adding range based pruning to bloom index

- keys compared lexicographically using String::compareTo
 - Range metadata additionally written into parquet file footers
 - Trim fat & few optimizations to speed up indexing
 - Add param to control whether input shall be cached, to speed up lookup
 - Add param to turn on/off range pruning
 - Auto compute of parallelism now simply factors in amount of comparisons done
 - More accurate parallelism computation when range pruning is on
 - tests added & hardened, docs updated
This commit is contained in:
Vinoth Chandar
2017-07-14 09:29:16 -07:00
committed by prazanna
parent 0b26b60a5c
commit 86209640f7
25 changed files with 784 additions and 473 deletions

View File

@@ -30,7 +30,7 @@ import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.index.HoodieBloomIndex;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.table.HoodieTable;
@@ -50,7 +50,6 @@ import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

View File

@@ -42,6 +42,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism";
// Disable explicit bloom index parallelism setting by default - hoodie auto computes
public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0";
public static final String BLOOM_INDEX_PRUNE_BY_RANGES_PROP = "hoodie.bloom.index.prune.by.ranges";
public static final String DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES = "true";
public static final String BLOOM_INDEX_USE_CACHING_PROP = "hoodie.bloom.index.use.caching";
public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true";
// ***** HBase Index Configs *****
public final static String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
@@ -112,6 +116,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
public Builder bloomIndexPruneByRanges(boolean pruneRanges) {
props.setProperty(BLOOM_INDEX_PRUNE_BY_RANGES_PROP, String.valueOf(pruneRanges));
return this;
}
public Builder bloomIndexUseCaching(boolean useCaching) {
props.setProperty(BLOOM_INDEX_USE_CACHING_PROP, String.valueOf(useCaching));
return this;
}
public Builder numBucketsPerPartition(int numBuckets) {
props.setProperty(BUCKETED_INDEX_NUM_BUCKETS_PROP, String.valueOf(numBuckets));
return this;
@@ -127,6 +141,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
BLOOM_FILTER_FPP, DEFAULT_BLOOM_FILTER_FPP);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_PARALLELISM_PROP),
BLOOM_INDEX_PARALLELISM_PROP, DEFAULT_BLOOM_INDEX_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_PRUNE_BY_RANGES_PROP),
BLOOM_INDEX_PRUNE_BY_RANGES_PROP, DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_USE_CACHING_PROP),
BLOOM_INDEX_USE_CACHING_PROP, DEFAULT_BLOOM_INDEX_USE_CACHING);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config;

View File

@@ -203,6 +203,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
}
public boolean getBloomIndexPruneByRanges() {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PRUNE_BY_RANGES_PROP));
}
public boolean getBloomIndexUseCaching() {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING_PROP));
}
public int getNumBucketsPerPartition() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BUCKETED_INDEX_NUM_BUCKETS_PROP));
}

View File

@@ -1,347 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.index;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in
* its metadata.
*/
public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
private static Logger logger = LogManager.getLogger(HoodieBloomIndex.class);
// we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476)
private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024;
// this is how much a triplet of (partitionPath, fileId, recordKey) costs.
private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300;
private static int MAX_ITEMS_PER_JOIN_PARTITION = SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET;
public HoodieBloomIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
}
@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, final HoodieTable<T> hoodieTable) {
// Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)
JavaPairRDD<String, String> partitionRecordKeyPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<String, String> rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, hoodieTable);
// Cache the result, for subsequent stages.
rowKeyFilenamePairRDD.cache();
long totalTaggedRecords = rowKeyFilenamePairRDD.count();
logger.info("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
// Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
// Cost: 4 sec.
return tagLocationBacktoRecords(rowKeyFilenamePairRDD, recordRDD);
}
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> table) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<String, String> rowKeyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, table);
JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD)
.mapToPair(keyPathTuple -> {
Optional<String> recordLocationPath;
if (keyPathTuple._2._2.isPresent()) {
String fileName = keyPathTuple._2._2.get();
String partitionPath = keyPathTuple._2._1.getPartitionPath();
recordLocationPath = Optional.of(new Path(
new Path(table.getMetaClient().getBasePath(), partitionPath),
fileName).toUri().getPath());
} else {
recordLocationPath = Optional.absent();
}
return new Tuple2<>(keyPathTuple._2._1, recordLocationPath);
});
}
/**
* Lookup the location for each record key and return the pair<record_key,location> for all
* record keys already present and drop the record keys if not present
*/
private JavaPairRDD<String, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieTable<T> hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
JavaPairRDD<String, String> partitionFilePairRDD =
loadInvolvedFiles(affectedPartitionPathList, hoodieTable);
Map<String, Long> filesPerPartition = partitionFilePairRDD.countByKey();
// Compute total subpartitions, to split partitions into.
Map<String, Long> subpartitionCountMap =
computeSubPartitions(recordsPerPartition, filesPerPartition);
// Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, that contains it.
return findMatchingFilesForRecordKeys(partitionFilePairRDD, partitionRecordKeyPairRDD,
subpartitionCountMap);
}
/**
* The index lookup can be skewed in three dimensions : #files, #partitions, #records
*
* To be able to smoothly handle skews, we need to compute how to split each partitions into
* subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to
* < 2GB.
*/
private Map<String, Long> computeSubPartitions(Map<String, Long> recordsPerPartition, Map<String, Long> filesPerPartition) {
Map<String, Long> subpartitionCountMap = new HashMap<>();
long totalRecords = 0;
long totalFiles = 0;
for (String partitionPath : recordsPerPartition.keySet()) {
long numRecords = recordsPerPartition.get(partitionPath);
long numFiles = filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 1L;
subpartitionCountMap.put(partitionPath, ((numFiles * numRecords) / MAX_ITEMS_PER_JOIN_PARTITION) + 1);
totalFiles += filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 0L;
totalRecords += numRecords;
}
logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles + ", TotalAffectedPartitions:" + recordsPerPartition.size());
logger.info("Sub Partition Counts : " + subpartitionCountMap);
return subpartitionCountMap;
}
/**
* Load the input records as <Partition, RowKeys> in memory.
*/
@VisibleForTesting
Map<String, Iterable<String>> getPartitionToRowKeys(JavaRDD<HoodieRecord<T>> recordRDD) {
// Have to wrap the map into a hashmap becuase of the need to braoadcast (see: http://php.sabscape.com/blog/?p=671)
return recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()))
.groupByKey().collectAsMap();
}
/**
* Load all involved files as <Partition, filename> pair RDD.
*/
@VisibleForTesting
JavaPairRDD<String, String> loadInvolvedFiles(List<String> partitions,
final HoodieTable<T> hoodieTable) {
return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
.flatMapToPair(partitionPath -> {
java.util.Optional<HoodieInstant> latestCommitTime =
hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant();
List<Tuple2<String, String>> list = new ArrayList<>();
if (latestCommitTime.isPresent()) {
List<HoodieDataFile> filteredFiles =
hoodieTable.getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath,
latestCommitTime.get().getTimestamp()).collect(Collectors.toList());
for (HoodieDataFile file : filteredFiles) {
list.add(new Tuple2<>(partitionPath, file.getFileName()));
}
}
return list.iterator();
});
}
@Override
public boolean rollbackCommit(String commitTime) {
// Nope, don't need to do anything.
return true;
}
/**
* When we subpartition records going into a partition, we still need to check them against all
* the files within the partition. Thus, we need to explode the (partition, file) pairs to
* (partition_subpartnum, file), so we can later join.
*/
private JavaPairRDD<String, String> explodePartitionFilePairRDD(JavaPairRDD<String, String> partitionFilePairRDD,
final Map<String, Long> subpartitionCountMap) {
return partitionFilePairRDD
.map(partitionFilePair -> {
List<Tuple2<String, String>> explodedPartitionFilePairs = new ArrayList<>();
for (long l = 0; l < subpartitionCountMap.get(partitionFilePair._1); l++) {
explodedPartitionFilePairs.add(new Tuple2<>(
String.format("%s#%d", partitionFilePair._1, l),
partitionFilePair._2));
}
return explodedPartitionFilePairs;
})
.flatMapToPair(exploded -> exploded.iterator());
}
/**
* To handle tons of incoming records to a partition, we need to split them into groups or
* create subpartitions. Here, we do a simple hash mod splitting, based on computed sub
* partitions.
*/
private JavaPairRDD<String, String> splitPartitionRecordKeysPairRDD(JavaPairRDD<String, String> partitionRecordKeyPairRDD,
final Map<String, Long> subpartitionCountMap) {
return partitionRecordKeyPairRDD
.mapToPair(partitionRecordKeyPair -> {
long subpart = Math.abs(partitionRecordKeyPair._2.hashCode()) % subpartitionCountMap.get(partitionRecordKeyPair._1);
return new Tuple2<>(
String.format("%s#%d", partitionRecordKeyPair._1, subpart),
partitionRecordKeyPair._2);
});
}
/**
* Its crucial to pick the right parallelism.
*
* totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism :
* typically number of input files.
*
* We pick the max such that, we are always safe, but go higher if say a there are a lot of
* input files. (otherwise, we will fallback to number of partitions in input and end up with
* slow performance)
*/
private int determineParallelism(int inputParallelism, final Map<String, Long> subpartitionCountMap) {
// size the join parallelism to max(total number of sub partitions, total number of files).
int totalSubparts = 0;
for (long subparts : subpartitionCountMap.values()) {
totalSubparts += (int) subparts;
}
// If bloom index parallelism is set, use it to to check against the input parallelism and take the max
int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
int joinParallelism = Math.max(totalSubparts, indexParallelism);
logger.info("InputParallelism: ${" + inputParallelism + "}, " +
"IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, " +
"TotalSubParts: ${" + totalSubparts + "}, " +
"Join Parallelism set to : " + joinParallelism);
return joinParallelism;
}
/**
* Find out <RowKey, filename> pair. All workload grouped by file-level.
*
* // Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition
* such that // each RDD partition is a file, then for each file, we do (1) load bloom filter,
* (2) load rowKeys, (3) Tag rowKey // Make sure the parallelism is atleast the groupby
* parallelism for tagging location
*/
private JavaPairRDD<String, String> findMatchingFilesForRecordKeys(JavaPairRDD<String, String> partitionFilePairRDD,
JavaPairRDD<String, String> partitionRecordKeyPairRDD,
final Map<String, Long> subpartitionCountMap) {
// prepare the two RDDs and their join parallelism
JavaPairRDD<String, String> subpartitionFilePairRDD = explodePartitionFilePairRDD(partitionFilePairRDD, subpartitionCountMap);
JavaPairRDD<String, String> subpartitionRecordKeyPairRDD = splitPartitionRecordKeysPairRDD(partitionRecordKeyPairRDD,
subpartitionCountMap);
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), subpartitionCountMap);
// Perform a join, to bring all the files in each subpartition ,together with the record keys to be tested against them
JavaPairRDD<String, Tuple2<String, String>> joinedTripletRDD = subpartitionFilePairRDD
.join(subpartitionRecordKeyPairRDD, joinParallelism);
// sort further based on filename, such that all checking for the file can happen within a single partition, on-the-fly
JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD = joinedTripletRDD
/**
* Incoming triplet is (partitionPath_subpart) => (file, recordKey)
*/
.mapToPair(joinedTriplet -> {
String partitionPath = joinedTriplet._1.split("#")[0]; // throw away the subpart
String fileName = joinedTriplet._2._1;
String recordKey = joinedTriplet._2._2;
// make a sort key as <file>#<recordKey>, to handle skews
return new Tuple2<>(String.format("%s#%s", fileName, recordKey),
new Tuple2<>(fileName, new HoodieKey(recordKey, partitionPath)));
}).sortByKey(true, joinParallelism);
return fileSortedTripletRDD
.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true)
.flatMap(indexLookupResults -> indexLookupResults.iterator())
.filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> {
List<Tuple2<String, String>> vals = new ArrayList<>();
for (String recordKey : lookupResult.getMatchingRecordKeys()) {
vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
}
return vals.iterator();
});
}
/**
* Tag the <rowKey, filename> back to the original HoodieRecord RDD.
*/
private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(JavaPairRDD<String, String> rowKeyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(
v1 -> {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
String filename = v1._2().get();
if (filename != null && !filename.isEmpty()) {
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename)));
}
}
return record;
}
);
}
@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> hoodieTable) {
return writeStatusRDD;
}
}

View File

@@ -17,8 +17,7 @@
package com.uber.hoodie.index;
import com.google.common.base.Optional;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieKey;
@@ -26,6 +25,9 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.index.bucketed.BucketedIndex;
import com.uber.hoodie.index.hbase.HBaseIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

View File

@@ -0,0 +1,99 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import com.google.common.base.Objects;
import java.io.Serializable;
/**
* Metadata about a given file, useful for index lookup
*/
public class BloomIndexFileInfo implements Serializable {
private final String fileName;
private final String minRecordKey;
private final String maxRecordKey;
public BloomIndexFileInfo(String fileName, String minRecordKey, String maxRecordKey) {
this.fileName = fileName;
this.minRecordKey = minRecordKey;
this.maxRecordKey = maxRecordKey;
}
public BloomIndexFileInfo(String fileName) {
this.fileName = fileName;
this.minRecordKey = null;
this.maxRecordKey = null;
}
public String getFileName() {
return fileName;
}
public String getMinRecordKey() {
return minRecordKey;
}
public String getMaxRecordKey() {
return maxRecordKey;
}
public boolean hasKeyRanges() {
return minRecordKey != null && maxRecordKey != null;
}
/**
* Does the given key fall within the range (inclusive)
* @param recordKey
* @return
*/
public boolean isKeyInRange(String recordKey) {
return minRecordKey.compareTo(recordKey) <= 0 &&
maxRecordKey.compareTo(recordKey) >= 0;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BloomIndexFileInfo that = (BloomIndexFileInfo) o;
return Objects.equal(that.fileName, fileName) &&
Objects.equal(that.minRecordKey, minRecordKey) &&
Objects.equal(that.maxRecordKey, maxRecordKey);
}
@Override
public int hashCode() {
return Objects.hashCode(fileName, minRecordKey, maxRecordKey);
}
public String toString() {
final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {");
sb.append(" fileName=").append(fileName);
sb.append(" minRecordKey=").append(minRecordKey);
sb.append(" maxRecordKey=").append(maxRecordKey);
sb.append('}');
return sb.toString();
}
}

View File

@@ -0,0 +1,380 @@
/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index.bloom;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.MetadataNotFoundException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.*;
/**
* Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in
* its metadata.
*/
public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
private static Logger logger = LogManager.getLogger(HoodieBloomIndex.class);
// we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476)
private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024;
// this is how much a triplet of (partitionPath, fileId, recordKey) costs.
private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300;
private static int MAX_ITEMS_PER_SHUFFLE_PARTITION = SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET;
public HoodieBloomIndex(HoodieWriteConfig config, JavaSparkContext jsc) {
super(config, jsc);
}
@Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, final HoodieTable<T> hoodieTable) {
// Step 0: cache the input record RDD
if (config.getBloomIndexUseCaching()) {
recordRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
}
// Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)
JavaPairRDD<String, String> partitionRecordKeyPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<String, String> rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, hoodieTable);
// Cache the result, for subsequent stages.
if (config.getBloomIndexUseCaching()) {
rowKeyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
}
if (logger.isDebugEnabled()) {
long totalTaggedRecords = rowKeyFilenamePairRDD.count();
logger.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
}
// Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
// Cost: 4 sec.
JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(rowKeyFilenamePairRDD, recordRDD);
if (config.getBloomIndexUseCaching()) {
recordRDD.unpersist(); // unpersist the input Record RDD
rowKeyFilenamePairRDD.unpersist();
}
return taggedRecordRDD;
}
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> table) {
JavaPairRDD<String, String> partitionRecordKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey()));
// Lookup indexes for all the partition/recordkey pair
JavaPairRDD<String, String> rowKeyFilenamePairRDD =
lookupIndex(partitionRecordKeyPairRDD, table);
JavaPairRDD<String, HoodieKey> rowKeyHoodieKeyPairRDD =
hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key));
return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD)
.mapToPair(keyPathTuple -> {
Optional<String> recordLocationPath;
if (keyPathTuple._2._2.isPresent()) {
String fileName = keyPathTuple._2._2.get();
String partitionPath = keyPathTuple._2._1.getPartitionPath();
recordLocationPath = Optional.of(new Path(
new Path(table.getMetaClient().getBasePath(), partitionPath),
fileName).toUri().getPath());
} else {
recordLocationPath = Optional.absent();
}
return new Tuple2<>(keyPathTuple._2._1, recordLocationPath);
});
}
/**
* Lookup the location for each record key and return the pair<record_key,location> for all
* record keys already present and drop the record keys if not present
*/
private JavaPairRDD<String, String> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final HoodieTable<T> hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
List<Tuple2<String, BloomIndexFileInfo>> fileInfoList = loadInvolvedFiles(affectedPartitionPathList, hoodieTable);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream()
.collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
// Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, that contains it.
int parallelism = autoComputeParallelism(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, parallelism);
}
/**
* The index lookup can be skewed in three dimensions : #files, #partitions, #records
*
* To be able to smoothly handle skews, we need to compute how to split each partitions into
* subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to
* < 2GB.
*
* If {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is specified as a NON-zero number,
* then that is used explicitly.
*
*/
private int autoComputeParallelism(final Map<String, Long> recordsPerPartition,
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
long totalComparisons = 0;
if (config.getBloomIndexPruneByRanges()) {
// we will just try exploding the input and then count to determine comparisons
totalComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD).count();
} else {
// if not pruning by ranges, then each file in a partition needs to compared against all
// records for a partition.
Map<String, Long> filesPerPartition = partitionToFileInfo.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> Long.valueOf(e.getValue().size())));
long totalFiles = 0, totalRecords = 0;
for (String partitionPath : recordsPerPartition.keySet()) {
long numRecords = recordsPerPartition.get(partitionPath);
long numFiles = filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 1L;
totalComparisons += numFiles * numRecords;
totalFiles += filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 0L;
totalRecords += numRecords;
}
logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles + ", TotalAffectedPartitions:" + recordsPerPartition.size());
}
// each partition will have an item per comparison.
int parallelism = (int) (totalComparisons/ MAX_ITEMS_PER_SHUFFLE_PARTITION + 1);
logger.info("Auto computed parallelism :" + parallelism + ", totalComparisons: " + totalComparisons);
return parallelism;
}
/**
* Its crucial to pick the right parallelism.
*
* totalSubPartitions : this is deemed safe limit, to be nice with Spark.
* inputParallelism : typically number of input file splits
*
* We pick the max such that, we are always safe, but go higher if say a there are a lot of
* input files. (otherwise, we will fallback to number of partitions in input and end up with
* slow performance)
*/
private int determineParallelism(int inputParallelism, int totalSubPartitions) {
// If bloom index parallelism is set, use it to to check against the input parallelism and take the max
int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
int joinParallelism = Math.max(totalSubPartitions, indexParallelism);
logger.info("InputParallelism: ${" + inputParallelism + "}, " +
"IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, " +
"TotalSubParts: ${" + totalSubPartitions + "}, " +
"Join Parallelism set to : " + joinParallelism);
return joinParallelism;
}
/**
* Load all involved files as <Partition, filename> pair RDD.
*/
@VisibleForTesting
List<Tuple2<String, BloomIndexFileInfo>> loadInvolvedFiles(List<String> partitions, final HoodieTable<T> hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Tuple2<String, HoodieDataFile>> dataFilesList = jsc.parallelize(partitions, Math.max(partitions.size(), 1))
.flatMapToPair(partitionPath -> {
java.util.Optional<HoodieInstant> latestCommitTime =
hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant();
List<Tuple2<String, HoodieDataFile>> filteredFiles = new ArrayList<>();
if (latestCommitTime.isPresent()) {
filteredFiles =
hoodieTable.getROFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath,
latestCommitTime.get().getTimestamp())
.map(f -> new Tuple2<>(partitionPath, f))
.collect(toList());
}
return filteredFiles.iterator();
}).collect();
if (config.getBloomIndexPruneByRanges()) {
// also obtain file ranges, if range pruning is enabled
return jsc.parallelize(dataFilesList, Math.max(dataFilesList.size(), 1))
.mapToPair(ft -> {
try {
String[] minMaxKeys = ParquetUtils.readMinMaxRecordKeys(ft._2().getFileStatus().getPath());
return new Tuple2<>(ft._1(), new BloomIndexFileInfo(ft._2().getFileName(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
logger.warn("Unable to find range metadata in file :" + ft._2());
return new Tuple2<>(ft._1(), new BloomIndexFileInfo(ft._2().getFileName()));
}
}).collect();
} else {
return dataFilesList.stream()
.map(ft -> new Tuple2<>(ft._1(), new BloomIndexFileInfo(ft._2().getFileName())))
.collect(toList());
}
}
@Override
public boolean rollbackCommit(String commitTime) {
// Nope, don't need to do anything.
return true;
}
/**
* if we dont have key ranges, then also we need to compare against the file. no other choice
* if we do, then only compare the file if the record key falls in range.
* @param indexInfo
* @param recordKey
* @return
*/
private boolean shouldCompareWithFile(BloomIndexFileInfo indexInfo, String recordKey) {
return !indexInfo.hasKeyRanges() || indexInfo.isKeyInRange(recordKey);
}
/**
* For each incoming record, produce N output records, 1 each for each file against which the record's key
* needs to be checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix),
* the number of files to be compared gets cut down a lot from range pruning.
*
*
* @param partitionToFileIndexInfo
* @param partitionRecordKeyPairRDD
* @return
*/
// sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on recordKey
// ranges in the index info.
@VisibleForTesting
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
return partitionRecordKeyPairRDD
.map(partitionRecordKeyPair -> {
String recordKey = partitionRecordKeyPair._2();
String partitionPath = partitionRecordKeyPair._1();
List<BloomIndexFileInfo> indexInfos = partitionToFileIndexInfo.get(partitionPath);
List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
if (indexInfos != null) { // could be null, if there are no files in a given partition yet.
// for each candidate file in partition, that needs to be compared.
for (BloomIndexFileInfo indexInfo : indexInfos) {
if (shouldCompareWithFile(indexInfo, recordKey)) {
recordComparisons.add(
new Tuple2<>(String.format("%s#%s", indexInfo.getFileName(), recordKey),
new Tuple2<>(indexInfo.getFileName(), new HoodieKey(recordKey, partitionPath))));
}
}
}
return recordComparisons;
})
.flatMapToPair(t -> t.iterator());
}
/**
* Find out <RowKey, filename> pair. All workload grouped by file-level.
*
* Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition
* such that each RDD partition is a file, then for each file, we do
* (1) load bloom filter,
* (2) load rowKeys,
* (3) Tag rowKey
*
* Make sure the parallelism is atleast the groupby parallelism for tagging location
*/
@VisibleForTesting
JavaPairRDD<String, String> findMatchingFilesForRecordKeys(final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD,
int totalSubpartitions) {
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), totalSubpartitions);
JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD = explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD)
// sort further based on filename, such that all checking for the file can happen within a single partition, on-the-fly
.sortByKey(true, joinParallelism);
return fileSortedTripletRDD
.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true)
.flatMap(indexLookupResults -> indexLookupResults.iterator())
.filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> {
List<Tuple2<String, String>> vals = new ArrayList<>();
for (String recordKey : lookupResult.getMatchingRecordKeys()) {
vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
}
return vals.iterator();
});
}
/**
* Tag the <rowKey, filename> back to the original HoodieRecord RDD.
*/
private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(JavaPairRDD<String, String> rowKeyFilenamePairRDD,
JavaRDD<HoodieRecord<T>> recordRDD) {
JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(
v1 -> {
HoodieRecord<T> record = v1._1();
if (v1._2().isPresent()) {
String filename = v1._2().get();
if (filename != null && !filename.isEmpty()) {
record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename)));
}
}
return record;
}
);
}
@Override
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, HoodieTable<T> hoodieTable) {
return writeStatusRDD;
}
}

View File

@@ -1,20 +1,22 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index;
package com.uber.hoodie.index.bloom;
import com.uber.hoodie.common.BloomFilter;
import com.uber.hoodie.common.model.HoodieKey;

View File

@@ -1,20 +1,22 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index;
package com.uber.hoodie.index.bloom;
import java.util.List;

View File

@@ -16,7 +16,7 @@
*
*/
package com.uber.hoodie.index;
package com.uber.hoodie.index.bucketed;
import com.google.common.base.Optional;
@@ -25,9 +25,9 @@ import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.log4j.LogManager;

View File

@@ -1,20 +1,22 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index;
package com.uber.hoodie.index.hbase;
import com.google.common.base.Optional;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -29,6 +31,7 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

View File

@@ -91,8 +91,7 @@ public class TestHoodieClient implements Serializable {
@Before
public void init() throws IOException {
// Initialize a local spark env
SparkConf sparkConf = new SparkConf().setAppName("TestHoodieClient").setMaster("local[4]");
jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieClient"));
//SQLContext stuff
sqlContext = new SQLContext(jsc);

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieMergeOnReadTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.minicluster.HdfsTestService;
@@ -30,32 +31,21 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -68,8 +58,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -77,7 +65,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.uber.hoodie.common.HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat;
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -126,10 +113,7 @@ public class TestMergeOnReadTable {
this.fs = FSUtils.getFs();
// Initialize a local spark env
SparkConf sparkConf =
new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setAppName("TestHoodieCompactor").setMaster("local[1]");
jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeOnReadTable"));
jsc.hadoopConfiguration().addResource(FSUtils.getFs().getConf());
// Create a temp folder as the base path

View File

@@ -16,12 +16,15 @@
package com.uber.hoodie.common;
import com.uber.hoodie.HoodieReadClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import org.apache.spark.SparkConf;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -79,4 +82,12 @@ public class HoodieClientTestUtils {
new File(path).createNewFile();
new RandomAccessFile(path, "rw").setLength(length);
}
public static SparkConf getSparkConfForTest(String appName) {
SparkConf sparkConf = new SparkConf()
.setAppName(appName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[4]");
return HoodieReadClient.addHoodieSupport(sparkConf);
}
}

View File

@@ -19,6 +19,9 @@ package com.uber.hoodie.index;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.index.hbase.HBaseIndex;
import org.junit.Test;
import static org.junit.Assert.*;

View File

@@ -1,25 +1,29 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie.index;
package com.uber.hoodie.index.bloom;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.avro.HoodieAvroWriteSupport;
import com.uber.hoodie.common.BloomFilter;
@@ -30,14 +34,22 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.index.bloom.BloomIndexFileInfo;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.index.bloom.HoodieBloomIndexCheckFunction;
import com.uber.hoodie.io.storage.HoodieParquetConfig;
import com.uber.hoodie.io.storage.HoodieParquetWriter;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -47,6 +59,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import scala.Tuple2;
@@ -54,6 +67,8 @@ import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.*;
@@ -61,6 +76,8 @@ public class TestHoodieBloomIndex {
private JavaSparkContext jsc = null;
private String basePath = null;
private transient final FileSystem fs;
private String schemaStr;
private Schema schema;
public TestHoodieBloomIndex() throws Exception {
fs = FSUtils.getFs();
@@ -69,13 +86,15 @@ public class TestHoodieBloomIndex {
@Before
public void init() throws IOException {
// Initialize a local spark env
SparkConf sparkConf = new SparkConf().setAppName("TestHoodieBloomIndex").setMaster("local[4]");
jsc = new JavaSparkContext(sparkConf);
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieBloomIndex"));
// Create a temp folder as the base path
TemporaryFolder folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(basePath);
// We have some records to be tagged (two different partitions)
schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
}
@Test
@@ -86,8 +105,6 @@ public class TestHoodieBloomIndex {
String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
String schemaStr =
IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
@@ -101,8 +118,10 @@ public class TestHoodieBloomIndex {
// Load to memory
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieBloomIndex index = new HoodieBloomIndex(config, jsc);
Map<String, Iterable<String>> map = index.getPartitionToRowKeys(recordRDD);
Map<String, Iterable<String>> map = recordRDD
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()))
.groupByKey().collectAsMap();
assertEquals(map.size(), 2);
List<String> list1 = Lists.newArrayList(map.get("2016/01/31"));
List<String> list2 = Lists.newArrayList(map.get("2015/01/31"));
@@ -112,7 +131,9 @@ public class TestHoodieBloomIndex {
@Test
public void testLoadInvolvedFiles() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.build();
HoodieBloomIndex index = new HoodieBloomIndex(config, jsc);
// Create some partitions, and put some files
@@ -122,51 +143,105 @@ public class TestHoodieBloomIndex {
new File(basePath + "/2016/01/21").mkdirs();
new File(basePath + "/2016/04/01").mkdirs();
new File(basePath + "/2015/03/12").mkdirs();
new File(basePath + "/2016/04/01/2_0_20160401010101.parquet").createNewFile();
new File(basePath + "/2015/03/12/1_0_20150312101010.parquet").createNewFile();
new File(basePath + "/2015/03/12/3_0_20150312101010.parquet").createNewFile();
new File(basePath + "/2015/03/12/4_0_20150312101010.parquet").createNewFile();
TestRawTripPayload rowChange1 = new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
TestRawTripPayload rowChange2 = new TestRawTripPayload("{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
TestRawTripPayload rowChange3 = new TestRawTripPayload("{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3);
TestRawTripPayload rowChange4 = new TestRawTripPayload("{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
writeParquetFile("2016/04/01","2_0_20160401010101.parquet", Lists.newArrayList(), schema, null, false);
writeParquetFile("2015/03/12","1_0_20150312101010.parquet", Lists.newArrayList(), schema, null, false);
writeParquetFile("2015/03/12","3_0_20150312101010.parquet", Arrays.asList(record1), schema, null, false);
writeParquetFile("2015/03/12","4_0_20150312101010.parquet", Arrays.asList(record2, record3, record4), schema, null, false);
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath);
HoodieTable table = HoodieTable.getHoodieTable(metadata, config);
JavaPairRDD<String, String> rdd = index.loadInvolvedFiles(partitions, table);
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, table);
// Still 0, as no valid commit
assertEquals(rdd.count(), 0);
assertEquals(filesList.size(), 0);
// Add some commits
new File(basePath + "/.hoodie").mkdirs();
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
metadata = new HoodieTableMetaClient(fs, basePath);
rdd = index.loadInvolvedFiles(partitions, table);
final List<Tuple2<String, String>> filesList = rdd.collect();
filesList = index.loadInvolvedFiles(partitions, table);
assertEquals(filesList.size(), 4);
// these files will not have the key ranges
assertNull(filesList.get(0)._2().getMaxRecordKey());
assertNull(filesList.get(0)._2().getMinRecordKey());
assertFalse(filesList.get(1)._2().hasKeyRanges());
assertNotNull(filesList.get(2)._2().getMaxRecordKey());
assertNotNull(filesList.get(2)._2().getMinRecordKey());
assertTrue(filesList.get(3)._2().hasKeyRanges());
// no longer sorted, but should have same files.
Set<String> actualFiles = new HashSet<String>(){{
add(filesList.get(0)._1 + "/" + filesList.get(0)._2);
add(filesList.get(1)._1 + "/" + filesList.get(1)._2);
add(filesList.get(2)._1 + "/" + filesList.get(2)._2);
add(filesList.get(3)._1 + "/" + filesList.get(3)._2);
}};
Set<String> expected = new HashSet<String>() {{
add("2016/04/01/2_0_20160401010101.parquet");
add("2015/03/12/1_0_20150312101010.parquet");
add("2015/03/12/3_0_20150312101010.parquet");
add("2015/03/12/4_0_20150312101010.parquet");
}};
assertEquals(expected, actualFiles);
List<Tuple2<String, BloomIndexFileInfo>> expected = Arrays.asList(
new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2_0_20160401010101.parquet")),
new Tuple2<>("2015/03/12",new BloomIndexFileInfo("1_0_20150312101010.parquet")),
new Tuple2<>("2015/03/12",new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")),
new Tuple2<>("2015/03/12",new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003"))
);
assertEquals(expected, filesList);
}
@Test
public void testRangePruning() {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(basePath)
.build();
HoodieBloomIndex index = new HoodieBloomIndex(config, jsc);
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo = new HashMap<>();
partitionToFileIndexInfo.put("2017/10/22", Arrays.asList(
new BloomIndexFileInfo("f1"),
new BloomIndexFileInfo("f2", "000", "000"),
new BloomIndexFileInfo("f3", "001", "003"),
new BloomIndexFileInfo("f4", "002", "007"),
new BloomIndexFileInfo("f5", "009", "010")
));
JavaPairRDD<String, String> partitionRecordKeyPairRDD = jsc
.parallelize(Arrays.asList(
new Tuple2<>("2017/10/22","003"),
new Tuple2<>("2017/10/22","002"),
new Tuple2<>("2017/10/22","005"),
new Tuple2<>("2017/10/22","004")
))
.mapToPair(t -> t);
List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index
.explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD)
.collect();
assertEquals(10, comparisonKeyList.size());
Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
.collect(Collectors.groupingBy(
t -> t._2()._2().getRecordKey(),
Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()
)
));
assertEquals(4, recordKeyToFileComps.size());
assertEquals(Arrays.asList("f1", "f3", "f4"), recordKeyToFileComps.get("002"));
assertEquals(Arrays.asList("f1", "f3", "f4"), recordKeyToFileComps.get("003"));
assertEquals(Arrays.asList("f1", "f4"), recordKeyToFileComps.get("004"));
assertEquals(Arrays.asList("f1", "f4"), recordKeyToFileComps.get("005"));
}
@Test
public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedException, ClassNotFoundException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
HoodieBloomIndex index = new HoodieBloomIndex(config, jsc);
String schemaStr =
IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
// Create some records to use
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
@@ -232,9 +307,6 @@ public class TestHoodieBloomIndex {
@Test
public void testTagLocation() throws Exception {
// We have some records to be tagged (two different partitions)
String schemaStr =
IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
@@ -292,9 +364,6 @@ public class TestHoodieBloomIndex {
@Test
public void testCheckExists() throws Exception {
// We have some records to be tagged (two different partitions)
String schemaStr =
IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
@@ -366,10 +435,6 @@ public class TestHoodieBloomIndex {
String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
// We write record1 to a parquet file, using a bloom filter having both records
String schemaStr =
IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
@@ -407,24 +472,32 @@ public class TestHoodieBloomIndex {
String fileId = UUID.randomUUID().toString();
String filename = FSUtils.makeDataFileName(commitTime, 1, fileId);
return writeParquetFile(partitionPath, filename, records, schema, filter, createCommitTime);
}
private String writeParquetFile(String partitionPath, String filename, List<HoodieRecord> records, Schema schema,
BloomFilter filter, boolean createCommitTime) throws IOException {
if (filter == null) {
filter = new BloomFilter(10000, 0.0000001);
}
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);
ParquetWriter writer = new ParquetWriter(new Path(basePath + "/" + partitionPath + "/" + filename),
writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE);
int seqId = 1;
String commitTime = FSUtils.getCommitTime(filename);
HoodieParquetConfig config = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, new Configuration());
HoodieParquetWriter writer = new HoodieParquetWriter(
commitTime,
new Path(basePath + "/" + partitionPath + "/" + filename),
config,
schema);
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++);
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename);
writer.write(avroRecord);
writer.writeAvro(record.getRecordKey(), avroRecord);
filter.add(record.getRecordKey());
}
writer.close();

View File

@@ -19,23 +19,22 @@ package com.uber.hoodie.io;
import com.uber.hoodie.HoodieReadClient;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieCompactionMetadata;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieBloomIndex;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
@@ -52,9 +51,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -68,9 +65,7 @@ public class TestHoodieCompactor {
@Before
public void init() throws IOException {
// Initialize a local spark env
SparkConf sparkConf =
new SparkConf().setAppName("TestHoodieCompactor").setMaster("local[4]");
jsc = new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieCompactor"));
// Create a temp folder as the base path
TemporaryFolder folder = new TemporaryFolder();

View File

@@ -69,8 +69,7 @@ public class TestCopyOnWriteTable {
public void init() throws Exception {
// Initialize a local spark env
SparkConf sparkConf = new SparkConf().setAppName("TestCopyOnWriteTable").setMaster("local[4]");
jsc = new JavaSparkContext(sparkConf);
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestCopyOnWriteTable"));
// Create a temp folder as the base path
TemporaryFolder folder = new TemporaryFolder();