1
0

[HUDI-106] Adding support for DynamicBloomFilter (#976)

- Introduced configs for bloom filter type
- Implemented dynamic bloom filter with configurable max number of keys
- BloomFilterFactory abstractions; Defaults to current simple bloom filter
This commit is contained in:
Sivabalan Narayanan
2019-12-17 19:06:24 -08:00
committed by vinoth chandar
parent 7498ca71cb
commit 14881e99e0
23 changed files with 1069 additions and 83 deletions

28
LICENSE
View File

@@ -293,3 +293,31 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
This product includes code from org.apache.hadoop.
* org.apache.hudi.common.bloom.filter.InternalDynamicBloomFilter.java adapted from org.apache.hadoop.util.bloom.DynamicBloomFilter.java
* org.apache.hudi.common.bloom.filter.InternalFilter copied from classes in org.apache.hadoop.util.bloom package
with the following license
Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-------------------------------------------------------------------------------

View File

@@ -17,21 +17,18 @@
package org.apache.hudi.cli package org.apache.hudi.cli
import java.util
import java.util.Map
import org.apache.avro.Schema import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.filter.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ParquetUtils import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.common.{BloomFilter, HoodieJsonPayload}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig} import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.io.storage.{HoodieParquetConfig, HoodieParquetWriter} import org.apache.hudi.io.storage.{HoodieParquetConfig, HoodieParquetWriter}
import org.apache.parquet.avro.AvroSchemaConverter import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -44,7 +41,8 @@ object SparkHelpers {
def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) { def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile) val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile)
val schema: Schema = sourceRecords.get(0).getSchema val schema: Schema = sourceRecords.get(0).getSchema
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble) val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble,
HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter) val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble) val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf, HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO.toDouble)
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema) val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.config; package org.apache.hudi.config;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import javax.annotation.concurrent.Immutable; import javax.annotation.concurrent.Immutable;
@@ -27,7 +28,6 @@ import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.util.Properties; import java.util.Properties;
/** /**
* Indexing related config. * Indexing related config.
*/ */
@@ -54,6 +54,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
// TODO: On by default. Once stable, we will remove the other mode. // TODO: On by default. Once stable, we will remove the other mode.
public static final String BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = "hoodie.bloom.index.bucketized.checking"; public static final String BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = "hoodie.bloom.index.bucketized.checking";
public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = "true"; public static final String DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING = "true";
public static final String BLOOM_INDEX_FILTER_TYPE = "hoodie.bloom.index.filter.type";
public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name();
public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries";
public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "100000";
// 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter. // 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
// 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions. // 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket"; public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket";
@@ -194,6 +199,10 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING); BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, DEFAULT_BLOOM_INDEX_BUCKETIZED_CHECKING);
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_KEYS_PER_BUCKET_PROP), setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_KEYS_PER_BUCKET_PROP),
BLOOM_INDEX_KEYS_PER_BUCKET_PROP, DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET); BLOOM_INDEX_KEYS_PER_BUCKET_PROP, DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET);
setDefaultOnCondition(props, !props.contains(BLOOM_INDEX_FILTER_TYPE),
BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE);
setDefaultOnCondition(props, !props.contains(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES),
HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
// Throws IllegalArgumentException if the value set is not a known Hoodie Index Type // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP)); HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
return config; return config;

View File

@@ -96,6 +96,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks"; private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7; private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
private ConsistencyGuardConfig consistencyGuardConfig; private ConsistencyGuardConfig consistencyGuardConfig;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -369,6 +370,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS)); return Integer.valueOf(props.getProperty(HoodieHBaseIndexConfig.HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS));
} }
public String getBloomFilterType() {
return props.getProperty(HoodieIndexConfig.BLOOM_INDEX_FILTER_TYPE);
}
public int getDynamicBloomFilterMaxNumEntries() {
return Integer.parseInt(props.getProperty(HoodieIndexConfig.HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES));
}
/** /**
* Fraction of the global share of QPS that should be allocated to this job. Let's say there are 3 jobs which have * Fraction of the global share of QPS that should be allocated to this job. Let's say there are 3 jobs which have
* input size in terms of number of rows required for HbaseIndexing as x, 2x, 3x respectively. Then this fraction for * input size in terms of number of rows required for HbaseIndexing as x, 2x, 3x respectively. Then this fraction for

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.io; package org.apache.hudi.io;
import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;

View File

@@ -19,7 +19,8 @@
package org.apache.hudi.io.storage; package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
@@ -51,7 +52,10 @@ public class HoodieStorageWriterFactory {
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter( private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(
String commitTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable) String commitTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable)
throws IOException { throws IOException {
BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP()); BloomFilter filter = BloomFilterFactory
.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
config.getDynamicBloomFilterMaxNumEntries(),
config.getBloomFilterType());
HoodieAvroWriteSupport writeSupport = HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);

View File

@@ -21,6 +21,9 @@ package org.apache.hudi.common;
import org.apache.hudi.HoodieReadClient; import org.apache.hudi.HoodieReadClient;
import org.apache.hudi.WriteStatus; import org.apache.hudi.WriteStatus;
import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
@@ -217,7 +220,8 @@ public class HoodieClientTestUtils {
List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
if (filter == null) { if (filter == null) {
filter = new BloomFilter(10000, 0.0000001); filter = BloomFilterFactory
.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
} }
HoodieAvroWriteSupport writeSupport = HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);

View File

@@ -19,9 +19,11 @@
package org.apache.hudi.index.bloom; package org.apache.hudi.index.bloom;
import org.apache.hudi.HoodieClientTestHarness; import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.TestRawTripPayload; import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -248,7 +250,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// We write record1, record2 to a parquet file, but the bloom filter contains (record1, // We write record1, record2 to a parquet file, but the bloom filter contains (record1,
// record2, record3). // record2, record3).
BloomFilter filter = new BloomFilter(10000, 0.0000001); BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
filter.add(record3.getRecordKey()); filter.add(record3.getRecordKey());
String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2), String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2),
schema, filter, true); schema, filter, true);
@@ -451,7 +453,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
HoodieRecord record2 = HoodieRecord record2 =
new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
BloomFilter filter = new BloomFilter(10000, 0.0000001); BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1,
BloomFilterTypeCode.SIMPLE.name());
filter.add(record2.getRecordKey()); filter.add(record2.getRecordKey());
String filename = String filename =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, filter, true); HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1), schema, filter, true);

View File

@@ -20,11 +20,11 @@ package org.apache.hudi.table;
import org.apache.hudi.HoodieClientTestHarness; import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.WriteStatus; import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload; import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus; import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;

View File

@@ -18,7 +18,8 @@
package org.apache.hudi.avro; package org.apache.hudi.avro;
import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.HoodieDynamicBoundedBloomFilter;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.avro.AvroWriteSupport;
@@ -40,6 +41,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter"; public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter";
public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key"; public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key";
public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key"; public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key";
public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) { public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, BloomFilter bloomFilter) {
super(schema, avroSchema); super(schema, avroSchema);
@@ -55,6 +57,9 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
} }
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
}
} }
return new WriteSupport.FinalizedWriteContext(extraMetaData); return new WriteSupport.FinalizedWriteContext(extraMetaData);
} }

View File

@@ -16,34 +16,35 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hudi.common; package org.apache.hudi.common.bloom.filter;
import org.junit.Test;
import java.io.IOException;
/** /**
* Tests bloom filter {@link BloomFilter}. * A Bloom filter interface.
*/ */
public class TestBloomFilter { public interface BloomFilter {
@Test /**
public void testAddKey() { * Add a key to the {@link BloomFilter}.
BloomFilter filter = new BloomFilter(100, 0.0000001); *
filter.add("key1"); * @param key the key to the added to the {@link BloomFilter}
assert (filter.mightContain("key1")); */
} void add(String key);
@Test /**
public void testSerialize() throws IOException, ClassNotFoundException { * Tests for key membership.
BloomFilter filter = new BloomFilter(1000, 0.0000001); *
filter.add("key1"); * @param key the key to be checked for membership
filter.add("key2"); * @return {@code true} if key may be found, {@code false} if key is not found for sure.
String filterStr = filter.serializeToString(); */
boolean mightContain(String key);
// Rebuild /**
BloomFilter newFilter = new BloomFilter(filterStr); * Serialize the bloom filter as a string.
assert (newFilter.mightContain("key1")); */
assert (newFilter.mightContain("key2")); String serializeToString();
}
/**
* @return the bloom index type code.
**/
BloomFilterTypeCode getBloomFilterTypeCode();
} }

View File

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
import org.apache.hadoop.util.hash.Hash;
/**
 * Factory for constructing {@link BloomFilter} instances of the supported versions.
 */
public class BloomFilterFactory {

  /**
   * Creates a new {@link BloomFilter} with the given args.
   *
   * @param numEntries total number of entries expected in the filter
   * @param errorRate max allowed false-positive rate
   * @param maxNumberOfEntries upper bound on entries (only used by the dynamic filter)
   * @param bloomFilterTypeCode bloom filter type code (case-insensitive name of {@link BloomFilterTypeCode})
   * @return the {@link BloomFilter} thus created
   * @throws IllegalArgumentException if the type code does not match any known filter type
   */
  public static BloomFilter createBloomFilter(int numEntries, double errorRate, int maxNumberOfEntries,
      String bloomFilterTypeCode) {
    // Guard-clause dispatch on the (case-insensitive) type code.
    if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
      return new SimpleBloomFilter(numEntries, errorRate, Hash.MURMUR_HASH);
    }
    if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.DYNAMIC_V0.name())) {
      return new HoodieDynamicBoundedBloomFilter(numEntries, errorRate, Hash.MURMUR_HASH, maxNumberOfEntries);
    }
    throw new IllegalArgumentException("Bloom Filter type code not recognizable " + bloomFilterTypeCode);
  }

  /**
   * Rebuilds a {@link BloomFilter} from its serialized string form.
   *
   * @param serString the serialized string of the {@link BloomFilter}
   * @param bloomFilterTypeCode bloom filter type code as string (case-insensitive)
   * @return the {@link BloomFilter} thus generated from the passed in serialized string
   * @throws IllegalArgumentException if the type code does not match any known filter type
   */
  public static BloomFilter fromString(String serString, String bloomFilterTypeCode) {
    if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
      return new SimpleBloomFilter(serString);
    }
    if (bloomFilterTypeCode.equalsIgnoreCase(BloomFilterTypeCode.DYNAMIC_V0.name())) {
      return new HoodieDynamicBoundedBloomFilter(serString, BloomFilterTypeCode.DYNAMIC_V0);
    }
    throw new IllegalArgumentException("Bloom Filter type code not recognizable " + bloomFilterTypeCode);
  }
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
/**
 * Bloom filter type codes, identifying the concrete {@link BloomFilter} implementation.
 * Please do not change the order of the entries.
 * NOTE(review): ordering likely matters because enum ordinals/names may be persisted with
 * written files — confirm before ever reordering or renaming.
 */
public enum BloomFilterTypeCode {
  SIMPLE,
  DYNAMIC_V0
}

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
/**
 * Helpers for sizing a bloom filter from the desired entry count and error rate.
 */
class BloomFilterUtils {

  /**
   * Used in computing the optimal bloom filter size: ln(2)^2, approximately 0.480453.
   */
  private static final double LOG2_SQUARED = Math.log(2) * Math.log(2);

  /**
   * @return the bit size given the total number of entries and error rate.
   */
  static int getBitSize(int numEntries, double errorRate) {
    // Optimal bits-per-entry for the target error rate: -ln(errorRate) / ln(2)^2.
    final double bitsPerEntry = -Math.log(errorRate) / LOG2_SQUARED;
    return (int) Math.ceil(numEntries * bitsPerEntry);
  }

  /**
   * @return the number of hash functions given the bit size and total number of entries.
   */
  static int getNumHashes(int bitSize, int numEntries) {
    // Optimal hash count: ln(2) * (bits / entries).
    final double hashes = Math.log(2) * bitSize / numEntries;
    return (int) Math.ceil(hashes);
  }
}

View File

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hadoop.util.bloom.Key;
import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Hoodie's dynamically bounded bloom filter. This is based largely on Hadoop's DynamicBloomFilter, but with a bound
 * on the number of entries it will dynamically expand to. Once the added entries reach that bound, the false
 * positive ratio may no longer be guaranteed.
 */
public class HoodieDynamicBoundedBloomFilter implements BloomFilter {

  public static final String TYPE_CODE_PREFIX = "DYNAMIC";
  // Backing dynamic filter that all operations delegate to.
  private InternalDynamicBloomFilter internalDynamicBloomFilter;

  /**
   * Instantiates {@link HoodieDynamicBoundedBloomFilter} with the given args.
   *
   * @param numEntries the total number of entries
   * @param errorRate maximum allowable error rate
   * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash})
   * @param maxNoOfEntries bound on the number of entries the filter may expand to
   */
  HoodieDynamicBoundedBloomFilter(int numEntries, double errorRate, int hashType, int maxNoOfEntries) {
    // Derive bit size and hash-function count from the requested capacity/error rate.
    final int bitSize = BloomFilterUtils.getBitSize(numEntries, errorRate);
    final int numHashes = BloomFilterUtils.getNumHashes(bitSize, numEntries);
    this.internalDynamicBloomFilter =
        new InternalDynamicBloomFilter(bitSize, numHashes, hashType, numEntries, maxNoOfEntries);
  }

  /**
   * Generate {@link HoodieDynamicBoundedBloomFilter} from the given {@code serString} serialized string.
   *
   * @param serString the serialized string which represents the {@link HoodieDynamicBoundedBloomFilter}
   * @param typeCode type code of the bloom filter (currently ignored; only one dynamic version exists)
   */
  HoodieDynamicBoundedBloomFilter(String serString, BloomFilterTypeCode typeCode) {
    final byte[] serializedBytes = DatatypeConverter.parseBase64Binary(serString);
    try (DataInputStream stream = new DataInputStream(new ByteArrayInputStream(serializedBytes))) {
      internalDynamicBloomFilter = new InternalDynamicBloomFilter();
      internalDynamicBloomFilter.readFields(stream);
    } catch (IOException e) {
      throw new HoodieIndexException("Could not deserialize BloomFilter instance", e);
    }
  }

  @Override
  public void add(String key) {
    internalDynamicBloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
  }

  @Override
  public boolean mightContain(String key) {
    return internalDynamicBloomFilter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
  }

  @Override
  public String serializeToString() {
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream stream = new DataOutputStream(buffer)) {
      internalDynamicBloomFilter.write(stream);
      stream.flush();
      // Base64-encode the raw serialized bytes so the filter can travel as footer metadata.
      return DatatypeConverter.printBase64Binary(buffer.toByteArray());
    } catch (IOException e) {
      throw new HoodieIndexException("Could not serialize BloomFilter instance", e);
    }
  }

  @Override
  public BloomFilterTypeCode getBloomFilterTypeCode() {
    return BloomFilterTypeCode.DYNAMIC_V0;
  }
}

View File

@@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Hoodie's internal dynamic Bloom filter. This is largely based on {@link org.apache.hadoop.util.bloom.DynamicBloomFilter}
 * with a bound on the maximum number of entries. Once the max number of entries is reached, false-positive
 * guarantees are no longer honored.
 */
class InternalDynamicBloomFilter extends InternalFilter {

  /**
   * Threshold for the maximum number of keys to record in a single dynamic Bloom filter row.
   */
  private int nr;

  /**
   * The number of keys recorded in the current standard active Bloom filter (the last row).
   */
  private int currentNbRecord;

  // Upper bound on the total number of keys before the filter stops growing new rows.
  private int maxNr;

  // Set once growth stops; subsequent adds are spread round-robin over existing rows.
  private boolean reachedMax = false;

  // Round-robin cursor used to pick a row after reachedMax is set.
  private int curMatrixIndex = 0;

  /**
   * The matrix of Bloom filters: one standard Bloom filter per row.
   */
  private org.apache.hadoop.util.bloom.BloomFilter[] matrix;

  /**
   * Zero-args constructor for the serialization; callers must invoke
   * {@link #readFields(DataInput)} before using the filter.
   */
  public InternalDynamicBloomFilter() {
  }

  /**
   * Constructor.
   * <p>
   * Builds an empty Dynamic Bloom filter with a single initial row.
   *
   * @param vectorSize The number of bits in the vector.
   * @param nbHash The number of hash function to consider.
   * @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}).
   * @param nr The threshold for the maximum number of keys to record in a dynamic Bloom filter row.
   * @param maxNr The maximum total number of keys before the filter stops growing new rows.
   */
  public InternalDynamicBloomFilter(int vectorSize, int nbHash, int hashType, int nr, int maxNr) {
    super(vectorSize, nbHash, hashType);
    this.nr = nr;
    this.currentNbRecord = 0;
    this.maxNr = maxNr;
    matrix = new org.apache.hadoop.util.bloom.BloomFilter[1];
    matrix[0] = new org.apache.hadoop.util.bloom.BloomFilter(this.vectorSize, this.nbHash, this.hashType);
  }

  /**
   * Adds a key to the active row, appending a fresh row first when the active one
   * is full and the total capacity {@link #maxNr} has not yet been reached.
   *
   * @param key the key to add; must not be null
   */
  @Override
  public void add(Key key) {
    if (key == null) {
      throw new NullPointerException("Key can not be null");
    }

    org.apache.hadoop.util.bloom.BloomFilter bf = getActiveStandardBF();

    if (bf == null) {
      // Active row is full and we may still grow: append a new row and reset the per-row count.
      addRow();
      bf = matrix[matrix.length - 1];
      currentNbRecord = 0;
    }

    bf.add(key);

    currentNbRecord++;
  }

  /**
   * ANDs this filter row-by-row with another dynamic filter of identical shape.
   *
   * @param filter must be an InternalDynamicBloomFilter with the same vectorSize,
   *               nbHash, row count and per-row threshold
   */
  @Override
  public void and(InternalFilter filter) {
    if (filter == null
        || !(filter instanceof InternalDynamicBloomFilter)
        || filter.vectorSize != this.vectorSize
        || filter.nbHash != this.nbHash) {
      throw new IllegalArgumentException("filters cannot be and-ed");
    }

    InternalDynamicBloomFilter dbf = (InternalDynamicBloomFilter) filter;

    if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
      throw new IllegalArgumentException("filters cannot be and-ed");
    }

    for (int i = 0; i < matrix.length; i++) {
      matrix[i].and(dbf.matrix[i]);
    }
  }

  /**
   * Tests key membership: the key is considered (possibly) present if ANY row reports it.
   *
   * @return true if any row matches; note a null key always returns true
   */
  @Override
  public boolean membershipTest(Key key) {
    if (key == null) {
      return true;
    }

    for (int i = 0; i < matrix.length; i++) {
      if (matrix[i].membershipTest(key)) {
        return true;
      }
    }

    return false;
  }

  /**
   * Inverts every bit of every row in place.
   */
  @Override
  public void not() {
    for (int i = 0; i < matrix.length; i++) {
      matrix[i].not();
    }
  }

  /**
   * ORs this filter row-by-row with another dynamic filter of identical shape.
   */
  @Override
  public void or(InternalFilter filter) {
    if (filter == null
        || !(filter instanceof InternalDynamicBloomFilter)
        || filter.vectorSize != this.vectorSize
        || filter.nbHash != this.nbHash) {
      throw new IllegalArgumentException("filters cannot be or-ed");
    }

    InternalDynamicBloomFilter dbf = (InternalDynamicBloomFilter) filter;

    if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
      throw new IllegalArgumentException("filters cannot be or-ed");
    }

    for (int i = 0; i < matrix.length; i++) {
      matrix[i].or(dbf.matrix[i]);
    }
  }

  /**
   * XORs this filter row-by-row with another dynamic filter of identical shape.
   */
  @Override
  public void xor(InternalFilter filter) {
    if (filter == null
        || !(filter instanceof InternalDynamicBloomFilter)
        || filter.vectorSize != this.vectorSize
        || filter.nbHash != this.nbHash) {
      throw new IllegalArgumentException("filters cannot be xor-ed");
    }

    InternalDynamicBloomFilter dbf = (InternalDynamicBloomFilter) filter;

    if (dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
      throw new IllegalArgumentException("filters cannot be xor-ed");
    }

    for (int i = 0; i < matrix.length; i++) {
      matrix[i].xor(dbf.matrix[i]);
    }
  }

  @Override
  public String toString() {
    StringBuilder res = new StringBuilder();

    for (int i = 0; i < matrix.length; i++) {
      res.append(matrix[i]);
      res.append(Character.LINE_SEPARATOR);
    }
    return res.toString();
  }

  // Writable

  /**
   * Wire format: superclass fields, then nr, currentNbRecord, row count, and each row.
   * Must stay in sync with {@link #readFields(DataInput)}.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    out.writeInt(nr);
    out.writeInt(currentNbRecord);
    out.writeInt(matrix.length);

    for (int i = 0; i < matrix.length; i++) {
      matrix[i].write(out);
    }
  }

  /**
   * Reads the filter back in the exact order written by {@link #write(DataOutput)}.
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    nr = in.readInt();
    currentNbRecord = in.readInt();
    int len = in.readInt();
    matrix = new org.apache.hadoop.util.bloom.BloomFilter[len];

    for (int i = 0; i < matrix.length; i++) {
      matrix[i] = new org.apache.hadoop.util.bloom.BloomFilter();
      matrix[i].readFields(in);
    }
  }

  /**
   * Adds a new row to <i>this</i> dynamic Bloom filter.
   */
  private void addRow() {
    // Grow the matrix by one and append a fresh, empty standard Bloom filter row.
    org.apache.hadoop.util.bloom.BloomFilter[] tmp = new org.apache.hadoop.util.bloom.BloomFilter[matrix.length + 1];

    for (int i = 0; i < matrix.length; i++) {
      tmp[i] = matrix[i];
    }

    tmp[tmp.length - 1] = new org.apache.hadoop.util.bloom.BloomFilter(vectorSize, nbHash, hashType);
    matrix = tmp;
  }

  /**
   * Returns the active standard Bloom filter in <i>this</i> dynamic Bloom filter.
   *
   * @return BloomFilter The active standard Bloom filter.
   * <code>Null</code> otherwise (caller must create a new row).
   */
  private BloomFilter getActiveStandardBF() {
    if (reachedMax) {
      // Capacity exhausted: cycle over existing rows round-robin.
      // NOTE(review): curMatrixIndex++ can overflow int after ~2^31 adds, making the
      // modulo negative and indexing fail — confirm whether this is reachable in practice.
      return matrix[curMatrixIndex++ % matrix.length];
    }

    if (currentNbRecord >= nr && (matrix.length * nr) < maxNr) {
      // Current row full but total capacity remains: signal the caller to add a row.
      return null;
    } else if (currentNbRecord >= nr && (matrix.length * nr) >= maxNr) {
      // Current row full and capacity reached: stop growing, start round-robin from row 0.
      reachedMax = true;
      return matrix[0];
    }
    return matrix[matrix.length - 1];
  }
}

View File

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.bloom.HashFunction;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
/**
 * Copied from {@link org.apache.hadoop.util.bloom.Filter}. {@link InternalDynamicBloomFilter} needs access to some of
 * the protected members of {@link org.apache.hadoop.util.bloom.Filter} and hence a local copy is kept here.
 */
abstract class InternalFilter implements Writable {

  // Serialized-format version marker. Kept negative so it can be distinguished from the
  // old unversioned format, whose first serialized int was the (positive) hash count.
  private static final int VERSION = -1;

  /** The vector size of this filter, in bits. */
  protected int vectorSize;

  /** The hash function mapping a key onto several positions in the bit vector. */
  protected HashFunction hash;

  /** The number of hash functions to consider. */
  protected int nbHash;

  /** Type of the hashing function to use (see {@link Hash}). */
  protected int hashType;

  /** Zero-args constructor for the serialization. */
  protected InternalFilter() {
  }

  /**
   * Builds a filter with the given geometry.
   *
   * @param vectorSize The vector size of <i>this</i> filter.
   * @param nbHash     The number of hash functions to consider.
   * @param hashType   type of the hashing function (see {@link Hash}).
   */
  protected InternalFilter(int vectorSize, int nbHash, int hashType) {
    this.vectorSize = vectorSize;
    this.nbHash = nbHash;
    this.hashType = hashType;
    this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
  }

  /**
   * Adds a key to <i>this</i> filter.
   *
   * @param key The key to add.
   */
  public abstract void add(Key key);

  /**
   * Determines whether a specified key belongs to <i>this</i> filter.
   *
   * @param key The key to test.
   * @return boolean True if the specified key belongs to <i>this</i> filter. False otherwise.
   */
  public abstract boolean membershipTest(Key key);

  /**
   * Performs a logical AND between <i>this</i> filter and a specified filter.
   * <p>
   * <b>Invariant</b>: The result is assigned to <i>this</i> filter.
   *
   * @param filter The filter to AND with.
   */
  public abstract void and(InternalFilter filter);

  /**
   * Performs a logical OR between <i>this</i> filter and a specified filter.
   * <p>
   * <b>Invariant</b>: The result is assigned to <i>this</i> filter.
   *
   * @param filter The filter to OR with.
   */
  public abstract void or(InternalFilter filter);

  /**
   * Performs a logical XOR between <i>this</i> filter and a specified filter.
   * <p>
   * <b>Invariant</b>: The result is assigned to <i>this</i> filter.
   *
   * @param filter The filter to XOR with.
   */
  public abstract void xor(InternalFilter filter);

  /**
   * Performs a logical NOT on <i>this</i> filter.
   * <p>
   * The result is assigned to <i>this</i> filter.
   */
  public abstract void not();

  /**
   * Adds a list of keys to <i>this</i> filter.
   *
   * @param keys The list of keys.
   */
  public void add(List<Key> keys) {
    if (keys == null) {
      throw new IllegalArgumentException("ArrayList<Key> may not be null");
    }
    for (Key k : keys) {
      add(k);
    }
  }

  /**
   * Adds a collection of keys to <i>this</i> filter.
   *
   * @param keys The collection of keys.
   */
  public void add(Collection<Key> keys) {
    if (keys == null) {
      throw new IllegalArgumentException("Collection<Key> may not be null");
    }
    for (Key k : keys) {
      add(k);
    }
  }

  /**
   * Adds an array of keys to <i>this</i> filter.
   *
   * @param keys The array of keys.
   */
  public void add(Key[] keys) {
    if (keys == null) {
      throw new IllegalArgumentException("Key[] may not be null");
    }
    for (Key k : keys) {
      add(k);
    }
  }

  // Writable interface

  /**
   * Wire format: VERSION marker, hash count, hash type byte, vector size.
   * Must stay in sync with {@link #readFields(DataInput)}.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(VERSION);
    out.writeInt(this.nbHash);
    out.writeByte(this.hashType);
    out.writeInt(this.vectorSize);
  }

  /**
   * Reads the filter geometry, transparently handling the old unversioned layout
   * whose first int was the hash count (always positive) rather than a version marker.
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    int firstInt = in.readInt();
    if (firstInt > 0) {
      // Old unversioned format: the first int is the hash count; hash type defaults to Jenkins.
      this.nbHash = firstInt;
      this.hashType = Hash.JENKINS_HASH;
    } else if (firstInt == VERSION) {
      this.nbHash = in.readInt();
      this.hashType = in.readByte();
    } else {
      throw new IOException("Unsupported version: " + firstInt);
    }
    this.vectorSize = in.readInt();
    this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
  }
} //end class

View File

@@ -16,56 +16,57 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hudi.common; package org.apache.hudi.common.bloom.filter;
import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hadoop.util.bloom.Key; import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import javax.xml.bind.DatatypeConverter; import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
/** /**
* A Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter}. * A Simple Bloom filter implementation built on top of {@link org.apache.hadoop.util.bloom.BloomFilter}.
*/ */
public class BloomFilter {
/** public class SimpleBloomFilter implements BloomFilter {
* Used in computing the optimal Bloom filter size. This approximately equals 0.480453.
*/
public static final double LOG2_SQUARED = Math.log(2) * Math.log(2);
private org.apache.hadoop.util.bloom.BloomFilter filter = null; private org.apache.hadoop.util.bloom.BloomFilter filter = null;
public BloomFilter(int numEntries, double errorRate) {
this(numEntries, errorRate, Hash.MURMUR_HASH);
}
/** /**
* Create a new Bloom filter with the given configurations. * Create a new Bloom filter with the given configurations.
*
* @param numEntries The total number of entries.
* @param errorRate maximum allowable error rate.
* @param hashType type of the hashing function (see {@link org.apache.hadoop.util.hash.Hash}).
*/ */
public BloomFilter(int numEntries, double errorRate, int hashType) { public SimpleBloomFilter(int numEntries, double errorRate, int hashType) {
// Bit size // Bit size
int bitSize = (int) Math.ceil(numEntries * (-Math.log(errorRate) / LOG2_SQUARED)); int bitSize = BloomFilterUtils.getBitSize(numEntries, errorRate);
// Number of the hash functions // Number of the hash functions
int numHashs = (int) Math.ceil(Math.log(2) * bitSize / numEntries); int numHashs = BloomFilterUtils.getNumHashes(bitSize, numEntries);
// The filter // The filter
this.filter = new org.apache.hadoop.util.bloom.BloomFilter(bitSize, numHashs, hashType); this.filter = new org.apache.hadoop.util.bloom.BloomFilter(bitSize, numHashs, hashType);
} }
/** /**
* Create the bloom filter from serialized string. * Create the bloom filter from serialized string.
*
* @param serString serialized string which represents the {@link SimpleBloomFilter}
*/ */
public BloomFilter(String filterStr) { public SimpleBloomFilter(String serString) {
this.filter = new org.apache.hadoop.util.bloom.BloomFilter(); this.filter = new org.apache.hadoop.util.bloom.BloomFilter();
byte[] bytes = DatatypeConverter.parseBase64Binary(filterStr); byte[] bytes = DatatypeConverter.parseBase64Binary(serString);
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
try { try {
this.filter.readFields(dis); this.filter.readFields(dis);
@@ -75,6 +76,7 @@ public class BloomFilter {
} }
} }
@Override
public void add(String key) { public void add(String key) {
if (key == null) { if (key == null) {
throw new NullPointerException("Key cannot by null"); throw new NullPointerException("Key cannot by null");
@@ -82,6 +84,7 @@ public class BloomFilter {
filter.add(new Key(key.getBytes(StandardCharsets.UTF_8))); filter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
} }
@Override
public boolean mightContain(String key) { public boolean mightContain(String key) {
if (key == null) { if (key == null) {
throw new NullPointerException("Key cannot by null"); throw new NullPointerException("Key cannot by null");
@@ -92,6 +95,7 @@ public class BloomFilter {
/** /**
* Serialize the bloom filter as a string. * Serialize the bloom filter as a string.
*/ */
@Override
public String serializeToString() { public String serializeToString() {
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos); DataOutputStream dos = new DataOutputStream(baos);
@@ -104,4 +108,32 @@ public class BloomFilter {
throw new HoodieIndexException("Could not serialize BloomFilter instance", e); throw new HoodieIndexException("Could not serialize BloomFilter instance", e);
} }
} }
private void writeObject(ObjectOutputStream os)
throws IOException {
filter.write(os);
}
private void readObject(ObjectInputStream is)
throws IOException, ClassNotFoundException {
filter = new org.apache.hadoop.util.bloom.BloomFilter();
filter.readFields(is);
}
// @Override
public void write(DataOutput out) throws IOException {
out.write(filter.toString().getBytes());
}
//@Override
public void readFields(DataInput in) throws IOException {
filter = new org.apache.hadoop.util.bloom.BloomFilter();
filter.readFields(in);
}
@Override
public BloomFilterTypeCode getBloomFilterTypeCode() {
return BloomFilterTypeCode.SIMPLE;
}
} }

View File

@@ -19,7 +19,9 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
@@ -143,15 +145,26 @@ public class ParquetUtils {
* Read out the bloom filter from the parquet file meta data. * Read out the bloom filter from the parquet file meta data.
*/ */
public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) { public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) {
Map<String, String> footerVals = readParquetFooter(configuration, false, parquetFilePath, Map<String, String> footerVals =
readParquetFooter(configuration, false, parquetFilePath,
HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
if (null == footerVal) { if (null == footerVal) {
// We use old style key "com.uber.hoodie.bloomfilter" // We use old style key "com.uber.hoodie.bloomfilter"
footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
} }
return footerVal != null ? new BloomFilter(footerVal) : null; BloomFilter toReturn = null;
if (footerVal != null) {
if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
toReturn = BloomFilterFactory.fromString(footerVal,
footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
} else {
toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
}
}
return toReturn;
} }
public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) { public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) {
@@ -197,6 +210,7 @@ public class ParquetUtils {
} }
static class RecordKeysFilterFunction implements Function<String, Boolean> { static class RecordKeysFilterFunction implements Function<String, Boolean> {
private final Set<String> candidateKeys; private final Set<String> candidateKeys;
RecordKeysFilterFunction(Set<String> candidateKeys) { RecordKeysFilterFunction(Set<String> candidateKeys) {

View File

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
/**
* Unit tests {@link SimpleBloomFilter} and {@link HoodieDynamicBoundedBloomFilter}.
*/
@RunWith(Parameterized.class)
public class TestBloomFilter {

  // Bloom filter type code under test; each test runs once per supported type.
  private final String versionToTest;

  /**
   * @return the bloom filter type codes every test is parameterized over.
   */
  @Parameters()
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][] {
        {BloomFilterTypeCode.SIMPLE.name()},
        {BloomFilterTypeCode.DYNAMIC_V0.name()}
    });
  }

  public TestBloomFilter(String versionToTest) {
    this.versionToTest = versionToTest;
  }

  /**
   * Verifies the no-false-negative guarantee: every added key must be reported present.
   */
  @Test
  public void testAddKey() {
    int[] sizes = {100, 1000, 10000};
    for (int size : sizes) {
      List<String> inputs = new ArrayList<>();
      BloomFilter filter = getBloomFilter(versionToTest, size, 0.000001, size * 10);
      for (int i = 0; i < size; i++) {
        String key = UUID.randomUUID().toString();
        inputs.add(key);
        filter.add(key);
      }
      // A Bloom filter must never report an added key as absent.
      for (String key : inputs) {
        Assert.assertTrue("Filter should have returned true for " + key, filter.mightContain(key));
      }
      // Random keys may legitimately be false positives, so we can only assert
      // presence when a random key happens to collide with an added one.
      for (int i = 0; i < 100; i++) {
        String randomKey = UUID.randomUUID().toString();
        if (inputs.contains(randomKey)) {
          Assert.assertTrue("Filter should have returned true for " + randomKey, filter.mightContain(randomKey));
        }
      }
    }
  }

  /**
   * Verifies serializeToString/fromString round-trips preserve all added keys.
   */
  @Test
  public void testSerialize() throws IOException, ClassNotFoundException {
    int[] sizes = {100, 1000, 10000};
    for (int size : sizes) {
      List<String> inputs = new ArrayList<>();
      BloomFilter filter = getBloomFilter(versionToTest, size, 0.000001, size * 10);
      for (int i = 0; i < size; i++) {
        String key = UUID.randomUUID().toString();
        inputs.add(key);
        filter.add(key);
      }

      String serString = filter.serializeToString();
      BloomFilter recreatedBloomFilter = BloomFilterFactory
          .fromString(serString, versionToTest);
      // The deserialized filter must contain every key the original did.
      for (String key : inputs) {
        Assert.assertTrue("Filter should have returned true for " + key, recreatedBloomFilter.mightContain(key));
      }
    }
  }

  /**
   * Builds a filter of the requested type; maxEntries only applies to the dynamic filter.
   */
  BloomFilter getBloomFilter(String typeCode, int numEntries, double errorRate, int maxEntries) {
    if (typeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
      return BloomFilterFactory.createBloomFilter(numEntries, errorRate, -1, typeCode);
    } else {
      return BloomFilterFactory.createBloomFilter(numEntries, errorRate, maxEntries, typeCode);
    }
  }
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.bloom.filter;
import org.apache.hadoop.util.hash.Hash;
import org.junit.Assert;
import org.junit.Test;
import java.util.UUID;
/**
* Unit tests {@link InternalDynamicBloomFilter} for size bounding.
*/
public class TestInternalDynamicBloomFilter {

  /**
   * Verifies the dynamic filter grows while under its max-entry bound and stops
   * growing (serialized size stays constant) once the bound is exceeded.
   */
  @Test
  public void testBoundedSize() {
    int[] batchSizes = {1000, 10000, 10000, 100000, 100000, 10000};
    // After this batch index the cumulative key count exceeds maxSize, so the
    // serialized size must stop increasing.
    int indexForMaxGrowth = 3;
    int maxSize = batchSizes[0] * 100;
    BloomFilter filter = new HoodieDynamicBoundedBloomFilter(batchSizes[0], 0.000001, Hash.MURMUR_HASH, maxSize);
    int index = 0;
    int lastKnownBloomSize = 0;
    while (index < batchSizes.length) {
      for (int i = 0; i < batchSizes[index]; i++) {
        String key = UUID.randomUUID().toString();
        filter.add(key);
      }

      String serString = filter.serializeToString();
      if (index != 0) {
        int curLength = serString.length();
        if (index > indexForMaxGrowth) {
          // JUnit convention: (message, expected, actual) — previously swapped.
          Assert.assertEquals("Length should not increase after hitting max entries", lastKnownBloomSize, curLength);
        } else {
          Assert.assertTrue("Length should increase until max entries are reached", curLength > lastKnownBloomSize);
        }
      }
      lastKnownBloomSize = serString.length();
      index++;
    }
  }
}

View File

@@ -19,22 +19,30 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.HoodieCommonTestHarness; import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@@ -47,8 +55,23 @@ import static org.junit.Assert.assertTrue;
/** /**
* Tests parquet utils. * Tests parquet utils.
*/ */
@RunWith(Parameterized.class)
public class TestParquetUtils extends HoodieCommonTestHarness { public class TestParquetUtils extends HoodieCommonTestHarness {
String bloomFilterTypeToTest;
@Parameters()
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{BloomFilterTypeCode.SIMPLE.name()},
{BloomFilterTypeCode.DYNAMIC_V0.name()}
});
}
public TestParquetUtils(String bloomFilterTypeToTest) {
this.bloomFilterTypeToTest = bloomFilterTypeToTest;
}
@Before @Before
public void setup() { public void setup() {
initPath(); initPath();
@@ -78,6 +101,16 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
} }
} }
private Configuration getConfiguration() {
if (bloomFilterTypeToTest.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
return HoodieTestUtils.getDefaultHadoopConf();
} else {
org.apache.hadoop.conf.Configuration conf = HoodieTestUtils.getDefaultHadoopConf();
// conf.set();
return conf;
}
}
@Test @Test
public void testFilterParquetRowKeys() throws Exception { public void testFilterParquetRowKeys() throws Exception {
List<String> rowKeys = new ArrayList<>(); List<String> rowKeys = new ArrayList<>();
@@ -107,7 +140,8 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
private void writeParquetFile(String filePath, List<String> rowKeys) throws Exception { private void writeParquetFile(String filePath, List<String> rowKeys) throws Exception {
// Write out a parquet file // Write out a parquet file
Schema schema = HoodieAvroUtils.getRecordKeySchema(); Schema schema = HoodieAvroUtils.getRecordKeySchema();
BloomFilter filter = new BloomFilter(1000, 0.0001); BloomFilter filter = BloomFilterFactory
.createBloomFilter(1000, 0.0001, 10000, bloomFilterTypeToTest);
HoodieAvroWriteSupport writeSupport = HoodieAvroWriteSupport writeSupport =
new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter);
ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP, ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP,

View File

@@ -19,7 +19,9 @@
package org.apache.hudi.hive; package org.apache.hudi.hive;
import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.bloom.filter.BloomFilter;
import org.apache.hudi.common.bloom.filter.BloomFilterFactory;
import org.apache.hudi.common.bloom.filter.BloomFilterTypeCode;
import org.apache.hudi.common.minicluster.HdfsTestService; import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.minicluster.ZookeeperTestService; import org.apache.hudi.common.minicluster.ZookeeperTestService;
import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -267,7 +269,8 @@ public class TestUtil {
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema()); Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema); org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = new BloomFilter(1000, 0.0001); BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name());
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter); HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter);
ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024, ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024,
ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,