[HUDI-2973] RFC-27: Data skipping index to improve query performance (#4728)
- Updating the schema used for data skipping index
@@ -34,19 +34,20 @@ JIRA: https://issues.apache.org/jira/browse/HUDI-1822

## Abstract

Query engines typically scan large amounts of data for query planning and execution. A few data skipping strategies
are available to reduce the amount of data scanned, such as:

- Partition pruning
  - The user has to select the partitions to narrow down the data to be scanned for the query.
- File pruning
  - Some data file formats contain metadata, including range information for certain columns; for parquet, this
    metadata is stored in the file footer. As part of query planning, all range information from data files is loaded
    and data files are then pruned based on comparisons of the query expression against the column range information.
  - This approach does not scale if there are a large number of partitions and data files to be scanned.

We propose a new data skipping approach here to improve query performance: store additional information as part of
the Hudi metadata table to implement a data skipping index. The goals of the data skipping index are to provide:

- Global index: Users query for the information they need without needing to specify partitions. The index can
  effectively find data files in the table.
@@ -90,18 +91,18 @@ So, high level requirement for this column_stats partition is (pertaining to thi

### Storage format
To cater to the above requirement, we plan to encode the column name, partition path and file name into the keys in
HFile. Since HFile supports efficient range/prefix search, our lookup should be very fast.

![Base and log file layout](content/metadata_col_stats.png)

We plan to generate unique, random hash IDs for all 3 components:
- ColumnIndexID:
  - base64(hash64(column name))
  - on-disk size = 12 bytes per col_stat per file
- PartitionIndexID:
  - base64(hash64(partition name))
  - on-disk size = 12 bytes per partition
- FileIndexID:
  - base64(hash128(file name))
  - on-disk size = 24 bytes per file
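
To make the scheme above concrete, here is a minimal sketch of deriving such IDs. The hash function (Guava's
murmur3_128, truncated to the desired width) and the Base64 flavor are illustrative assumptions, not the final
implementation; the on-disk sizes fall out of the encoding, since 8 hash bytes encode to 12 Base64 characters and
16 bytes to 24.

```
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Illustrative sketch only; the hash and encoding choices are assumptions.
public final class IndexIdSketch {

  // base64(hash64(column name)) -> 12-char ID
  public static String columnIndexId(String columnName) {
    return base64(hashBytes(columnName, 8));
  }

  // base64(hash64(partition name)) -> 12-char ID
  public static String partitionIndexId(String partitionPath) {
    return base64(hashBytes(partitionPath, 8));
  }

  // base64(hash128(file name)) -> 24-char ID
  public static String fileIndexId(String fileName) {
    return base64(hashBytes(fileName, 16));
  }

  private static byte[] hashBytes(String input, int numBytes) {
    // murmur3_128 gives 16 bytes; truncate for the 64-bit variants.
    byte[] full = Hashing.murmur3_128().hashString(input, StandardCharsets.UTF_8).asBytes();
    return Arrays.copyOf(full, numBytes);
  }

  private static String base64(byte[] bytes) {
    return Base64.getEncoder().encodeToString(bytes);
  }
}
```
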
@@ -127,8 +128,8 @@ We plan to generate unique and random and unique hash IDs for all 3 components

Takes up more space in memory and on disk compared to Sequential IDs. Theoretically, the compression ratio should be
lower compared to Sequential IDs.

Key format in column_stats partition<br/>
- ColumnStatsIndexID = ColumnIndexID.append(PartitionIndexID).append(FileIndexID)
- ColumnStatsAggregateIndexID = ColumnIndexID.append(PartitionIndexID)
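
Since every component ID has a fixed length, the two key types are plain concatenations of the IDs; a minimal sketch
building on the hypothetical `IndexIdSketch` helpers above:

```
// Sketch: fixed-size IDs concatenate into fixed-size keys, so no delimiters are needed.
static String columnStatsIndexId(String column, String partition, String file) {
  // 12 + 12 + 24 = 48 chars: one entry per column per file
  return IndexIdSketch.columnIndexId(column)
      + IndexIdSketch.partitionIndexId(partition)
      + IndexIdSketch.fileIndexId(file);
}

static String columnStatsAggregateIndexId(String column, String partition) {
  // 12 + 12 = 24 chars: one aggregated entry per column per partition,
  // distinguishable from per-file keys by its shorter fixed length
  return IndexIdSketch.columnIndexId(column)
      + IndexIdSketch.partitionIndexId(partition);
}
```
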
The first type will be used to store one entry per column per file, and the second type will be used to store one
aggregated entry per column per partition. These will be fixed-size keys. Lookups don't have to search for ID delimiters as in the
@@ -140,17 +141,17 @@ our use-case as we have chosen the key format consciously having this in mind.

Given a list of columns and optionally partitions, return a list of matching file names.

1. We can do a prefix search of [ColumnIndexID] or [ColumnIndexID][PartitionIndexID]
   - If both ColumnIndexIDs and PartitionIndexIDs are supplied, we will do a range read of
     [ColumnIndexID][PartitionIndexID].
   - If the list of partitions is not available as part of the query, we will first do a prefix search over the
     aggregate keys ([ColumnIndexID][PartitionIndexID]) for partition level stats, filter for the partitions that
     match the predicates, and then follow (1) as in the previous line.

2. Fetch only the entries of interest for the [ColumnIndexID][PartitionIndexID] list.
3. Look up the stats and filter for matching FileIndexIDs.
4. Reverse lookup in the Files partition to get the FileIndexID to FileName mapping (see the sketch below).

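
Since HFile keeps keys sorted, the steps above reduce to prefix/range scans over fixed-size keys. A minimal sketch of
the flow, with a `NavigableMap` standing in for the HFile reader; `ColumnStat`, `lookupFiles` and the predicate
bounds are all hypothetical names:

```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Sketch of the read path; a sorted map stands in for HFile's sorted key scans.
public class ColumnStatsLookupSketch {

  // Hypothetical per-file stats entry (mirrors minValue/maxValue in the schema below).
  public static class ColumnStat {
    String minValue;
    String maxValue;
  }

  public static List<String> lookupFiles(
      NavigableMap<String, ColumnStat> columnStatsPartition, // key = [ColumnIndexID][PartitionIndexID][FileIndexID]
      Map<String, String> filesPartition,                    // FileIndexID -> file name (reverse mapping)
      String columnIndexId, String partitionIndexId,
      String predicateLow, String predicateHigh) {

    // Steps 1-2: range read of all keys prefixed by [ColumnIndexID][PartitionIndexID].
    String prefix = columnIndexId + partitionIndexId;
    List<String> matchingFiles = new ArrayList<>();
    for (Map.Entry<String, ColumnStat> entry
        : columnStatsPartition.subMap(prefix, prefix + Character.MAX_VALUE).entrySet()) {
      ColumnStat stat = entry.getValue();
      // Step 3: keep files whose [min, max] range overlaps the predicate range.
      if (stat.maxValue.compareTo(predicateLow) >= 0 && stat.minValue.compareTo(predicateHigh) <= 0) {
        // Fixed-size IDs: the FileIndexID is simply the key suffix, no delimiter parsing.
        String fileIndexId = entry.getKey().substring(prefix.length());
        // Step 4: reverse lookup of FileIndexID to file name in the "Files" partition.
        matchingFiles.add(filesPartition.get(fileIndexId));
      }
    }
    return matchingFiles;
  }
}
```
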
Note:
As you can see here, the reverse lookup of the FileIndexID to file name mapping has to go to the "Files" partition to
satisfy our requirement. So, the "Files" partition will get additional entries for FileIndexID to file name mappings
on the write path.

#### Sharding:
@@ -194,54 +195,75 @@ field will be used to detect the column stats payload record. Here is the schema
    .
    },
    {
        "doc": "Metadata Index of column statistics for all data files in the user table",
        "name": "ColumnStatsMetadata",
        "type": [
            "null",
            {
                "doc": "Data file column statistics",
                "name": "HoodieColumnStats",
                "type": "record",
                "fields": [
                    {
                        "doc": "File name for which this column statistics applies",
                        "name": "fileName",
                        "type": [
                            "null",
                            "string"
                        ]
                    },
                    {
                        "doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
                        "name": "minValue",
                        "type": [
                            "null",
                            "string"
                        ]
                    },
                    {
                        "doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
                        "name": "maxValue",
                        "type": [
                            "null",
                            "string"
                        ]
                    },
                    {
                        "doc": "Total count of values",
                        "name": "valueCount",
                        "type": [
                            "null",
                            "long"
                        ]
                    },
                    {
                        "doc": "Total count of null values",
                        "name": "nullCount",
                        "type": [
                            "null",
                            "long"
                        ]
                    },
                    {
                        "doc": "Total storage size on disk",
                        "name": "totalSize",
                        "type": [
                            "null",
                            "long"
                        ]
                    },
                    {
                        "doc": "Total uncompressed storage size on disk",
                        "name": "totalUncompressedSize",
                        "type": [
                            "null",
                            "long"
                        ]
                    },
                    {
                        "doc": "Column range entry valid/deleted flag",
                        "name": "isDeleted",
                        "type": "boolean"
                    }
                ]
            }
        ]
    }
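
For illustration, a writer could materialize one of these records with the plain Avro generic API. The inline schema
below is an abridged copy of the HoodieColumnStats record above, and the helper is a hypothetical sketch, not Hudi
code:

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Hypothetical sketch: build one HoodieColumnStats record against the schema above.
public class ColumnStatsRecordSketch {

  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"HoodieColumnStats\",\"fields\":["
          + "{\"name\":\"fileName\",\"type\":[\"null\",\"string\"],\"default\":null},"
          + "{\"name\":\"minValue\",\"type\":[\"null\",\"string\"],\"default\":null},"
          + "{\"name\":\"maxValue\",\"type\":[\"null\",\"string\"],\"default\":null},"
          + "{\"name\":\"valueCount\",\"type\":[\"null\",\"long\"],\"default\":null},"
          + "{\"name\":\"nullCount\",\"type\":[\"null\",\"long\"],\"default\":null},"
          + "{\"name\":\"totalSize\",\"type\":[\"null\",\"long\"],\"default\":null},"
          + "{\"name\":\"totalUncompressedSize\",\"type\":[\"null\",\"long\"],\"default\":null},"
          + "{\"name\":\"isDeleted\",\"type\":\"boolean\"}]}");

  public static GenericRecord columnStats(String fileName, String min, String max,
      long valueCount, long nullCount, long totalSize, long totalUncompressedSize) {
    GenericRecord record = new GenericData.Record(SCHEMA);
    record.put("fileName", fileName);
    record.put("minValue", min); // values are kept as strings for now, per the schema docs
    record.put("maxValue", max);
    record.put("valueCount", valueCount);
    record.put("nullCount", nullCount);
    record.put("totalSize", totalSize);
    record.put("totalUncompressedSize", totalUncompressedSize);
    record.put("isDeleted", false);
    return record;
  }
}
```
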
@@ -254,7 +276,7 @@ encoded string as discussed earlier.

```
key = base64_encode(hash64(column name) + hash64(partition name) + hash128(file path))
key = base64_encode(hash64(column name) + hash64(partition name))
```

While hash-based IDs have quite a few desirable properties in the context of Hudi index lookups, there is an impact
@@ -265,13 +287,13 @@ Let's walk through the writer flow to update column_stats partition in metadata

1. Files partition - prepare records for adding // just calling out what's required in the context of the
   column_stats partition. The general files partition will be updated as usual to store file listing information.
   - FileIndexID => FileName mapping entries
   - PartitionIndexID => PartitionName entry, if it does not already exist
   - Since these IDs are hash-based, no lookup of prior usages is required here. Otherwise, we would need to know the
     last assigned ID and then go about assigning new incremental/sequential IDs, which slows down performance
     significantly.
2. Column_stats partition - prepare records for adding
   - [ColumnIndexID][PartitionIndexID][FileIndexID] => ColumnStat
   - [ColumnIndexID][PartitionIndexID] => ColumnStat (partition level aggregate)
   - This involves reading the base file footers to fetch min, max and other stats to populate values for the records
     (see the sketch below).
3. Commit all these records to the metadata table.

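
A rough sketch of the footer read in step 2, using standard parquet-mr metadata APIs to aggregate statistics across
row groups for a single column; the wrapper class and method are hypothetical, and the min/max string conversion is
left as a comment:

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Hypothetical sketch: collect column_stats inputs for one column of one base file.
public class FooterStatsSketch {

  public static void printColumnStats(String filePath, String column) throws Exception {
    Configuration conf = new Configuration();
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(new Path(filePath), conf))) {
      long valueCount = 0, nullCount = 0, totalSize = 0, totalUncompressedSize = 0;
      // One BlockMetaData per row group; sum the per-chunk statistics for the column.
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData chunk : block.getColumns()) {
          if (!chunk.getPath().toDotString().equals(column)) {
            continue;
          }
          valueCount += chunk.getValueCount();
          nullCount += chunk.getStatistics().getNumNulls();
          totalSize += chunk.getTotalSize();
          totalUncompressedSize += chunk.getTotalUncompressedSize();
          // minValue/maxValue: fold in chunk.getStatistics().genericGetMin()/genericGetMax(),
          // converted to strings as the column_stats schema stores them.
        }
      }
      System.out.printf("%s: valueCount=%d nullCount=%d totalSize=%d totalUncompressedSize=%d%n",
          column, valueCount, nullCount, totalSize, totalUncompressedSize);
    }
  }
}
```
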