diff --git a/.gitignore b/.gitignore index fa0127ec2..3304d7088 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,10 @@ local.properties # Maven ####################################### dependency-reduced-pom.xml + + +####################################### +# Docker +####################################### +hoodie-integ-test/compose_env + diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java index 3e4772a7d..3e4a1bffb 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieIndexConfig.java @@ -47,6 +47,15 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true"; public static final String BLOOM_INDEX_TREE_BASED_FILTER_PROP = "hoodie.bloom.index.use.treebased.filter"; public static final String DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER = "true"; + // TODO: On by default. Once stable, we will remove the other mode. + public static final String BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = "hoodie.bloom.index.bucketized.checking"; + public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = "true"; + // 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter. + // 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions. 
+ public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket"; + public static final String DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET = "10000000"; + + public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage" + ".level"; public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; @@ -118,6 +127,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder bloomIndexBucketizedChecking(boolean bucketizedChecking) { + props.setProperty(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, String.valueOf(bucketizedChecking)); + return this; + } + + public Builder bloomIndexKeysPerBucket(int keysPerBucket) { + props.setProperty(BLOOM_INDEX_KEYS_PER_BUCKET_PROP, String.valueOf(keysPerBucket)); + return this; + } + public Builder withBloomIndexInputStorageLevel(String level) { props.setProperty(BLOOM_INDEX_INPUT_STORAGE_LEVEL, level); return this; @@ -141,6 +160,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP), BLOOM_INDEX_TREE_BASED_FILTER_PROP, DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER); + setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP), + BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING); + setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_KEYS_PER_BUCKET_PROP), + BLOOM_INDEX_KEYS_PER_BUCKET_PROP, DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP)); return config; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index ab6570f31..0834e1f3f 100644 --- 
a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -328,6 +328,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER_PROP)); } + public boolean useBloomIndexBucketizedChecking() { + return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING_PROP)); + } + + public int getBloomIndexKeysPerBucket() { + return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET_PROP)); + } + public StorageLevel getBloomIndexInputStorageLevel() { return StorageLevel .fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java new file mode 100644 index 000000000..5bc47e2fa --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BucketizedBloomCheckPartitioner.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package com.uber.hoodie.index.bloom; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.Partitioner; + +/** + * Partitions bloom filter checks by spreading out comparisons across buckets of work. + * + * Each bucket incurs the following cost + *
+ *   1) Read bloom filter from file footer
+ *   2) Check keys against bloom filter
+ *   3) [Conditional] If any key had a hit, open file and check
+ * 
+ * + * The partitioner performs a two phase bin packing algorithm, to pack enough work into each bucket such that cost of + * (1) & (3) is amortized. Also, avoids any skews in the sort based approach, by directly partitioning by the file to be + * checked against and ensuring each partition has similar number of buckets. Performance tests show that this approach + * could bound the amount of skew to std_dev(numberOfBucketsPerPartition) * cost of (3), lower than sort partitioning. + * + * Approach has two goals : + *
+ *   1) Pack as many buckets from the same file group into the same partition, to amortize cost of (1) and (2) further
+ *   2) Spread buckets across partitions evenly to achieve skew reduction
+ * 
+ */ +public class BucketizedBloomCheckPartitioner extends Partitioner { + + private static Logger logger = LogManager.getLogger(BucketizedBloomCheckPartitioner.class); + + private int partitions; + + /** + * Stores the final mapping of a file group to a list of partitions for its keys. + */ + private Map> fileGroupToPartitions; + + /** + * Create a partitioner that computes a plan based on provided workload characteristics. + * + * @param targetPartitions maximum number of partitions to target + * @param fileGroupToComparisons number of expected comparisons per file group + * @param keysPerBucket maximum number of keys to pack in a single bucket + */ + public BucketizedBloomCheckPartitioner(int targetPartitions, Map fileGroupToComparisons, + int keysPerBucket) { + this.fileGroupToPartitions = new HashMap<>(); + + Map bucketsPerFileGroup = new HashMap<>(); + // Compute the buckets needed per file group, using simple uniform distribution + fileGroupToComparisons.forEach((f, c) -> + bucketsPerFileGroup.put(f, (int) Math.ceil((c * 1.0) / keysPerBucket))); + int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> i).sum(); + // If totalBuckets > targetPartitions, no need to have extra partitions + this.partitions = Math.min(targetPartitions, totalBuckets); + + // PHASE 1 : start filling upto minimum number of buckets into partitions, taking all but one bucket from each file + // This tries to first optimize for goal 1 above, with knowledge that each partition needs a certain minimum number + // of buckets and assigns buckets in the same order as file groups. If we were to simply round robin, then buckets + // for a file group is more or less guaranteed to be placed on different partitions all the time. 
+ int minBucketsPerPartition = Math.max((int) Math.floor((1.0 * totalBuckets) / partitions), 1); + logger.info(String.format("TotalBuckets %d, min_buckets/partition %d", totalBuckets, minBucketsPerPartition)); + int[] bucketsFilled = new int[partitions]; + Map bucketsFilledPerFileGroup = new HashMap<>(); + int partitionIndex = 0; + for (Map.Entry e : bucketsPerFileGroup.entrySet()) { + for (int b = 0; b < Math.max(1, e.getValue() - 1); b++) { + // keep filled counts up to date + bucketsFilled[partitionIndex]++; + AtomicInteger cnt = bucketsFilledPerFileGroup.getOrDefault(e.getKey(), new AtomicInteger(0)); + cnt.incrementAndGet(); + bucketsFilledPerFileGroup.put(e.getKey(), cnt); + + // mark this partition against the file group + List partitionList = this.fileGroupToPartitions.getOrDefault(e.getKey(), new ArrayList<>()); + partitionList.add(partitionIndex); + this.fileGroupToPartitions.put(e.getKey(), partitionList); + + // switch to new partition if needed + if (bucketsFilled[partitionIndex] >= minBucketsPerPartition) { + partitionIndex = (partitionIndex + 1) % partitions; + } + } + } + + // PHASE 2 : for remaining unassigned buckets, round robin over partitions once. Since we withheld 1 bucket from + // each file group uniformly, this remaining is also a uniform mix across file groups. We just round robin to + // optimize for goal 2.
+ for (Map.Entry e : bucketsPerFileGroup.entrySet()) { + int remaining = e.getValue() - bucketsFilledPerFileGroup.get(e.getKey()).intValue(); + for (int r = 0; r < remaining; r++) { + // mark this partition against the file group + this.fileGroupToPartitions.get(e.getKey()).add(partitionIndex); + bucketsFilled[partitionIndex]++; + partitionIndex = (partitionIndex + 1) % partitions; + } + } + + if (logger.isDebugEnabled()) { + logger.debug("Partitions assigned per file groups :" + fileGroupToPartitions); + StringBuilder str = new StringBuilder(); + for (int i = 0; i < bucketsFilled.length; i++) { + str.append("p" + i + " : " + bucketsFilled[i] + ","); + } + logger.debug("Num buckets assigned per file group :" + str); + } + } + + @Override + public int numPartitions() { + return partitions; + } + + @Override + public int getPartition(Object key) { + String[] parts = ((String) key).split("#"); + String fileName = parts[0]; + final long hashOfKey = Hashing.md5().hashString(parts[1], StandardCharsets.UTF_8).asLong(); + List candidatePartitions = fileGroupToPartitions.get(fileName); + int idx = (int) Math.floorMod(hashOfKey, candidatePartitions.size()); + assert idx >= 0; + return candidatePartitions.get(idx); + } + + @VisibleForTesting + Map> getFileGroupToPartitions() { + return fileGroupToPartitions; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java index 9ac8db0b8..ccafc562b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java @@ -39,9 +39,9 @@ import com.uber.hoodie.exception.MetadataNotFoundException; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.table.HoodieTable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; 
import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -151,53 +151,53 @@ public class HoodieBloomIndex extends HoodieIndex // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id, // that contains it. - int parallelism = autoComputeParallelism(recordsPerPartition, partitionToFileInfo, + Map comparisonsPerFileGroup = computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD); - return findMatchingFilesForRecordKeys(partitionToFileInfo, - partitionRecordKeyPairRDD, parallelism, hoodieTable.getMetaClient()); + int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup); + int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism); + return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, + hoodieTable.getMetaClient(), comparisonsPerFileGroup); } /** - * The index lookup can be skewed in three dimensions : #files, #partitions, #records

To be able to smoothly - * handle skews, we need to compute how to split each partitions into subpartitions. We do it here, in a way that - * keeps the amount of each Spark join partition to < 2GB.

If - * {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} - * is specified as a NON-zero number, then that is used explicitly. + * Compute the estimated number of bloom filter comparisons to be performed on each file group */ - private int autoComputeParallelism(final Map recordsPerPartition, + private Map computeComparisonsPerFileGroup(final Map recordsPerPartition, final Map> partitionToFileInfo, JavaPairRDD partitionRecordKeyPairRDD) { - long totalComparisons = 0; + Map fileToComparisons; if (config.getBloomIndexPruneByRanges()) { // we will just try exploding the input and then count to determine comparisons - totalComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, - partitionRecordKeyPairRDD).count(); + // FIX(vc): Only do sampling here and extrapolate? + fileToComparisons = explodeRecordRDDWithFileComparisons(partitionToFileInfo, + partitionRecordKeyPairRDD).mapToPair(t -> t._2()).countByKey(); } else { - // if not pruning by ranges, then each file in a partition needs to compared against all - // records for a partition. 
- Map filesPerPartition = partitionToFileInfo.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> Long.valueOf(e.getValue().size()))); - long totalFiles = 0; - long totalRecords = 0; - for (String partitionPath : recordsPerPartition.keySet()) { - long numRecords = recordsPerPartition.get(partitionPath); - long numFiles = - filesPerPartition.getOrDefault(partitionPath, 1L); - - totalComparisons += numFiles * numRecords; - totalFiles += - filesPerPartition.getOrDefault(partitionPath, 0L); - totalRecords += numRecords; - } - logger.info("TotalRecords: " + totalRecords + ", TotalFiles: " + totalFiles - + ", TotalAffectedPartitions:" + recordsPerPartition.size()); + fileToComparisons = new HashMap<>(); + partitionToFileInfo.entrySet().stream().forEach(e -> { + for (BloomIndexFileInfo fileInfo : e.getValue()) { + //each file needs to be compared against all the records coming into the partition + fileToComparisons.put(fileInfo.getFileName(), recordsPerPartition.get(e.getKey())); + } + }); } + return fileToComparisons; + } - // each partition will have an item per comparison. + /** + * Compute the minimum parallelism needed to play well with the spark 2GB limitation.. The index lookup can be skewed + * in three dimensions : #files, #partitions, #records

To be able to smoothly handle skews, we need to compute how + to split each partition into subpartitions. We do it here, in a way that keeps the amount of each Spark join + partition to < 2GB.

If {@link com.uber.hoodie.config.HoodieIndexConfig#BLOOM_INDEX_PARALLELISM_PROP} is + * specified as a NON-zero number, then that is used explicitly. + */ + int computeSafeParallelism(Map recordsPerPartition, Map comparisonsPerFileGroup) { + long totalComparisons = comparisonsPerFileGroup.values().stream().mapToLong(Long::longValue).sum(); + long totalFiles = comparisonsPerFileGroup.size(); + long totalRecords = recordsPerPartition.values().stream().mapToLong(Long::longValue).sum(); int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1); - logger.info( - "Auto computed parallelism :" + parallelism + ", totalComparisons: " + totalComparisons); + logger.info(String.format("TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, " + + "SafeParallelism %d", totalRecords, totalFiles, recordsPerPartition.size(), totalComparisons, parallelism)); return parallelism; } @@ -329,18 +329,19 @@ public class HoodieBloomIndex extends HoodieIndex @VisibleForTesting JavaPairRDD findMatchingFilesForRecordKeys( final Map> partitionToFileIndexInfo, - JavaPairRDD partitionRecordKeyPairRDD, int totalSubpartitions, HoodieTableMetaClient metaClient) { - - int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), - totalSubpartitions); - + JavaPairRDD partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTableMetaClient metaClient, + Map fileGroupToComparisons) { JavaPairRDD> fileSortedTripletRDD = - explodeRecordRDDWithFileComparisons( - partitionToFileIndexInfo, partitionRecordKeyPairRDD) - // sort further based on filename, such that all checking for the file can happen within - // a single partition, on-the-fly - .sortByKey(true, joinParallelism); - + explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD); + if (config.useBloomIndexBucketizedChecking()) { + BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, + 
fileGroupToComparisons, config.getBloomIndexKeysPerBucket()); + fileSortedTripletRDD = fileSortedTripletRDD.repartitionAndSortWithinPartitions(partitioner); + } else { + // sort further based on filename, such that all checking for the file can happen within + // a single partition, on-the-fly + fileSortedTripletRDD = fileSortedTripletRDD.sortByKey(true, shuffleParallelism); + } return fileSortedTripletRDD.mapPartitionsWithIndex( new HoodieBloomIndexCheckFunction(metaClient, config.getBasePath()), true) .flatMap(List::iterator) diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestBucketizedBloomCheckPartitioner.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestBucketizedBloomCheckPartitioner.java new file mode 100644 index 000000000..072e6062a --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestBucketizedBloomCheckPartitioner.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.uber.hoodie.index.bloom; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import com.uber.hoodie.common.util.collection.Pair; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Test; + +public class TestBucketizedBloomCheckPartitioner { + + @Test + public void testAssignmentCorrectness() { + Map fileToComparisons = new HashMap() { + { + put("f1", 40L); + put("f2", 35L); + put("f3", 20L); + } + }; + BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(4, fileToComparisons, 10); + Map> assignments = p.getFileGroupToPartitions(); + assertEquals("f1 should have 4 buckets", 4, assignments.get("f1").size()); + assertEquals("f2 should have 4 buckets", 4, assignments.get("f2").size()); + assertEquals("f3 should have 2 buckets", 2, assignments.get("f3").size()); + assertArrayEquals("f1 spread across 3 partitions", new Integer[]{0, 0, 1, 3}, + assignments.get("f1").toArray()); + assertArrayEquals("f2 spread across 3 partitions", new Integer[]{1, 2, 2, 0}, + assignments.get("f2").toArray()); + assertArrayEquals("f3 spread across 2 partitions", new Integer[]{3, 1}, + assignments.get("f3").toArray()); + } + + @Test + public void testUniformPacking() { + // evenly distribute 10 buckets/file across 100 partitions + Map comparisons1 = new HashMap() { + { + IntStream.range(0, 10).forEach(f -> put("f" + f, 100L)); + } + }; + BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(100, comparisons1, 10); + Map> assignments = partitioner.getFileGroupToPartitions(); + assignments.entrySet().stream().forEach(e -> assertEquals(10, e.getValue().size())); + Map partitionToNumBuckets = assignments.entrySet().stream() + .flatMap(e -> e.getValue().stream().map(p -> Pair.of(p, e.getKey()))) + .collect(Collectors.groupingBy(Pair::getLeft, 
Collectors.counting())); + partitionToNumBuckets.entrySet().stream().forEach(e -> assertEquals(1L, e.getValue().longValue())); + } + + @Test + public void testNumPartitions() { + Map comparisons1 = new HashMap() { + { + IntStream.range(0, 10).forEach(f -> put("f" + f, 100L)); + } + }; + BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(10000, comparisons1, 10); + assertEquals("num partitions must equal total buckets", 100, p.numPartitions()); + } +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index a52881d2d..664c146d3 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -36,11 +36,18 @@ import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.commons.io.IOUtils; @@ -53,8 +60,11 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import scala.Tuple2; +@RunWith(Parameterized.class) public class TestHoodieBloomIndex { private JavaSparkContext jsc = null; @@ -63,7 +73,21 @@ public class TestHoodieBloomIndex { private String 
schemaStr; private Schema schema; - public TestHoodieBloomIndex() throws Exception { + private boolean rangePruning; + private boolean treeFiltering; + private boolean bucketizedChecking; + + @Parameterized.Parameters(name = "{index}: Test with rangePruning={0}, treeFiltering ={1}, bucketizedChecking is:{2}") + public static Collection data() { + Object[][] data = new Object[][]{{true, true, true}, {false, true, true}, {true, true, false}, + {true, false, true}}; + return Arrays.asList(data); + } + + public TestHoodieBloomIndex(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) { + this.rangePruning = rangePruning; + this.treeFiltering = treeFiltering; + this.bucketizedChecking = bucketizedChecking; } @Before @@ -91,6 +115,18 @@ public class TestHoodieBloomIndex { } } + private HoodieWriteConfig makeConfig() { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig(HoodieIndexConfig.newBuilder() + .bloomIndexPruneByRanges(rangePruning) + .bloomIndexTreebasedFilter(treeFiltering) + .bloomIndexBucketizedChecking(bucketizedChecking) + .bloomIndexKeysPerBucket(2) + .build()) + .build(); + return config; + } + @Test public void testLoadUUIDsInMemory() throws IOException { // Create one RDD of hoodie record @@ -130,7 +166,7 @@ public class TestHoodieBloomIndex { @Test public void testLoadInvolvedFiles() throws IOException { - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieWriteConfig config = makeConfig(); HoodieBloomIndex index = new HoodieBloomIndex(config); // Create some partitions, and put some files @@ -188,58 +224,57 @@ public class TestHoodieBloomIndex { table = HoodieTable.getHoodieTable(metadata, config, jsc); filesList = index.loadInvolvedFiles(partitions, jsc, table); assertEquals(filesList.size(), 4); - // these files will not have the key ranges - assertNull(filesList.get(0)._2().getMaxRecordKey()); - 
assertNull(filesList.get(0)._2().getMinRecordKey()); - assertFalse(filesList.get(1)._2().hasKeyRanges()); - assertNotNull(filesList.get(2)._2().getMaxRecordKey()); - assertNotNull(filesList.get(2)._2().getMinRecordKey()); - assertTrue(filesList.get(3)._2().hasKeyRanges()); - // no longer sorted, but should have same files. + if (rangePruning) { + // these files will not have the key ranges + assertNull(filesList.get(0)._2().getMaxRecordKey()); + assertNull(filesList.get(0)._2().getMinRecordKey()); + assertFalse(filesList.get(1)._2().hasKeyRanges()); + assertNotNull(filesList.get(2)._2().getMaxRecordKey()); + assertNotNull(filesList.get(2)._2().getMinRecordKey()); + assertTrue(filesList.get(3)._2().hasKeyRanges()); - List> expected = Arrays.asList( - new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2_0_20160401010101.parquet")), - new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1_0_20150312101010.parquet")), - new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")), - new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003"))); - assertEquals(expected, filesList); - } + // no longer sorted, but should have same files. 
- @Test - public void testRangePruning() { - for (Boolean rangePruning : new boolean[]{false, true}) { - Map props = new HashMap<>(); - props.put("hoodie.bloom.index.prune.by" + ".ranges", rangePruning.toString()); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withProps(props).build(); - HoodieBloomIndex index = new HoodieBloomIndex(config); - - final Map> partitionToFileIndexInfo = new HashMap<>(); - partitionToFileIndexInfo.put("2017/10/22", Arrays.asList(new BloomIndexFileInfo("f1"), - new BloomIndexFileInfo("f2", "000", "000"), new BloomIndexFileInfo("f3", "001", "003"), - new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010"))); - - JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList( - new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), - new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); - - List>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( - partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); - - assertEquals(10, comparisonKeyList.size()); - Map> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy( - t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); - - assertEquals(4, recordKeyToFileComps.size()); - assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002"))); - assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("003"))); - assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("004"))); - assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("005"))); + List> expected = Arrays.asList( + new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2_0_20160401010101.parquet")), + new Tuple2<>("2015/03/12", new 
BloomIndexFileInfo("1_0_20150312101010.parquet")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003"))); + assertEquals(expected, filesList); } } @Test - public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedException, ClassNotFoundException { + public void testRangePruning() { + HoodieWriteConfig config = makeConfig(); + HoodieBloomIndex index = new HoodieBloomIndex(config); + + final Map> partitionToFileIndexInfo = new HashMap<>(); + partitionToFileIndexInfo.put("2017/10/22", Arrays.asList(new BloomIndexFileInfo("f1"), + new BloomIndexFileInfo("f2", "000", "000"), new BloomIndexFileInfo("f3", "001", "003"), + new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010"))); + + JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList( + new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), + new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); + + List>> comparisonKeyList = index.explodeRecordRDDWithFileComparisons( + partitionToFileIndexInfo, partitionRecordKeyPairRDD).collect(); + + assertEquals(10, comparisonKeyList.size()); + Map> recordKeyToFileComps = comparisonKeyList.stream().collect(Collectors.groupingBy( + t -> t._2()._2().getRecordKey(), Collectors.mapping(t -> t._2()._1().split("#")[0], Collectors.toList()))); + + assertEquals(4, recordKeyToFileComps.size()); + assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002"))); + assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("003"))); + assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("004"))); + assertEquals(new HashSet<>(Arrays.asList("f1", "f4")), new HashSet<>(recordKeyToFileComps.get("005"))); + } + + @Test + 
public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedException { // Create some records to use String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," @@ -299,7 +334,7 @@ public class TestHoodieBloomIndex { JavaRDD recordRDD = jsc.emptyRDD(); // Also create the metadata and config HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieWriteConfig config = makeConfig(); HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); // Let's tag @@ -315,76 +350,71 @@ public class TestHoodieBloomIndex { @Test public void testTagLocation() throws Exception { - for (Boolean rangePruning : new boolean[]{false, true}) { - Map props = new HashMap<>(); - props.put("hoodie.bloom.index.prune.by" + ".ranges", rangePruning.toString()); - // We have some records to be tagged (two different partitions) + // We have some records to be tagged (two different partitions) + String rowKey1 = UUID.randomUUID().toString(); + String rowKey2 = UUID.randomUUID().toString(); + String rowKey3 = UUID.randomUUID().toString(); + String rowKey4 = UUID.randomUUID().toString(); + String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + String recordStr4 = "{\"_row_key\":\"" + rowKey4 + "\"," + + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; + TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); + HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), + rowChange1); + TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); + 
HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), + rowChange2); + TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); + HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), + rowChange3); + TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); + HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), + rowChange4); + JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); - String rowKey1 = UUID.randomUUID().toString(); - String rowKey2 = UUID.randomUUID().toString(); - String rowKey3 = UUID.randomUUID().toString(); - String rowKey4 = UUID.randomUUID().toString(); - String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; - String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\"," - + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; - String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\"," - + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; - String recordStr4 = "{\"_row_key\":\"" + rowKey4 + "\"," - + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; - TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); - HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), - rowChange1); - TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2); - HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), - rowChange2); - TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3); - HoodieRecord record3 = new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), - rowChange3); - TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4); - HoodieRecord record4 = new HoodieRecord(new 
HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), - rowChange4); - JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4)); + // Also create the metadata and config + HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieWriteConfig config = makeConfig(); + HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); - // Also create the metadata and config - HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withProps(props).build(); - HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); + // Let's tag + HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); + JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); - // Let's tag - HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); - JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); + // Should not find any files + for (HoodieRecord record : taggedRecordRDD.collect()) { + assertFalse(record.isCurrentLocationKnown()); + } - // Should not find any files - for (HoodieRecord record : taggedRecordRDD.collect()) { - assertFalse(record.isCurrentLocationKnown()); - } + // We create three parquet file, each having one record. (two different partitions) + String filename1 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true); + String filename2 = + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true); + String filename3 = + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); - // We create three parquet file, each having one record. 
(two different partitions) - String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, null, true); - String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record2), schema, null, true); - String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true); + // We do the tag again + metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + table = HoodieTable.getHoodieTable(metadata, config, jsc); - // We do the tag again - metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - table = HoodieTable.getHoodieTable(metadata, config, jsc); + taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); - taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); - - // Check results - for (HoodieRecord record : taggedRecordRDD.collect()) { - if (record.getRecordKey().equals(rowKey1)) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1))); - } else if (record.getRecordKey().equals(rowKey2)) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); - } else if (record.getRecordKey().equals(rowKey3)) { - assertTrue(!record.isCurrentLocationKnown()); - } else if (record.getRecordKey().equals(rowKey4)) { - assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); - } + // Check results + for (HoodieRecord record : taggedRecordRDD.collect()) { + if (record.getRecordKey().equals(rowKey1)) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename1))); + } else if (record.getRecordKey().equals(rowKey2)) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); + } else if (record.getRecordKey().equals(rowKey3)) { + assertTrue(!record.isCurrentLocationKnown()); + } else if 
(record.getRecordKey().equals(rowKey4)) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); } } } @@ -417,7 +447,7 @@ public class TestHoodieBloomIndex { // Also create the metadata and config HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieWriteConfig config = makeConfig(); HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); // Let's tag @@ -490,7 +520,7 @@ public class TestHoodieBloomIndex { // We do the tag JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2)); HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieWriteConfig config = makeConfig(); HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc); HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);