/* * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.uber.hoodie.index; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in * its metadata. */ public class HoodieBloomIndex extends HoodieIndex { private static Logger logger = LogManager.getLogger(HoodieBloomIndex.class); // we need to limit the join such that it stays within 1.5GB per Spark partition. (SPARK-1476) private static final int SPARK_MAXIMUM_BYTES_PER_PARTITION = 1500 * 1024 * 1024; // this is how much a triplet of (partitionPath, fileId, recordKey) costs. private static final int BYTES_PER_PARTITION_FILE_KEY_TRIPLET = 300; private static int MAX_ITEMS_PER_JOIN_PARTITION = SPARK_MAXIMUM_BYTES_PER_PARTITION / BYTES_PER_PARTITION_FILE_KEY_TRIPLET; public HoodieBloomIndex(HoodieWriteConfig config, JavaSparkContext jsc) { super(config, jsc); } @Override public JavaRDD> tagLocation(JavaRDD> recordRDD, final HoodieTable hoodieTable) { // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey) JavaPairRDD partitionRecordKeyPairRDD = recordRDD .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())); // Lookup indexes for all the partition/recordkey pair JavaPairRDD rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, hoodieTable); // Cache the result, for subsequent stages. rowKeyFilenamePairRDD.cache(); long totalTaggedRecords = rowKeyFilenamePairRDD.count(); logger.info("Number of update records (ones tagged with a fileID): " + totalTaggedRecords); // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys // Cost: 4 sec. return tagLocationBacktoRecords(rowKeyFilenamePairRDD, recordRDD); } public JavaPairRDD> fetchRecordLocation( JavaRDD hoodieKeys, final HoodieTable table) { JavaPairRDD partitionRecordKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key.getPartitionPath(), key.getRecordKey())); // Lookup indexes for all the partition/recordkey pair JavaPairRDD rowKeyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, table); JavaPairRDD rowKeyHoodieKeyPairRDD = hoodieKeys.mapToPair(key -> new Tuple2<>(key.getRecordKey(), key)); return rowKeyHoodieKeyPairRDD.leftOuterJoin(rowKeyFilenamePairRDD) .mapToPair(keyPathTuple -> { Optional recordLocationPath; if (keyPathTuple._2._2.isPresent()) { String fileName = keyPathTuple._2._2.get(); String partitionPath = keyPathTuple._2._1.getPartitionPath(); recordLocationPath = Optional.of(new Path( new Path(table.getMetaClient().getBasePath(), partitionPath), fileName).toUri().getPath()); } else { recordLocationPath = Optional.absent(); } return new Tuple2<>(keyPathTuple._2._1, recordLocationPath); }); } /** * Lookup the location for each record key and return the pair for all * record keys already present and drop the record keys if not present */ private JavaPairRDD lookupIndex( JavaPairRDD partitionRecordKeyPairRDD, final HoodieTable hoodieTable) { // Obtain records per partition, in the incoming records Map recordsPerPartition = partitionRecordKeyPairRDD.countByKey(); List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); // Step 2: Load all involved files as pairs JavaPairRDD partitionFilePairRDD = loadInvolvedFiles(affectedPartitionPathList, hoodieTable); Map filesPerPartition = partitionFilePairRDD.countByKey(); // Compute total subpartitions, to split partitions into. Map subpartitionCountMap = computeSubPartitions(recordsPerPartition, filesPerPartition); // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, that contains it. return findMatchingFilesForRecordKeys(partitionFilePairRDD, partitionRecordKeyPairRDD, subpartitionCountMap); } /** * The index lookup can be skewed in three dimensions : #files, #partitions, #records * * To be able to smoothly handle skews, we need to compute how to split each partitions into * subpartitions. We do it here, in a way that keeps the amount of each Spark join partition to * < 2GB. */ private Map computeSubPartitions(Map recordsPerPartition, Map filesPerPartition) { Map subpartitionCountMap = new HashMap<>(); long totalRecords = 0; long totalFiles = 0; for (String partitionPath : recordsPerPartition.keySet()) { long numRecords = recordsPerPartition.get(partitionPath); long numFiles = filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 1L; subpartitionCountMap.put(partitionPath, ((numFiles * numRecords) / MAX_ITEMS_PER_JOIN_PARTITION) + 1); totalFiles += filesPerPartition.containsKey(partitionPath) ? filesPerPartition.get(partitionPath) : 0L; totalRecords += numRecords; } logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles + ", TotalAffectedPartitions:" + recordsPerPartition.size()); logger.info("Sub Partition Counts : " + subpartitionCountMap); return subpartitionCountMap; } /** * Load the input records as in memory. */ @VisibleForTesting Map> getPartitionToRowKeys(JavaRDD> recordRDD) { // Have to wrap the map into a hashmap becuase of the need to braoadcast (see: http://php.sabscape.com/blog/?p=671) return recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())) .groupByKey().collectAsMap(); } /** * Load all involved files as pair RDD. */ @VisibleForTesting JavaPairRDD loadInvolvedFiles(List partitions, final HoodieTable hoodieTable) { return jsc.parallelize(partitions, Math.max(partitions.size(), 1)) .flatMapToPair(partitionPath -> { java.util.Optional latestCommitTime = hoodieTable.getCommitTimeline().filterCompletedInstants().lastInstant(); List> list = new ArrayList<>(); if (latestCommitTime.isPresent()) { List filteredFiles = hoodieTable.getFileSystemView().getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()).collect(Collectors.toList()); for (HoodieDataFile file : filteredFiles) { list.add(new Tuple2<>(partitionPath, file.getFileName())); } } return list.iterator(); }); } @Override public boolean rollbackCommit(String commitTime) { // Nope, don't need to do anything. return true; } /** * When we subpartition records going into a partition, we still need to check them against all * the files within the partition. Thus, we need to explode the (partition, file) pairs to * (partition_subpartnum, file), so we can later join. */ private JavaPairRDD explodePartitionFilePairRDD(JavaPairRDD partitionFilePairRDD, final Map subpartitionCountMap) { return partitionFilePairRDD .map(partitionFilePair -> { List> explodedPartitionFilePairs = new ArrayList<>(); for (long l = 0; l < subpartitionCountMap.get(partitionFilePair._1); l++) { explodedPartitionFilePairs.add(new Tuple2<>( String.format("%s#%d", partitionFilePair._1, l), partitionFilePair._2)); } return explodedPartitionFilePairs; }) .flatMapToPair(exploded -> exploded.iterator()); } /** * To handle tons of incoming records to a partition, we need to split them into groups or * create subpartitions. Here, we do a simple hash mod splitting, based on computed sub * partitions. */ private JavaPairRDD splitPartitionRecordKeysPairRDD(JavaPairRDD partitionRecordKeyPairRDD, final Map subpartitionCountMap) { return partitionRecordKeyPairRDD .mapToPair(partitionRecordKeyPair -> { long subpart = Math.abs(partitionRecordKeyPair._2.hashCode()) % subpartitionCountMap.get(partitionRecordKeyPair._1); return new Tuple2<>( String.format("%s#%d", partitionRecordKeyPair._1, subpart), partitionRecordKeyPair._2); }); } /** * Its crucial to pick the right parallelism. * * totalSubPartitions : this is deemed safe limit, to be nice with Spark. inputParallelism : * typically number of input files. * * We pick the max such that, we are always safe, but go higher if say a there are a lot of * input files. (otherwise, we will fallback to number of partitions in input and end up with * slow performance) */ private int determineParallelism(int inputParallelism, final Map subpartitionCountMap) { // size the join parallelism to max(total number of sub partitions, total number of files). int totalSubparts = 0; for (long subparts : subpartitionCountMap.values()) { totalSubparts += (int) subparts; } // If bloom index parallelism is set, use it to to check against the input parallelism and take the max int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); int joinParallelism = Math.max(totalSubparts, indexParallelism); logger.info("InputParallelism: ${" + inputParallelism + "}, " + "IndexParallelism: ${" + config.getBloomIndexParallelism() + "}, " + "TotalSubParts: ${" + totalSubparts + "}, " + "Join Parallelism set to : " + joinParallelism); return joinParallelism; } /** * Find out pair. All workload grouped by file-level. * * // Join PairRDD(PartitionPath, RecordKey) and PairRDD(PartitionPath, File) & then repartition * such that // each RDD partition is a file, then for each file, we do (1) load bloom filter, * (2) load rowKeys, (3) Tag rowKey // Make sure the parallelism is atleast the groupby * parallelism for tagging location */ private JavaPairRDD findMatchingFilesForRecordKeys(JavaPairRDD partitionFilePairRDD, JavaPairRDD partitionRecordKeyPairRDD, final Map subpartitionCountMap) { // prepare the two RDDs and their join parallelism JavaPairRDD subpartitionFilePairRDD = explodePartitionFilePairRDD(partitionFilePairRDD, subpartitionCountMap); JavaPairRDD subpartitionRecordKeyPairRDD = splitPartitionRecordKeysPairRDD(partitionRecordKeyPairRDD, subpartitionCountMap); int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), subpartitionCountMap); // Perform a join, to bring all the files in each subpartition ,together with the record keys to be tested against them JavaPairRDD> joinedTripletRDD = subpartitionFilePairRDD .join(subpartitionRecordKeyPairRDD, joinParallelism); // sort further based on filename, such that all checking for the file can happen within a single partition, on-the-fly JavaPairRDD> fileSortedTripletRDD = joinedTripletRDD /** * Incoming triplet is (partitionPath_subpart) => (file, recordKey) */ .mapToPair(joinedTriplet -> { String partitionPath = joinedTriplet._1.split("#")[0]; // throw away the subpart String fileName = joinedTriplet._2._1; String recordKey = joinedTriplet._2._2; // make a sort key as #, to handle skews return new Tuple2<>(String.format("%s#%s", fileName, recordKey), new Tuple2<>(fileName, new HoodieKey(recordKey, partitionPath))); }).sortByKey(true, joinParallelism); return fileSortedTripletRDD .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(config.getBasePath()), true) .flatMap(indexLookupResults -> indexLookupResults.iterator()) .filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0) .flatMapToPair(lookupResult -> { List> vals = new ArrayList<>(); for (String recordKey : lookupResult.getMatchingRecordKeys()) { vals.add(new Tuple2<>(recordKey, lookupResult.getFileName())); } return vals.iterator(); }); } /** * Tag the back to the original HoodieRecord RDD. */ private JavaRDD> tagLocationBacktoRecords(JavaPairRDD rowKeyFilenamePairRDD, JavaRDD> recordRDD) { JavaPairRDD> rowKeyRecordPairRDD = recordRDD .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record)); // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join. return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map( v1 -> { HoodieRecord record = v1._1(); if (v1._2().isPresent()) { String filename = v1._2().get(); if (filename != null && !filename.isEmpty()) { record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename))); } } return record; } ); } @Override public JavaRDD updateLocation(JavaRDD writeStatusRDD, HoodieTable hoodieTable) { return writeStatusRDD; } }