Remove redundant string from file comp rdd
This commit is contained in:
committed by
vinoth chandar
parent
a7e6cf5197
commit
3fd2fd6e9d
@@ -19,12 +19,14 @@ package com.uber.hoodie.index.bloom;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.hash.Hashing;
|
||||
import com.uber.hoodie.common.util.collection.Pair;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.Partitioner;
|
||||
@@ -139,11 +141,10 @@ public class BucketizedBloomCheckPartitioner extends Partitioner {
|
||||
|
||||
@Override
|
||||
public int getPartition(Object key) {
|
||||
String[] parts = ((String) key).split("#");
|
||||
String fileName = parts[0];
|
||||
final long hashOfKey = Hashing.md5().hashString(parts[1], StandardCharsets.UTF_8).asLong();
|
||||
List<Integer> candidatePartitions = fileGroupToPartitions.get(fileName);
|
||||
int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
|
||||
final Pair<String, String> parts = (Pair<String, String>) key;
|
||||
final long hashOfKey = Hashing.md5().hashString(parts.getRight(), StandardCharsets.UTF_8).asLong();
|
||||
final List<Integer> candidatePartitions = fileGroupToPartitions.get(parts.getLeft());
|
||||
final int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size());
|
||||
assert idx >= 0;
|
||||
return candidatePartitions.get(idx);
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.ParquetUtils;
|
||||
import com.uber.hoodie.common.util.collection.Pair;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.MetadataNotFoundException;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
@@ -42,9 +43,12 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.Partitioner;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
@@ -171,7 +175,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
||||
// we will just try exploding the input and then count to determine comparisons
|
||||
// FIX(vc): Only do sampling here and extrapolate?
|
||||
fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo,
|
||||
partitionRecordKeyPairRDD).mapToPair(t -> t._2()).countByKey();
|
||||
partitionRecordKeyPairRDD).mapToPair(t -> t).countByKey();
|
||||
} else {
|
||||
fileToComparisons = new HashMap<>();
|
||||
partitionToFileInfo.entrySet().stream().forEach(e -> {
|
||||
@@ -290,8 +294,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be
|
||||
* checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files
|
||||
@@ -301,24 +303,21 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
||||
* recordKey ranges in the index info.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
||||
IndexFileFilter indexFileFilter = config.useBloomIndexTreebasedFilter()
|
||||
? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
|
||||
: new ListBasedIndexFileFilter(partitionToFileIndexInfo);
|
||||
|
||||
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
|
||||
String recordKey = partitionRecordKeyPair._2();
|
||||
String partitionPath = partitionRecordKeyPair._1();
|
||||
List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
|
||||
indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
|
||||
recordComparisons.add(
|
||||
new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
|
||||
new Tuple2<>(matchingFile,
|
||||
new HoodieKey(recordKey, partitionPath))));
|
||||
});
|
||||
return recordComparisons;
|
||||
}).flatMapToPair(List::iterator);
|
||||
|
||||
return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
|
||||
.map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath)))
|
||||
.collect(Collectors.toList());
|
||||
}).flatMap(List::iterator);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -332,28 +331,32 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
|
||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient,
|
||||
Map<String, Long> fileGroupToComparisons) {
|
||||
JavaPairRDD<String, Tuple2<String, HoodieKey>> fileSortedTripletRDD =
|
||||
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
|
||||
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
|
||||
|
||||
if (config.useBloomIndexBucketizedChecking()) {
|
||||
BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism,
|
||||
fileGroupToComparisons, config.getBloomIndexKeysPerBucket());
|
||||
fileSortedTripletRDD = fileSortedTripletRDD.repartitionAndSortWithinPartitions(partitioner);
|
||||
Partitioner partitioner = new BucketizedBloomCheckPartitioner(
|
||||
shuffleParallelism,
|
||||
fileGroupToComparisons,
|
||||
config.getBloomIndexKeysPerBucket()
|
||||
);
|
||||
|
||||
fileComparisonsRDD = fileComparisonsRDD
|
||||
.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
|
||||
.repartitionAndSortWithinPartitions(partitioner)
|
||||
.map(Tuple2::_2);
|
||||
} else {
|
||||
// sort further based on filename, such that all checking for the file can happen within
|
||||
// a single partition, on-the-fly
|
||||
fileSortedTripletRDD = fileSortedTripletRDD.sortByKey(true, shuffleParallelism);
|
||||
fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
|
||||
}
|
||||
return fileSortedTripletRDD.mapPartitionsWithIndex(
|
||||
new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
|
||||
|
||||
return fileComparisonsRDD
|
||||
.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true)
|
||||
.flatMap(List::iterator)
|
||||
.filter(lookupResult -> lookupResult.getMatchingRecordKeys().size() > 0)
|
||||
.flatMapToPair(lookupResult -> {
|
||||
List<Tuple2<String, String>> vals = new ArrayList<>();
|
||||
for (String recordKey : lookupResult.getMatchingRecordKeys()) {
|
||||
vals.add(new Tuple2<>(recordKey, lookupResult.getFileName()));
|
||||
}
|
||||
return vals.iterator();
|
||||
});
|
||||
.filter(lr -> lr.getMatchingRecordKeys().size() > 0)
|
||||
.flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
|
||||
.map(recordKey -> new Tuple2<>(recordKey, lookupResult.getFileName()))
|
||||
.collect(Collectors.toList())
|
||||
.iterator());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -43,7 +43,7 @@ import scala.Tuple2;
|
||||
* actual files
|
||||
*/
|
||||
public class HoodieBloomIndexCheckFunction implements
|
||||
Function2<Integer, Iterator<Tuple2<String, Tuple2<String, HoodieKey>>>,
|
||||
Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
|
||||
Iterator<List<KeyLookupResult>>> {
|
||||
|
||||
private static Logger logger = LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
|
||||
@@ -84,13 +84,13 @@ public class HoodieBloomIndexCheckFunction implements
|
||||
|
||||
@Override
|
||||
public Iterator<List<KeyLookupResult>> call(Integer partition,
|
||||
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> fileParitionRecordKeyTripletItr)
|
||||
Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr)
|
||||
throws Exception {
|
||||
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
|
||||
}
|
||||
|
||||
class LazyKeyCheckIterator extends
|
||||
LazyIterableIterator<Tuple2<String, Tuple2<String, HoodieKey>>, List<KeyLookupResult>> {
|
||||
LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
|
||||
|
||||
private List<String> candidateRecordKeys;
|
||||
|
||||
@@ -103,7 +103,7 @@ public class HoodieBloomIndexCheckFunction implements
|
||||
private long totalKeysChecked;
|
||||
|
||||
LazyKeyCheckIterator(
|
||||
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr) {
|
||||
Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
|
||||
super(filePartitionRecordKeyTripletItr);
|
||||
currentFile = null;
|
||||
candidateRecordKeys = new ArrayList<>();
|
||||
@@ -162,10 +162,10 @@ public class HoodieBloomIndexCheckFunction implements
|
||||
try {
|
||||
// process one file in each go.
|
||||
while (inputItr.hasNext()) {
|
||||
Tuple2<String, Tuple2<String, HoodieKey>> currentTuple = inputItr.next();
|
||||
String fileName = currentTuple._2._1;
|
||||
String partitionPath = currentTuple._2._2.getPartitionPath();
|
||||
String recordKey = currentTuple._2._2.getRecordKey();
|
||||
Tuple2<String, HoodieKey> currentTuple = inputItr.next();
|
||||
String fileName = currentTuple._1;
|
||||
String partitionPath = currentTuple._2.getPartitionPath();
|
||||
String recordKey = currentTuple._2.getRecordKey();
|
||||
|
||||
// lazily init state
|
||||
if (currentFile == null) {
|
||||
|
||||
@@ -27,12 +27,14 @@ import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.HoodieIOException;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import scala.Tuple2;
|
||||
|
||||
@@ -76,7 +78,7 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
||||
|
||||
@Override
|
||||
@VisibleForTesting
|
||||
JavaPairRDD<String, Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||
JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
|
||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
||||
Map<String, String> indexToPartitionMap = new HashMap<>();
|
||||
@@ -87,17 +89,14 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
||||
IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
|
||||
? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo)
|
||||
: new ListBasedGlobalIndexFileFilter(partitionToFileIndexInfo);
|
||||
|
||||
return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
|
||||
String recordKey = partitionRecordKeyPair._2();
|
||||
String partitionPath = partitionRecordKeyPair._1();
|
||||
List<Tuple2<String, Tuple2<String, HoodieKey>>> recordComparisons = new ArrayList<>();
|
||||
indexFileFilter.getMatchingFiles(partitionPath, recordKey).forEach(matchingFile -> {
|
||||
recordComparisons.add(
|
||||
new Tuple2<>(String.format("%s#%s", matchingFile, recordKey),
|
||||
new Tuple2<>(matchingFile,
|
||||
new HoodieKey(recordKey, indexToPartitionMap.get(matchingFile)))));
|
||||
});
|
||||
return recordComparisons;
|
||||
}).flatMapToPair(List::iterator);
|
||||
|
||||
return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
|
||||
.map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file))))
|
||||
.collect(Collectors.toList());
|
||||
}).flatMap(List::iterator);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,12 +259,12 @@ public class TestHoodieBloomIndex {
|
||||
new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
|
||||
new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t);
|
||||
|
||||
List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
|
||||
List<Tuple2<String, HoodieKey>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
|
||||
partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
|
||||
|
||||
assertEquals(10, comparisonKeyList.size());
|
||||
Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
|
||||
t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
|
||||
t -> t._2.getRecordKey(), Collectors.mapping(t -> t._1, Collectors.toList())));
|
||||
|
||||
assertEquals(4, recordKeyToFileComps.size());
|
||||
assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002")));
|
||||
|
||||
@@ -190,25 +190,25 @@ public class TestHoodieGlobalBloomIndex {
|
||||
new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"),
|
||||
new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t);
|
||||
|
||||
List<Tuple2<String, Tuple2<String, HoodieKey>>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons(
|
||||
partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
|
||||
List<Tuple2<String, HoodieKey>> comparisonKeyList =
|
||||
index.explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect();
|
||||
|
||||
/* epecting:
|
||||
f4#003, f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
|
||||
f1#003, f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
|
||||
f3#003, f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
|
||||
f4#002, f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
|
||||
f1#002, f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
|
||||
f3#002, f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
|
||||
f4#005, f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
|
||||
f1#005, f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
|
||||
f4#004, f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
|
||||
f1#004, f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
|
||||
/* expecting:
|
||||
f4, HoodieKey { recordKey=003 partitionPath=2017/10/23}
|
||||
f1, HoodieKey { recordKey=003 partitionPath=2017/10/22}
|
||||
f3, HoodieKey { recordKey=003 partitionPath=2017/10/22}
|
||||
f4, HoodieKey { recordKey=002 partitionPath=2017/10/23}
|
||||
f1, HoodieKey { recordKey=002 partitionPath=2017/10/22}
|
||||
f3, HoodieKey { recordKey=002 partitionPath=2017/10/22}
|
||||
f4, HoodieKey { recordKey=005 partitionPath=2017/10/23}
|
||||
f1, HoodieKey { recordKey=005 partitionPath=2017/10/22}
|
||||
f4, HoodieKey { recordKey=004 partitionPath=2017/10/23}
|
||||
f1, HoodieKey { recordKey=004 partitionPath=2017/10/22}
|
||||
*/
|
||||
assertEquals(10, comparisonKeyList.size());
|
||||
|
||||
Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy(
|
||||
t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList())));
|
||||
Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
|
||||
.collect(Collectors.groupingBy(t -> t._2.getRecordKey(), Collectors.mapping(Tuple2::_1, Collectors.toList())));
|
||||
|
||||
assertEquals(4, recordKeyToFileComps.size());
|
||||
assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002")));
|
||||
|
||||
Reference in New Issue
Block a user