diff --git a/LICENSE b/LICENSE index 17461664d..325ba653b 100644 --- a/LICENSE +++ b/LICENSE @@ -293,3 +293,31 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ------------------------------------------------------------------------------- + +This product includes code from org.apache.hadoop. + +* org.apache.hudi.common.bloom.filter.InternalDynamicBloomFilter.java adapted from org.apache.hadoop.util.bloom.DynamicBloomFilter.java + +* org.apache.hudi.common.bloom.filter.InternalFilter copied from classes in org.apache.hadoop.util.bloom package + +with the following license + +Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org) + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + ------------------------------------------------------------------------------- diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala index 7dc8b2b60..f6daea299 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala @@ -17,21 +17,18 @@ package org.apache.hudi.cli -import java.util -import java.util.Map - import org.apache.avro.Schema import org.apache.avro.generic.IndexedRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.avro.HoodieAvroWriteSupport +import org.apache.hudi.common.HoodieJsonPayload +import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory} import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.util.ParquetUtils -import org.apache.hudi.common.{BloomFilter, HoodieJsonPayload} import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig} import org.apache.hudi.io.storage.{HoodieParquetConfig, HoodieParquetWriter} import org.apache.parquet.avro.AvroSchemaConverter -import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.{DataFrame, SQLContext} @@ -44,7 +41,8 @@ object SparkHelpers { def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) { val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile) val schema: Schema = sourceRecords.get(0).getSchema - val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble) + val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble, + HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE); val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter) val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble) val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema) diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index 24e5949c9..f4672e93c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -18,6 +18,7 @@ package org.apache.hudi.config; +import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode; import org.apache.hudi.index.HoodieIndex; import javax.annotation.concurrent.Immutable; @@ -27,7 +28,6 @@ import java.io.FileReader; import java.io.IOException; import java.util.Properties; - /** * Indexing related config. */ @@ -54,6 +54,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { // TODO: On by default. Once stable, we will remove the other mode. public static final String BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = "hoodie.bloom.index.bucketized.checking"; public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = "true"; + public static final String BLOOM_INDEX_FILTER_TYPE = "hoodie.bloom.index.filter.type"; + public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name(); + public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries"; + public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "100000"; + // 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter. // 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions. public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket"; @@ -194,6 +199,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_KEYS_PER_BUCKET_PROP), BLOOM_INDEX_KEYS_PER_BUCKET_PROP, DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET); + setDefaultOnCondition(props, !props.contains(BLOOM_INDEX_FILTER_TYPE), + BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE); + setDefaultOnCondition(props, !props.contains(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES), + HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES); // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP)); return config; diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4db8e4d41..488ad75ff 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -96,6 +96,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks"; private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7; + private ConsistencyGuardConfig consistencyGuardConfig; // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled @@ -369,6 +370,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); } + public String getBloomFilterType() { + return props.getProperty(HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE); + } + + public int getDynamicBloomFilterMaxNumEntries() { + return Integer.parseInt(props.getProperty(HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES)); + } + /** * Fraction of the global share of QPS that should be allocated to this job. Let's say there are 3 jobs which have * input size in terms of number of rows required for HbaseIndexing as x, 2x, 3x respectively. Then this fraction for diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index ca1dc5af2..9f3bdbbdd 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -18,7 +18,7 @@ package org.apache.hudi.io; -import org.apache.hudi.common.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilter; import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; @@ -55,7 +55,7 @@ public class HoodieKeyLookupHandle extends Hoodie private long totalKeysChecked; public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable hoodieTable, - Pair partitionPathFilePair) { + Pair partitionPathFilePair) { super(config, null, hoodieTable, partitionPathFilePair); this.tableType = hoodieTable.getMetaClient().getTableType(); this.candidateRecordKeys = new ArrayList<>(); @@ -70,7 +70,7 @@ public class HoodieKeyLookupHandle extends Hoodie * Given a list of row keys and one file, return only row keys existing in that file. */ public static List checkCandidatesAgainstFile(Configuration configuration, List candidateRecordKeys, - Path filePath) throws HoodieIndexException { + Path filePath) throws HoodieIndexException { List foundRecordKeys = new ArrayList<>(); try { // Load all rowKeys from the file, to double-confirm @@ -134,7 +134,7 @@ public class HoodieKeyLookupHandle extends Hoodie private final String partitionPath; public KeyLookupResult(String fileId, String partitionPath, String baseInstantTime, - List matchingRecordKeys) { + List matchingRecordKeys) { this.fileId = fileId; this.partitionPath = partitionPath; this.baseInstantTime = baseInstantTime; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java index e179572b2..c7ea85ef4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java @@ -19,7 +19,8 @@ package org.apache.hudi.io.storage; import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilterFactory; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -51,7 +52,10 @@ public class HoodieStorageWriterFactory { private static HoodieStorageWriter newParquetStorageWriter( String commitTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable) throws IOException { - BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP()); + BloomFilter filter = BloomFilterFactory + .createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(), + config.getDynamicBloomFilterMaxNumEntries(), + config.getBloomFilterType()); HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java index f80cb7b93..65fbe1f0e 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java @@ -21,6 +21,9 @@ package org.apache.hudi.common; import org.apache.hudi.HoodieReadClient; import org.apache.hudi.WriteStatus; import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.bloom.filter.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilterFactory; +import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieKey; @@ -147,7 +150,7 @@ public class HoodieClientTestUtils { } public static HashMap getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, - List commitsToReturn) throws IOException { + List commitsToReturn) throws IOException { HashMap fileIdToFullPath = new HashMap<>(); for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = @@ -158,7 +161,7 @@ public class HoodieClientTestUtils { } public static Dataset readCommit(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline, - String commitTime) { + String commitTime) { HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); if (!commitTimeline.containsInstant(commitInstant)) { new HoodieException("No commit exists at " + commitTime); @@ -178,7 +181,7 @@ public class HoodieClientTestUtils { * Obtain all new data written into the Hoodie dataset since the given timestamp. */ public static Dataset readSince(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline, - String lastCommitTime) { + String lastCommitTime) { List commitsToReturn = commitTimeline.findInstantsAfter(lastCommitTime, Integer.MAX_VALUE).getInstants().collect(Collectors.toList()); try { @@ -195,7 +198,7 @@ public class HoodieClientTestUtils { * Reads the paths under the a hoodie dataset out as a DataFrame. */ public static Dataset read(JavaSparkContext jsc, String basePath, SQLContext sqlContext, FileSystem fs, - String... paths) { + String... paths) { List filteredPaths = new ArrayList<>(); try { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true); @@ -214,10 +217,11 @@ public class HoodieClientTestUtils { } public static String writeParquetFile(String basePath, String partitionPath, String filename, - List records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { + List records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { if (filter == null) { - filter = new BloomFilter(10000, 0.0000001); + filter = BloomFilterFactory + .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); } HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); @@ -245,7 +249,7 @@ public class HoodieClientTestUtils { } public static String writeParquetFile(String basePath, String partitionPath, List records, - Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException { + Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException { Thread.sleep(1000); String commitTime = HoodieTestUtils.makeNewCommitTime(); String fileId = UUID.randomUUID().toString(); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 8a4d163a3..920985f3a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -19,9 +19,11 @@ package org.apache.hudi.index.bloom; import org.apache.hudi.HoodieClientTestHarness; -import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.TestRawTripPayload; +import org.apache.hudi.common.bloom.filter.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilterFactory; +import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -248,7 +250,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // We write record1, record2 to a parquet file, but the bloom filter contains (record1, // record2, record3). - BloomFilter filter = new BloomFilter(10000, 0.0000001); + BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); filter.add(record3.getRecordKey()); String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2), schema, filter, true); @@ -451,7 +453,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); - BloomFilter filter = new BloomFilter(10000, 0.0000001); + BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, + BloomFilterTypeCode.SIMPLE.name()); filter.add(record2.getRecordKey()); String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, filter, true); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java index ddddc0c08..ba66c6e76 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java @@ -20,11 +20,11 @@ package org.apache.hudi.table; import org.apache.hudi.HoodieClientTestHarness; import org.apache.hudi.WriteStatus; -import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.TestRawTripPayload; import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus; +import org.apache.hudi.common.bloom.filter.BloomFilter; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java index 392cd2329..8db227f9c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java @@ -18,7 +18,8 @@ package org.apache.hudi.avro; -import org.apache.hudi.common.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilter; +import org.apache.hudi.common.bloom.filter.HoodieDynamicBoundedBloomFilter; import org.apache.avro.Schema; import org.apache.parquet.avro.AvroWriteSupport; @@ -40,6 +41,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport { public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter"; public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key"; public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key"; + public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code"; public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) { super(schema, avroSchema); @@ -55,6 +57,9 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport { extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); } + if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { + extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name()); + } } return new WriteSupport.FinalizedWriteContext(extraMetaData); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/TestBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilter.java similarity index 53% rename from hudi-common/src/test/java/org/apache/hudi/common/TestBloomFilter.java rename to hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilter.java index 5ba1f13b7..7356a4826 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/TestBloomFilter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilter.java @@ -16,34 +16,35 @@ * limitations under the License. */ -package org.apache.hudi.common; - -import org.junit.Test; - -import java.io.IOException; +package org.apache.hudi.common.bloom.filter; /** - * Tests bloom filter {@link BloomFilter}. + * A Bloom filter interface. */ -public class TestBloomFilter { +public interface BloomFilter { - @Test - public void testAddKey() { - BloomFilter filter = new BloomFilter(100, 0.0000001); - filter.add("key1"); - assert (filter.mightContain("key1")); - } + /** + * Add a key to the {@link BloomFilter}. + * + * @param key the key to the added to the {@link BloomFilter} + */ + void add(String key); - @Test - public void testSerialize() throws IOException, ClassNotFoundException { - BloomFilter filter = new BloomFilter(1000, 0.0000001); - filter.add("key1"); - filter.add("key2"); - String filterStr = filter.serializeToString(); + /** + * Tests for key membership. + * + * @param key the key to be checked for membership + * @return {@code true} if key may be found, {@code false} if key is not found for sure. + */ + boolean mightContain(String key); - // Rebuild - BloomFilter newFilter = new BloomFilter(filterStr); - assert (newFilter.mightContain("key1")); - assert (newFilter.mightContain("key2")); - } + /** + * Serialize the bloom filter as a string. + */ + String serializeToString(); + + /** + * @return the bloom index type code. + **/ + BloomFilterTypeCode getBloomFilterTypeCode(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterFactory.java new file mode 100644 index 000000000..052a6f6ce --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +import org.apache.hadoop.util.hash.Hash; + +/** + * A Factory class to generate different versions of {@link BloomFilter}. + */ +public class BloomFilterFactory { + + /** + * Creates a new {@link BloomFilter} with the given args. + * + * @param numEntries total number of entries + * @param errorRate max allowed error rate + * @param bloomFilterTypeCode bloom filter type code + * @return the {@link BloomFilter} thus created + */ + public static BloomFilter createBloomFilter(int numEntries, double errorRate, int maxNumberOfEntries, + String bloomFilterTypeCode) { + if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) { + return new SimpleBloomFilter(numEntries, errorRate, Hash.MURMUR_HASH); + } else if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.DYNAMIC_V0.name())) { + return new HoodieDynamicBoundedBloomFilter(numEntries, errorRate, Hash.MURMUR_HASH, maxNumberOfEntries); + } else { + throw new IllegalArgumentException("Bloom Filter type code not recognizable " + bloomFilterTypeCode); + } + } + + /** + * Generate {@link BloomFilter} from serialized String. + * + * @param serString the serialized string of the {@link BloomFilter} + * @param bloomFilterTypeCode bloom filter type code as string + * @return the {@link BloomFilter} thus generated from the passed in serialized string + */ + public static BloomFilter fromString(String serString, String bloomFilterTypeCode) { + if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) { + return new SimpleBloomFilter(serString); + } else if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.DYNAMIC_V0.name())) { + return new HoodieDynamicBoundedBloomFilter(serString, BloomFilterTypeCode.DYNAMIC_V0); + } else { + throw new IllegalArgumentException("Bloom Filter type code not recognizable " + bloomFilterTypeCode); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterTypeCode.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterTypeCode.java new file mode 100644 index 000000000..d90bfd3ce --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterTypeCode.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +/** + * Bloom filter type codes. + * Please do not change the order of the entries. + */ +public enum BloomFilterTypeCode { + SIMPLE, + DYNAMIC_V0 +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterUtils.java new file mode 100644 index 000000000..85d57d4e7 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/BloomFilterUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +/** + * Bloom filter utils. + */ +class BloomFilterUtils { + + /** + * Used in computing the optimal Bloom filter size. This approximately equals 0.480453. + */ + private static final double LOG2_SQUARED = Math.log(2) * Math.log(2); + + /** + * @return the bitsize given the total number of entries and error rate. + */ + static int getBitSize(int numEntries, double errorRate) { + return (int) Math.ceil(numEntries * (-Math.log(errorRate) / LOG2_SQUARED)); + } + + /** + * @return the number of hashes given the bitsize and total number of entries. + */ + static int getNumHashes(int bitSize, int numEntries) { + // Number of the hash functions + return (int) Math.ceil(Math.log(2) * bitSize / numEntries); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/HoodieDynamicBoundedBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/HoodieDynamicBoundedBloomFilter.java new file mode 100644 index 000000000..46800cbae --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/HoodieDynamicBoundedBloomFilter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +import org.apache.hudi.exception.HoodieIndexException; + +import org.apache.hadoop.util.bloom.Key; + +import javax.xml.bind.DatatypeConverter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * Hoodie's dynamic bloom bounded bloom filter. This is based largely on Hadoop's DynamicBloomFilter, but with a bound + * on amount of entries to dynamically expand to. Once the entries added reach the bound, false positive ratio may not + * be guaranteed. + */ +public class HoodieDynamicBoundedBloomFilter implements BloomFilter { + + public static final String TYPE_CODE_PREFIX = "DYNAMIC"; + private InternalDynamicBloomFilter internalDynamicBloomFilter; + + /** + * Instantiates {@link HoodieDynamicBoundedBloomFilter} with the given args. + * + * @param numEntries The total number of entries. + * @param errorRate maximum allowable error rate. + * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}). + * @return the {@link HoodieDynamicBoundedBloomFilter} thus created + */ + HoodieDynamicBoundedBloomFilter(int numEntries, double errorRate, int hashType, int maxNoOfEntries) { + // Bit size + int bitSize = BloomFilterUtils.getBitSize(numEntries, errorRate); + // Number of the hash functions + int numHashs = BloomFilterUtils.getNumHashes(bitSize, numEntries); + this.internalDynamicBloomFilter = new InternalDynamicBloomFilter(bitSize, numHashs, hashType, numEntries, + maxNoOfEntries); + } + + /** + * Generate {@link HoodieDynamicBoundedBloomFilter} from the given {@code serString} serialized string. + * + * @param serString the serialized string which represents the {@link HoodieDynamicBoundedBloomFilter} + * @param typeCode type code of the bloom filter + */ + HoodieDynamicBoundedBloomFilter(String serString, BloomFilterTypeCode typeCode) { + // ignoring the type code for now, since we have just one version + byte[] bytes = DatatypeConverter.parseBase64Binary(serString); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); + try { + internalDynamicBloomFilter = new InternalDynamicBloomFilter(); + internalDynamicBloomFilter.readFields(dis); + dis.close(); + } catch (IOException e) { + throw new HoodieIndexException("Could not deserialize BloomFilter instance", e); + } + } + + @Override + public void add(String key) { + internalDynamicBloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public boolean mightContain(String key) { + return internalDynamicBloomFilter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public String serializeToString() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + try { + internalDynamicBloomFilter.write(dos); + byte[] bytes = baos.toByteArray(); + dos.close(); + return DatatypeConverter.printBase64Binary(bytes); + } catch (IOException e) { + throw new HoodieIndexException("Could not serialize BloomFilter instance", e); + } + } + + @Override + public BloomFilterTypeCode getBloomFilterTypeCode() { + return BloomFilterTypeCode.DYNAMIC_V0; + } +} + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java new file mode 100644 index 000000000..5468ae919 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Hoodie's internal dynamic Bloom Filter. This is largely based of {@link org.apache.hadoop.util.bloom.DynamicBloomFilter} + * with bounds on maximum number of entries. Once the max entries is reached, false positive gaurantees are not + * honored. + */ +class InternalDynamicBloomFilter extends InternalFilter { + + /** + * Threshold for the maximum number of key to record in a dynamic Bloom filter row. + */ + private int nr; + + /** + * The number of keys recorded in the current standard active Bloom filter. + */ + private int currentNbRecord; + private int maxNr; + private boolean reachedMax = false; + private int curMatrixIndex = 0; + + /** + * The matrix of Bloom filter. + */ + private org.apache.hadoop.util.bloom.BloomFilter[] matrix; + + /** + * Zero-args constructor for the serialization. + */ + public InternalDynamicBloomFilter() { + } + + /** + * Constructor. + *

+ * Builds an empty Dynamic Bloom filter. + * + * @param vectorSize The number of bits in the vector. + * @param nbHash The number of hash function to consider. + * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}). + * @param nr The threshold for the maximum number of keys to record in a dynamic Bloom filter row. + */ + public InternalDynamicBloomFilter(int vectorSize, int nbHash, int hashType, int nr, int maxNr) { + super(vectorSize, nbHash, hashType); + + this.nr = nr; + this.currentNbRecord = 0; + this.maxNr = maxNr; + + matrix = new org.apache.hadoop.util.bloom.BloomFilter[1]; + matrix[0] = new org.apache.hadoop.util.bloom.BloomFilter(this.vectorSize, this.nbHash, this.hashType); + } + + @Override + public void add(Key key) { + if (key == null) { + throw new NullPointerException("Key can not be null"); + } + + org.apache.hadoop.util.bloom.BloomFilter bf = getActiveStandardBF(); + + if (bf == null) { + addRow(); + bf = matrix[matrix.length - 1]; + currentNbRecord = 0; + } + + bf.add(key); + + currentNbRecord++; + } + + @Override + public void and(InternalFilter filter) { + if (filter == null + || !(filter instanceof InternalDynamicBloomFilter) + || filter.vectorSize != this.vectorSize + || filter.nbHash != this.nbHash) { + throw new IllegalArgumentException("filters cannot be and-ed"); + } + + InternalDynamicBloomFilter dbf = (InternalDynamicBloomFilter) filter; + + if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) { + throw new IllegalArgumentException("filters cannot be and-ed"); + } + + for (int i = 0; i < matrix.length; i++) { + matrix[i].and(dbf.matrix[i]); + } + } + + @Override + public boolean membershipTest(Key key) { + if (key == null) { + return true; + } + + for (int i = 0; i < matrix.length; i++) { + if (matrix[i].membershipTest(key)) { + return true; + } + } + + return false; + } + + @Override + public void not() { + for (int i = 0; i < matrix.length; i++) { + matrix[i].not(); + } + } + + @Override + public void or(InternalFilter filter) { + if (filter == null + || !(filter instanceof InternalDynamicBloomFilter) + || filter.vectorSize != this.vectorSize + || filter.nbHash != this.nbHash) { + throw new IllegalArgumentException("filters cannot be or-ed"); + } + + InternalDynamicBloomFilter dbf = (InternalDynamicBloomFilter) filter; + + if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) { + throw new IllegalArgumentException("filters cannot be or-ed"); + } + for (int i = 0; i < matrix.length; i++) { + matrix[i].or(dbf.matrix[i]); + } + } + + @Override + public void xor(InternalFilter filter) { + if (filter == null + || !(filter instanceof InternalDynamicBloomFilter) + || filter.vectorSize != this.vectorSize + || filter.nbHash != this.nbHash) { + throw new IllegalArgumentException("filters cannot be xor-ed"); + } + InternalDynamicBloomFilter dbf = (InternalDynamicBloomFilter) filter; + + if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) { + throw new IllegalArgumentException("filters cannot be xor-ed"); + } + + for (int i = 0; i < matrix.length; i++) { + matrix[i].xor(dbf.matrix[i]); + } + } + + @Override + public String toString() { + StringBuilder res = new StringBuilder(); + + for (int i = 0; i < matrix.length; i++) { + res.append(matrix[i]); + res.append(Character.LINE_SEPARATOR); + } + return res.toString(); + } + + // Writable + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeInt(nr); + out.writeInt(currentNbRecord); + out.writeInt(matrix.length); + for (int i = 0; i < matrix.length; i++) { + matrix[i].write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + nr = in.readInt(); + currentNbRecord = in.readInt(); + int len = in.readInt(); + matrix = new org.apache.hadoop.util.bloom.BloomFilter[len]; + for (int i = 0; i < matrix.length; i++) { + matrix[i] = new org.apache.hadoop.util.bloom.BloomFilter(); + matrix[i].readFields(in); + } + } + + /** + * Adds a new row to this dynamic Bloom filter. + */ + private void addRow() { + org.apache.hadoop.util.bloom.BloomFilter[] tmp = new org.apache.hadoop.util.bloom.BloomFilter[matrix.length + 1]; + + for (int i = 0; i < matrix.length; i++) { + tmp[i] = matrix[i]; + } + + tmp[tmp.length - 1] = new org.apache.hadoop.util.bloom.BloomFilter(vectorSize, nbHash, hashType); + matrix = tmp; + } + + /** + * Returns the active standard Bloom filter in this dynamic Bloom filter. + * + * @return BloomFilter The active standard Bloom filter. + * Null otherwise. + */ + private BloomFilter getActiveStandardBF() { + if (reachedMax) { + return matrix[curMatrixIndex++ % matrix.length]; + } + + if (currentNbRecord >= nr && (matrix.length * nr) < maxNr) { + return null; + } else if (currentNbRecord >= nr && (matrix.length * nr) >= maxNr) { + reachedMax = true; + return matrix[0]; + } + return matrix[matrix.length - 1]; + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalFilter.java new file mode 100644 index 000000000..228143b64 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalFilter.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.bloom.HashFunction; +import org.apache.hadoop.util.bloom.Key; +import org.apache.hadoop.util.hash.Hash; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +/** + * Copied from {@link org.apache.hadoop.util.bloom.Filter}. {@link InternalDynamicBloomFilter} needs access to some of + * protected members of {@link org.apache.hadoop.util.bloom.Filter} and hence had to copy it locally. + */ +abstract class InternalFilter implements Writable { + + private static final int VERSION = -1; // negative to accommodate for old format + protected int vectorSize; + protected HashFunction hash; + protected int nbHash; + protected int hashType; + + protected InternalFilter() { + } + + /** + * Constructor. + * + * @param vectorSize The vector size of this filter. + * @param nbHash The number of hash functions to consider. + * @param hashType type of the hashing function (see {@link Hash}). + */ + protected InternalFilter(int vectorSize, int nbHash, int hashType) { + this.vectorSize = vectorSize; + this.nbHash = nbHash; + this.hashType = hashType; + this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType); + } + + /** + * Adds a key to this filter. + * + * @param key The key to add. + */ + public abstract void add(Key key); + + /** + * Determines wether a specified key belongs to this filter. + * + * @param key The key to test. + * @return boolean True if the specified key belongs to this filter. False otherwise. + */ + public abstract boolean membershipTest(Key key); + + /** + * Peforms a logical AND between this filter and a specified filter. + *

+ * Invariant: The result is assigned to this filter. + * + * @param filter The filter to AND with. + */ + public abstract void and(InternalFilter filter); + + /** + * Peforms a logical OR between this filter and a specified filter. + *

+ * Invariant: The result is assigned to this filter. + * + * @param filter The filter to OR with. + */ + public abstract void or(InternalFilter filter); + + /** + * Peforms a logical XOR between this filter and a specified filter. + *

+ * Invariant: The result is assigned to this filter. + * + * @param filter The filter to XOR with. + */ + public abstract void xor(InternalFilter filter); + + /** + * Performs a logical NOT on this filter. + *

+ * The result is assigned to this filter. + */ + public abstract void not(); + + /** + * Adds a list of keys to this filter. + * + * @param keys The list of keys. + */ + public void add(List keys) { + if (keys == null) { + throw new IllegalArgumentException("ArrayList may not be null"); + } + + for (Key key : keys) { + add(key); + } + } //end add() + + /** + * Adds a collection of keys to this filter. + * + * @param keys The collection of keys. + */ + public void add(Collection keys) { + if (keys == null) { + throw new IllegalArgumentException("Collection may not be null"); + } + for (Key key : keys) { + add(key); + } + } //end add() + + /** + * Adds an array of keys to this filter. + * + * @param keys The array of keys. + */ + public void add(Key[] keys) { + if (keys == null) { + throw new IllegalArgumentException("Key[] may not be null"); + } + for (int i = 0; i < keys.length; i++) { + add(keys[i]); + } + } //end add() + + // Writable interface + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(VERSION); + out.writeInt(this.nbHash); + out.writeByte(this.hashType); + out.writeInt(this.vectorSize); + } + + @Override + public void readFields(DataInput in) throws IOException { + int ver = in.readInt(); + if (ver > 0) { // old unversioned format + this.nbHash = ver; + this.hashType = Hash.JENKINS_HASH; + } else if (ver == VERSION) { + this.nbHash = in.readInt(); + this.hashType = in.readByte(); + } else { + throw new IOException("Unsupported version: " + ver); + } + this.vectorSize = in.readInt(); + this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType); + } +} //end class \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/BloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java similarity index 62% rename from hudi-common/src/main/java/org/apache/hudi/common/BloomFilter.java rename to hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java index 06cc8b96d..0610e082e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/BloomFilter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java @@ -16,56 +16,57 @@ * limitations under the License. */ -package org.apache.hudi.common; +package org.apache.hudi.common.bloom.filter; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hadoop.util.bloom.Key; -import org.apache.hadoop.util.hash.Hash; import javax.xml.bind.DatatypeConverter; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataInput; import java.io.DataInputStream; +import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; /** - * A Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter}. + * A Simple Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter}. */ -public class BloomFilter { - /** - * Used in computing the optimal Bloom filter size. This approximately equals 0.480453. - */ - public static final double LOG2_SQUARED = Math.log(2) * Math.log(2); +public class SimpleBloomFilter implements BloomFilter { private org.apache.hadoop.util.bloom.BloomFilter filter = null; - public BloomFilter(int numEntries, double errorRate) { - this(numEntries, errorRate, Hash.MURMUR_HASH); - } - /** * Create a new Bloom filter with the given configurations. + * + * @param numEntries The total number of entries. + * @param errorRate maximum allowable error rate. + * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}). */ - public BloomFilter(int numEntries, double errorRate, int hashType) { + public SimpleBloomFilter(int numEntries, double errorRate, int hashType) { // Bit size - int bitSize = (int) Math.ceil(numEntries * (-Math.log(errorRate) / LOG2_SQUARED)); + int bitSize = BloomFilterUtils.getBitSize(numEntries, errorRate); // Number of the hash functions - int numHashs = (int) Math.ceil(Math.log(2) * bitSize / numEntries); + int numHashs = BloomFilterUtils.getNumHashes(bitSize, numEntries); // The filter this.filter = new org.apache.hadoop.util.bloom.BloomFilter(bitSize, numHashs, hashType); } /** * Create the bloom filter from serialized string. + * + * @param serString serialized string which represents the {@link SimpleBloomFilter} */ - public BloomFilter(String filterStr) { + public SimpleBloomFilter(String serString) { this.filter = new org.apache.hadoop.util.bloom.BloomFilter(); - byte[] bytes = DatatypeConverter.parseBase64Binary(filterStr); + byte[] bytes = DatatypeConverter.parseBase64Binary(serString); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); try { this.filter.readFields(dis); @@ -75,6 +76,7 @@ public class BloomFilter { } } + @Override public void add(String key) { if (key == null) { throw new NullPointerException("Key cannot by null"); @@ -82,6 +84,7 @@ public class BloomFilter { filter.add(new Key(key.getBytes(StandardCharsets.UTF_8))); } + @Override public boolean mightContain(String key) { if (key == null) { throw new NullPointerException("Key cannot by null"); @@ -92,6 +95,7 @@ public class BloomFilter { /** * Serialize the bloom filter as a string. */ + @Override public String serializeToString() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); @@ -104,4 +108,32 @@ public class BloomFilter { throw new HoodieIndexException("Could not serialize BloomFilter instance", e); } } + + private void writeObject(ObjectOutputStream os) + throws IOException { + filter.write(os); + } + + private void readObject(ObjectInputStream is) + throws IOException, ClassNotFoundException { + filter = new org.apache.hadoop.util.bloom.BloomFilter(); + filter.readFields(is); + } + + // @Override + public void write(DataOutput out) throws IOException { + out.write(filter.toString().getBytes()); + } + + //@Override + public void readFields(DataInput in) throws IOException { + filter = new org.apache.hadoop.util.bloom.BloomFilter(); + filter.readFields(in); + } + + @Override + public BloomFilterTypeCode getBloomFilterTypeCode() { + return BloomFilterTypeCode.SIMPLE; + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 02add76e1..dd53e22c4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -19,7 +19,9 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilterFactory; +import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -54,7 +56,7 @@ public class ParquetUtils { /** * Read the rowKey list from the given parquet file. * - * @param filePath The parquet file path. + * @param filePath The parquet file path. * @param configuration configuration to build fs object * @return Set Set of row keys */ @@ -66,9 +68,9 @@ public class ParquetUtils { * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will * return all the rowkeys. * - * @param filePath The parquet file path. + * @param filePath The parquet file path. * @param configuration configuration to build fs object - * @param filter record keys filter + * @param filter record keys filter * @return Set Set of row keys matching candidateRecordKeys */ public static Set filterParquetRowKeys(Configuration configuration, Path filePath, Set filter) { @@ -120,7 +122,7 @@ public class ParquetUtils { } private static Map readParquetFooter(Configuration configuration, boolean required, - Path parquetFilePath, String... footerNames) { + Path parquetFilePath, String... footerNames) { Map footerVals = new HashMap<>(); ParquetMetadata footer = readMetadata(configuration, parquetFilePath); Map metadata = footer.getFileMetaData().getKeyValueMetaData(); @@ -143,15 +145,26 @@ public class ParquetUtils { * Read out the bloom filter from the parquet file meta data. */ public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) { - Map footerVals = readParquetFooter(configuration, false, parquetFilePath, - HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, - HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + Map footerVals = + readParquetFooter(configuration, false, parquetFilePath, + HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, + HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, + HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE); String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); if (null == footerVal) { // We use old style key "com.uber.hoodie.bloomfilter" footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); } - return footerVal != null ? new BloomFilter(footerVal) : null; + BloomFilter toReturn = null; + if (footerVal != null) { + if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) { + toReturn = BloomFilterFactory.fromString(footerVal, + footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)); + } else { + toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name()); + } + } + return toReturn; } public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) { @@ -197,6 +210,7 @@ public class ParquetUtils { } static class RecordKeysFilterFunction implements Function { + private final Set candidateKeys; RecordKeysFilterFunction(Set candidateKeys) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bloom/filter/TestBloomFilter.java b/hudi-common/src/test/java/org/apache/hudi/common/bloom/filter/TestBloomFilter.java new file mode 100644 index 000000000..809f07bb3 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/bloom/filter/TestBloomFilter.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +/** + * Unit tests {@link SimpleBloomFilter} and {@link HoodieDynamicBoundedBloomFilter}. + */ +@RunWith(Parameterized.class) +public class TestBloomFilter { + + private final String versionToTest; + + // name attribute is optional, provide an unique name for test + // multiple parameters, uses Collection + @Parameters() + public static Collection data() { + return Arrays.asList(new Object[][] { + {BloomFilterTypeCode.SIMPLE.name()}, + {BloomFilterTypeCode.DYNAMIC_V0.name()} + }); + } + + public TestBloomFilter(String versionToTest) { + this.versionToTest = versionToTest; + } + + @Test + public void testAddKey() { + List inputs = new ArrayList<>(); + int[] sizes = {100, 1000, 10000}; + for (int size : sizes) { + inputs = new ArrayList<>(); + BloomFilter filter = getBloomFilter(versionToTest, size, 0.000001, size * 10); + for (int i = 0; i < size; i++) { + String key = UUID.randomUUID().toString(); + inputs.add(key); + filter.add(key); + } + for (java.lang.String key : inputs) { + Assert.assertTrue("Filter should have returned true for " + key, filter.mightContain(key)); + } + for (int i = 0; i < 100; i++) { + String randomKey = UUID.randomUUID().toString(); + if (inputs.contains(randomKey)) { + Assert.assertTrue("Filter should have returned true for " + randomKey, filter.mightContain(randomKey)); + } + } + } + } + + @Test + public void testSerialize() throws IOException, ClassNotFoundException { + + List inputs = new ArrayList<>(); + int[] sizes = {100, 1000, 10000}; + for (int size : sizes) { + inputs = new ArrayList<>(); + BloomFilter filter = getBloomFilter(versionToTest, size, 0.000001, size * 10); + for (int i = 0; i < size; i++) { + String key = UUID.randomUUID().toString(); + inputs.add(key); + filter.add(key); + } + + String serString = filter.serializeToString(); + BloomFilter recreatedBloomFilter = BloomFilterFactory + .fromString(serString, versionToTest); + for (String key : inputs) { + Assert.assertTrue("Filter should have returned true for " + key, recreatedBloomFilter.mightContain(key)); + } + } + } + + BloomFilter getBloomFilter(String typeCode, int numEntries, double errorRate, int maxEntries) { + if (typeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) { + return BloomFilterFactory.createBloomFilter(numEntries, errorRate, -1, typeCode); + } else { + return BloomFilterFactory.createBloomFilter(numEntries, errorRate, maxEntries, typeCode); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bloom/filter/TestInternalDynamicBloomFilter.java b/hudi-common/src/test/java/org/apache/hudi/common/bloom/filter/TestInternalDynamicBloomFilter.java new file mode 100644 index 000000000..f93355963 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/bloom/filter/TestInternalDynamicBloomFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.bloom.filter; + +import org.apache.hadoop.util.hash.Hash; +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; + +/** + * Unit tests {@link InternalDynamicBloomFilter} for size bounding. + */ +public class TestInternalDynamicBloomFilter { + + @Test + public void testBoundedSize() { + + int[] batchSizes = {1000, 10000, 10000, 100000, 100000, 10000}; + int indexForMaxGrowth = 3; + int maxSize = batchSizes[0] * 100; + BloomFilter filter = new HoodieDynamicBoundedBloomFilter(batchSizes[0], 0.000001, Hash.MURMUR_HASH, maxSize); + int index = 0; + int lastKnownBloomSize = 0; + while (index < batchSizes.length) { + for (int i = 0; i < batchSizes[index]; i++) { + String key = UUID.randomUUID().toString(); + filter.add(key); + } + + String serString = filter.serializeToString(); + if (index != 0) { + int curLength = serString.length(); + if (index > indexForMaxGrowth) { + Assert.assertEquals("Length should not increase after hitting max entries", curLength, lastKnownBloomSize); + } else { + Assert.assertTrue("Length should increase until max entries are reached", curLength > lastKnownBloomSize); + } + } + lastKnownBloomSize = serString.length(); + index++; + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java index 3458a3b07..893553919 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java @@ -19,22 +19,30 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.HoodieCommonTestHarness; +import org.apache.hudi.common.bloom.filter.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilterFactory; +import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -47,8 +55,23 @@ import static org.junit.Assert.assertTrue; /** * Tests parquet utils. */ +@RunWith(Parameterized.class) public class TestParquetUtils extends HoodieCommonTestHarness { + String bloomFilterTypeToTest; + + @Parameters() + public static Collection data() { + return Arrays.asList(new Object[][] { + {BloomFilterTypeCode.SIMPLE.name()}, + {BloomFilterTypeCode.DYNAMIC_V0.name()} + }); + } + + public TestParquetUtils(String bloomFilterTypeToTest) { + this.bloomFilterTypeToTest = bloomFilterTypeToTest; + } + @Before public void setup() { initPath(); @@ -78,6 +101,16 @@ public class TestParquetUtils extends HoodieCommonTestHarness { } } + private Configuration getConfiguration() { + if (bloomFilterTypeToTest.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) { + return HoodieTestUtils.getDefaultHadoopConf(); + } else { + org.apache.hadoop.conf.Configuration conf = HoodieTestUtils.getDefaultHadoopConf(); + // conf.set(); + return conf; + } + } + @Test public void testFilterParquetRowKeys() throws Exception { List rowKeys = new ArrayList<>(); @@ -107,7 +140,8 @@ public class TestParquetUtils extends HoodieCommonTestHarness { private void writeParquetFile(String filePath, List rowKeys) throws Exception { // Write out a parquet file Schema schema = HoodieAvroUtils.getRecordKeySchema(); - BloomFilter filter = new BloomFilter(1000, 0.0001); + BloomFilter filter = BloomFilterFactory + .createBloomFilter(1000, 0.0001, 10000, bloomFilterTypeToTest); HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP, diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java index 92624661c..7e26c5bce 100644 --- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java +++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java @@ -19,7 +19,9 @@ package org.apache.hudi.hive; import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilter; +import org.apache.hudi.common.bloom.filter.BloomFilterFactory; +import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode; import org.apache.hudi.common.minicluster.HdfsTestService; import org.apache.hudi.common.minicluster.ZookeeperTestService; import org.apache.hudi.common.model.HoodieAvroPayload; @@ -188,7 +190,7 @@ public class TestUtil { } static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom, - String commitTime) throws IOException, URISyntaxException, InterruptedException { + String commitTime) throws IOException, URISyntaxException, InterruptedException { HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, commitTime); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); @@ -196,7 +198,7 @@ public class TestUtil { } static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, - DateTime startFrom, String commitTime, String deltaCommitTime) + DateTime startFrom, String commitTime, String deltaCommitTime) throws IOException, URISyntaxException, InterruptedException { HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, commitTime); @@ -212,7 +214,7 @@ public class TestUtil { } private static HoodieCommitMetadata createLogFiles(Map> partitionWriteStats, - boolean isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException { + boolean isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); for (Entry> wEntry : partitionWriteStats.entrySet()) { String partitionPath = wEntry.getKey(); @@ -230,7 +232,7 @@ public class TestUtil { } private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, - DateTime startFrom, String commitTime) throws IOException, URISyntaxException, InterruptedException { + DateTime startFrom, String commitTime) throws IOException, URISyntaxException, InterruptedException { startFrom = startFrom.withTimeAtStartOfDay(); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); @@ -267,7 +269,8 @@ public class TestUtil { throws IOException, URISyntaxException, InterruptedException { Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema()); org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema); - BloomFilter filter = new BloomFilter(1000, 0.0001); + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1, + BloomFilterTypeCode.SIMPLE.name()); HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter); ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,