[HUDI-2589] RFC-37: Metadata table based bloom index (#3989)
Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
committed by
GitHub
parent
16138db4f2
commit
72f7348830
@@ -60,7 +60,7 @@ The list of all RFCs can be found here.
|
||||
| 34 | [Hudi BigQuery Integration (WIP)](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=188745980) | `UNDER REVIEW` |
|
||||
| 35 | [Make Flink MOR table writing streaming friendly](https://cwiki.apache.org/confluence/display/HUDI/RFC-35%3A+Make+Flink+MOR+table+writing+streaming+friendly) | `UNDER REVIEW` |
|
||||
| 36 | [HUDI Metastore Server](https://cwiki.apache.org/confluence/display/HUDI/%5BWIP%5D+RFC-36%3A+HUDI+Metastore+Server) | `UNDER REVIEW` |
|
||||
| 37 | [Hudi metadata based bloom index] | `UNDER REVIEW` |
|
||||
| 37 | [Hudi Metadata based Bloom Index](rfc-37/rfc-37.md) | `IN PROGRESS` |
|
||||
| 38 | [Spark Datasource V2 Integration] | `UNDER REVIEW` |
|
||||
| 39 | [Incremental source for Debezium](./rfc-39/rfc-39.md) | `IN PROGRESS` |
|
||||
| 40 | [Hudi Connector for Trino](./rfc-40/rfc-40.md) | `IN PROGRESS` |
|
||||
|
||||
BIN
rfc/rfc-37/metadata_index_1.png
Normal file
BIN
rfc/rfc-37/metadata_index_1.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 217 KiB |
BIN
rfc/rfc-37/metadata_index_bloom_partition.png
Normal file
BIN
rfc/rfc-37/metadata_index_bloom_partition.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 196 KiB |
BIN
rfc/rfc-37/metadata_index_col_stats.png
Normal file
BIN
rfc/rfc-37/metadata_index_col_stats.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 56 KiB |
329
rfc/rfc-37/rfc-37.md
Normal file
329
rfc/rfc-37/rfc-37.md
Normal file
@@ -0,0 +1,329 @@
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
# RFC-37: Metadata based Bloom Index
|
||||
|
||||
## Proposers
|
||||
- @nsivabalan
|
||||
- @manojpec
|
||||
|
||||
## Approvers
|
||||
- @vinothchandar
|
||||
- @satishkotha
|
||||
|
||||
## Status
|
||||
JIRA: https://issues.apache.org/jira/browse/HUDI-2703
|
||||
|
||||
## Abstract
|
||||
Hudi maintains several indices to locate/map incoming records to file groups during writes. Most commonly used record
|
||||
index is the HoodieBloomIndex. Larger tables and global index has performance issues as the bloom filter from a large
|
||||
number of data files needed to be read and looked up. Reading from several files over the cloud object storage like S3
|
||||
also faces request throttling issues. We are proposing to build a new Metadata index (metadata table based bloom index)
|
||||
to boost the performance of existing bloom index.
|
||||
|
||||
## Background
|
||||
|
||||
HoodieBloomIndex is used to find the location of incoming records during every write. Bloom index assists Hudi in
|
||||
deterministically routing records to a given file group and to distinguish inserts vs updates. This aggregate bloom
|
||||
index is built from several bloom filters stored in the base file footers. Prior to bloom filter lookup, the file
|
||||
pruning for the incoming records is also done based on the record key min/max stats stored in the base file footers. In
|
||||
this RFC, we plan to build a new index for the bloom filters under the metadata table which to assist in bloom index
|
||||
based record location tagging. This overlaps
|
||||
with [RFC-27 Data skipping index ](https://cwiki.apache.org/confluence/display/HUDI/RFC-27+Data+skipping+index+to+improve+query+performance)
|
||||
in the read path for improving the query performance.
|
||||
|
||||
## Design
|
||||
HoodieBloomIndex involves the following steps to find the right location of incoming records
|
||||
|
||||
1. Find all the interested partitions and list all its data files.
|
||||
2. File Pruning: Load record key min/max details from all the interested data file footers. Filter files and generate
|
||||
files to keys mapping for the incoming records based on the key ranges using range interval tree built from
|
||||
previously loaded min/max details.
|
||||
3. Bloom Filter lookup: Filter files and prune files to keys mapping for the incoming keys mapping based on the bloom
|
||||
filter key lookup
|
||||
4. Final Look up in actual data files to find the right location of every incoming record
|
||||
|
||||
As we could see from step 1 and 2, we are in need of min and max values for "_hoodie_record_key" and bloom filters
|
||||
from all interested data files to perform the location tagging. In this design, we will add these key stats and
|
||||
bloom filter to the metadata table and thereby able to quickly load the interested details and do faster lookups.
|
||||
|
||||
Metadata table already has one partition `files` to help in partition file listing. For the metadata table based
|
||||
indices, we are proposing to add following two new partitions:
|
||||
1. `bloom_filter` - for the file level bloom filter
|
||||
2. `column_stats` - for the key range stats
|
||||
|
||||
Why metadata table:
|
||||
Metadata table uses HBase HFile - the tree map file format to store and retrieve data. HFile is an indexed file format
|
||||
and supports map like faster lookups by keys. Since, we will be storing stats/bloom for every file and the index will do
|
||||
lookups based on files, we should be able to benefit from the faster lookups in HFile.
|
||||
|
||||
<img src="metadata_index_1.png" alt="High Level Metadata Index Design" width="480"/>
|
||||
|
||||
Following sections will talk about different partitions, key formats and then dive into the data and control flows.
|
||||
|
||||
### MetaIndex/BloomFilter:
|
||||
|
||||
A new partition `bloom_filter` will be added under the metadata table. Bloom filters from all the base files in the
|
||||
data table will be added here. Metadata table is already in the HFile format. The existing metadata payload schema will
|
||||
be extended and shared for this partition also. The type field will be used to detect the bloom filter payload record.
|
||||
Here is the schema for the bloom filter payload record.
|
||||
```
|
||||
{
|
||||
"doc": "Metadata Index of bloom filters for all data files in the user table",
|
||||
"name": "BloomFilterMetadata",
|
||||
"type": [
|
||||
"null",
|
||||
{
|
||||
"doc": "Data file bloom filter details",
|
||||
"name": "HoodieMetadataBloomFilter",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"doc": "Bloom filter type code",
|
||||
"name": "type",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"doc": "Instant timestamp when this metadata was created/updated",
|
||||
"name": "timestamp",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"doc": "Bloom filter binary byte array",
|
||||
"name": "bloomFilter",
|
||||
"type": "bytes"
|
||||
},
|
||||
{
|
||||
"doc": "Bloom filter entry valid/deleted flag",
|
||||
"name": "isDeleted",
|
||||
"type": "boolean"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The key for the bloom filter record would be an encoded string representing the partition and base file combo. The
|
||||
partition and the file names are converted to deterministic hash based IDs, and then they are base64 encoded. Hash based
|
||||
IDs are easy to generate for the incoming new inserts records and for the lookup for the updated records. It doesn't
|
||||
need any dictionary to be added for the reverse lookups. Hash bits are chosen based on the cardinality and the collision
|
||||
probability desired for the support max scale deployment. Base64 encoding the hash IDs further reduces the on-disk
|
||||
storage space for these keys.
|
||||
|
||||
```
|
||||
key = base64_encode(concat(hash64(partition name), hash128(file name)))
|
||||
```
|
||||
|
||||
<img src="metadata_index_bloom_partition.png" alt="Bloom filter partition" width="500"/>
|
||||
|
||||
### MetaIndex/ColumnStats:
|
||||
|
||||
Another new partition `column_stats` will also be added under the metadata table to make the record key lookup code path
|
||||
much more performant. This metadata index also helps in the data skipping (please look at RFC-27 for more details). In
|
||||
the context of faster record key lookups for the update use cases, proposing `column_stats` index to be used for
|
||||
file pruning when generating the file to candidate keys mapping for the update records.The existing metadata payload
|
||||
schema will be extended and shared for this partition also. The type field will be used to detect the column stats
|
||||
payload record. Here is the schema for the column stats payload record.
|
||||
|
||||
```
|
||||
{
|
||||
"doc": "Metadata Index of column statistics for all data files in the user table",
|
||||
"name": "ColumnStatsMetadata",
|
||||
"type": [
|
||||
"null",
|
||||
{
|
||||
"doc": "Data file column statistics",
|
||||
"name": "HoodieColumnStats",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"doc": "File name for which this column statistics applies",
|
||||
"name": "fileName",
|
||||
"type": [
|
||||
"null",
|
||||
"string"
|
||||
]
|
||||
},
|
||||
{
|
||||
"doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
|
||||
"name": "minValue",
|
||||
"type": [
|
||||
"null",
|
||||
"string"
|
||||
]
|
||||
},
|
||||
{
|
||||
"doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
|
||||
"name": "maxValue",
|
||||
"type": [
|
||||
"null",
|
||||
"string"
|
||||
]
|
||||
},
|
||||
{
|
||||
"doc": "Total count of values",
|
||||
"name": "valueCount",
|
||||
"type": [
|
||||
"null",
|
||||
"long"
|
||||
]
|
||||
},
|
||||
{
|
||||
"doc": "Total count of null values",
|
||||
"name": "nullCount",
|
||||
"type": [
|
||||
"null",
|
||||
"long"
|
||||
]
|
||||
},
|
||||
{
|
||||
"doc": "Total storage size on disk",
|
||||
"name": "totalSize",
|
||||
"type": [
|
||||
"null",
|
||||
"long"
|
||||
]
|
||||
},
|
||||
{
|
||||
"doc": "Total uncompressed storage size on disk",
|
||||
"name": "totalUncompressedSize",
|
||||
"type": [
|
||||
"null",
|
||||
"long"
|
||||
]
|
||||
},
|
||||
{
|
||||
"doc": "Column range entry valid/deleted flag",
|
||||
"name": "isDeleted",
|
||||
"type": "boolean"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Column stats records hold key ranges (min and max) for the file. The key for the column stat record would be an
|
||||
encoded string representing the tuple set of column name, partition name and the base file. The string names of
|
||||
these fields are converted to deterministic hash based IDs, and then they are base64 encoded, just like the
|
||||
bloom filter key.
|
||||
|
||||
```
|
||||
key = base64_encode(concat(hash64(column name), hash64(partition name), hash128(file name)))
|
||||
```
|
||||
|
||||
While Hash based IDs have quite a few desirable properties in the context of Hudi index lookups, there is an impact
|
||||
on the column level schema changes though. Refer to [Schema Evolution](#Schema-Evolution) section for more details.
|
||||
|
||||
Below picture gives a pictorial representation of Column stats partition in metadata table.
|
||||
<img src="metadata_index_col_stats.png" alt="Column Stats Partition" width="480"/>
|
||||
|
||||
### Metadata Index lookup:
|
||||
|
||||
For the incoming upsert records, given their keys, tag their current location. The new algorithm for the
|
||||
index lookup would be
|
||||
|
||||
1. Generate the list of partitions and the list of keys under each partition to be looked up
|
||||
2. For all the involved partitions, load all its file list
|
||||
3. Level 1: Range pruning using `column_stats` index:
|
||||
1. For each of the record key, generate the column stats index lookup key based on the tuple
|
||||
(__hoodie_record_key, partition name, file path)
|
||||
2. Meta index lookup with the above key and if available get the value payload with the column stats details
|
||||
3. Prune the partition and its candidate files based on the range comparisons
|
||||
4. Level 2: Record pruning using `bloom_filter` index:
|
||||
1. From the shortlisted file candidates per partition, generate bloom filter index lookup key based on the tuple
|
||||
(partition name, file path)
|
||||
2. Meta index lookup with the above key to load the base file bloom filter
|
||||
3. Bloom filter lookup for the record key to generate the candidate keys that are probably available in the base file
|
||||
5. Level 3: Record validation
|
||||
1. Given the list of files and their candidate keys from above pruning, do the actual file lookup to confirm the keys
|
||||
2. Return the location (file id) of the final matching keys
|
||||
|
||||
### Schema Evolution:
|
||||
|
||||
HashID based key are deterministically generated from the tuple input. That is, for the tuple consisting of column name,
|
||||
partition name and file name, the key generated would always be the same. So, a table where the schema gets changed over
|
||||
time would have an impact on the keys already generated. The most common schema evolution use cases like change of
|
||||
column type, adding a new column are not affected though. Other relatively uncommon use cases like column name rename,
|
||||
dropping a column and adding a column with dropped name would have indices referring them more than needed. This would
|
||||
lead to the index lookup matching stale/new records across evolved schemas.
|
||||
|
||||
To avoid looking up stale/new index records, here are the design options we have:
|
||||
1. (Preferred) Query rewrite / Result recordset pruning
|
||||
1. Schema evolution layer should introduce query rewrite stage to detect evolved schemas for the input query and
|
||||
optionally include additional predicates to the query
|
||||
2. The resultant recordset can also be pruned based on the commit time and the schema change time
|
||||
3. Making input tuple set schema aware
|
||||
1. Along with column name, partition name and file path, a version/tag can also be added to make the key
|
||||
generated very schema specific. But, this choice has a performance impact as the lookup now has to be more of a
|
||||
prefix based instead of pointed lookups. That is, index lookup have to return records for all the versions/tags
|
||||
and pruning on top of this have to be done.
|
||||
|
||||
## Implementation
|
||||
|
||||
1. No change to the HoodieIndex public interface.
|
||||
```
|
||||
/**
|
||||
* Looks up the index and tags each incoming record with a location of a file that contains
|
||||
* the row (if it is actually present).
|
||||
*/
|
||||
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
|
||||
public abstract HoodieData<HoodieRecord<T>> tagLocation(
|
||||
HoodieData<HoodieRecord<T>> records, HoodieEngineContext context,
|
||||
HoodieTable hoodieTable) throws HoodieIndexException;
|
||||
```
|
||||
2. HoodieBloomIndex::explodeRecordsWithFileComparisons() will be extended to check for a new config `hoodie.metadata.file_pruning.enable`
|
||||
and if enabled, metadata table based column stat will be used for file pruning based on key ranges.
|
||||
3.
|
||||
|
||||
|
||||
### Writer flow:
|
||||
Let's walk through the writer flow to update these partitions.
|
||||
|
||||
Whenever a new commit is getting applied to metadata table, we do the following.<br>
|
||||
1. Files partition - prepare records for adding
|
||||
2. Column_stats partition - prepare records for adding
|
||||
[ColumnIndexID][PartitionIndexID][FileIndexID] => ColumnStats
|
||||
This involves reading the base file footers to fetch min max values for each column
|
||||
3. Bloom_filter partition - prepare records for adding
|
||||
[PartitionIndexID][FileIndexID] => BloomFilter
|
||||
This involves reading the base file footers.
|
||||
We can amortize the cost across (2) and (3) and just read it once and prepare/populate records for both partitions.
|
||||
4. Commit all these records to metadata table.
|
||||
|
||||
We need to ensure we have all sufficient info in WriteStatus get sent to metadata writer for every commit.
|
||||
|
||||
### Reader flow:
|
||||
When a new batch of write is ingested into Hudi, we need to tag the records with their
|
||||
original file group location. And this index will leverage both the partitions to deduce the
|
||||
record key => file name mappings. Refer to Metadata Index lookup section for more details.
|
||||
|
||||
## Rollout/Adoption Plan
|
||||
* Release 0.10.0 is a FlagDay release. Mean, the old metadata table will be wiped out and a new one will be built.
|
||||
* Metadata Index feature is planning for 0.10.x version. Any preparatory changes/features (
|
||||
like `Metadata new indexing for existing tables`, RFC proposal and doc pending) that are needed to have this feature
|
||||
in the later minor release need to be rolled out as part of 0.10.0
|
||||
* TODO: More details on the rollout plan
|
||||
|
||||
## Test Plan
|
||||
* Functionality
|
||||
* Tag location for existing keys
|
||||
* Tag location for non-existing keys
|
||||
* Performance
|
||||
* Prove Metadata based indices are helping upsert use cases
|
||||
* Upgrade
|
||||
* TODO: More details on the test plan
|
||||
Reference in New Issue
Block a user