[MINOR] Improve code readability by passing in the fileComparisonsRDD in bloom index (#2319)
This commit is contained in:
@@ -122,13 +122,15 @@ public class SparkHoodieBloomIndex<T extends HoodieRecordPayload> extends SparkH
|
|||||||
|
|
||||||
// Step 3: Obtain an RDD, for each incoming record, that already exists, with the file id,
|
// Step 3: Obtain an RDD, for each incoming record, that already exists, with the file id,
|
||||||
// that contains it.
|
// that contains it.
|
||||||
|
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
|
||||||
|
explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD);
|
||||||
Map<String, Long> comparisonsPerFileGroup =
|
Map<String, Long> comparisonsPerFileGroup =
|
||||||
computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
|
computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, fileComparisonsRDD);
|
||||||
int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
|
int inputParallelism = partitionRecordKeyPairRDD.partitions().size();
|
||||||
int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
|
int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
|
||||||
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
|
LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
|
||||||
+ config.getBloomIndexParallelism() + "}");
|
+ config.getBloomIndexParallelism() + "}");
|
||||||
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
|
return findMatchingFilesForRecordKeys(fileComparisonsRDD, joinParallelism, hoodieTable,
|
||||||
comparisonsPerFileGroup);
|
comparisonsPerFileGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,14 +139,12 @@ public class SparkHoodieBloomIndex<T extends HoodieRecordPayload> extends SparkH
|
|||||||
*/
|
*/
|
||||||
private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
|
private Map<String, Long> computeComparisonsPerFileGroup(final Map<String, Long> recordsPerPartition,
|
||||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
|
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
|
||||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
|
final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD) {
|
||||||
|
|
||||||
Map<String, Long> fileToComparisons;
|
Map<String, Long> fileToComparisons;
|
||||||
if (config.getBloomIndexPruneByRanges()) {
|
if (config.getBloomIndexPruneByRanges()) {
|
||||||
// we will just try exploding the input and then count to determine comparisons
|
// we will just try exploding the input and then count to determine comparisons
|
||||||
// FIX(vc): Only do sampling here and extrapolate?
|
// FIX(vc): Only do sampling here and extrapolate?
|
||||||
fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairRDD)
|
fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
|
||||||
.mapToPair(t -> t).countByKey();
|
|
||||||
} else {
|
} else {
|
||||||
fileToComparisons = new HashMap<>();
|
fileToComparisons = new HashMap<>();
|
||||||
partitionToFileInfo.forEach((key, value) -> {
|
partitionToFileInfo.forEach((key, value) -> {
|
||||||
@@ -252,11 +252,10 @@ public class SparkHoodieBloomIndex<T extends HoodieRecordPayload> extends SparkH
|
|||||||
* Make sure the parallelism is at least the groupby parallelism for tagging location
|
* Make sure the parallelism is at least the groupby parallelism for tagging location
|
||||||
*/
|
*/
|
||||||
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
|
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
|
||||||
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
|
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
|
||||||
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
|
int shuffleParallelism,
|
||||||
|
HoodieTable hoodieTable,
|
||||||
Map<String, Long> fileGroupToComparisons) {
|
Map<String, Long> fileGroupToComparisons) {
|
||||||
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
|
|
||||||
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
|
|
||||||
|
|
||||||
if (config.useBloomIndexBucketizedChecking()) {
|
if (config.useBloomIndexBucketizedChecking()) {
|
||||||
Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
|
Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
|
||||||
|
|||||||
Reference in New Issue
Block a user