[HUDI-3534] [RFC-34] Added the implementation details for the BigQuery integration (#4503)
This commit is contained in:
committed by
GitHub
parent
c34eb07598
commit
8add740d22
@@ -69,7 +69,7 @@ The list of all RFCs can be found here.
|
||||
| 31 | [Hive integration Improvement](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+31%3A+Hive+integration+Improvment)| `UNDER REVIEW` |
|
||||
| 32 | [Kafka Connect Sink for Hudi](https://cwiki.apache.org/confluence/display/HUDI/RFC-32+Kafka+Connect+Sink+for+Hudi)| `ONGOING` |
|
||||
| 33 | [Hudi supports more comprehensive Schema Evolution](https://cwiki.apache.org/confluence/display/HUDI/RFC+-+33++Hudi+supports+more+comprehensive+Schema+Evolution)| `ONGOING` |
|
||||
| 34 | [Hudi BigQuery Integration (WIP)](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=188745980) | `IN PROGRESS` |
|
||||
| 34 | [Hudi BigQuery Integration](./rfc-34/rfc-34.md) | `COMPLETED` |
|
||||
| 35 | [Make Flink MOR table writing streaming friendly](https://cwiki.apache.org/confluence/display/HUDI/RFC-35%3A+Make+Flink+MOR+table+writing+streaming+friendly)| `UNDER REVIEW` |
|
||||
| 36 | [HUDI Metastore Server](https://cwiki.apache.org/confluence/display/HUDI/%5BWIP%5D+RFC-36%3A+HUDI+Metastore+Server)| `UNDER REVIEW` |
|
||||
| 37 | [Hudi Metadata based Bloom Index](rfc-37/rfc-37.md) | `IN PROGRESS` |
|
||||
|
||||
BIN
rfc/rfc-34/big-query-arch.png
Normal file
BIN
rfc/rfc-34/big-query-arch.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 37 KiB |
166
rfc/rfc-34/rfc-34.md
Normal file
166
rfc/rfc-34/rfc-34.md
Normal file
@@ -0,0 +1,166 @@
|
||||
# Hudi BigQuery Integration
|
||||
|
||||
## Abstract
|
||||
|
||||
BigQuery is Google Cloud's fully managed, petabyte-scale, and cost-effective analytics data warehouse that lets you run
|
||||
analytics over vast amounts of data in near real time. BigQuery
|
||||
currently [doesn’t support](https://cloud.google.com/bigquery/external-data-cloud-storage) Apache Hudi, but it has
|
||||
support for the Parquet and other formats. The proposal is to implement a BigQuerySync similar to HiveSync to sync the
|
||||
Hudi table as the BigQuery External Parquet table, so that users can query the Hudi tables using BigQuery. Uber is
|
||||
already syncing some of its Hudi tables to BigQuery data mart this will help them to write, sync and query.
|
||||
|
||||
## Background
|
||||
|
||||
Hudi table types define how data is indexed & laid out on the DFS and how the above primitives and timeline activities
|
||||
are implemented on top of such organization (i.e how data is written). In turn, query types define how the underlying
|
||||
data is exposed to the queries (i.e how data is read).
|
||||
|
||||
Hudi supports the following table types:
|
||||
|
||||
* [Copy On Write](https://hudi.apache.org/docs/table_types#copy-on-write-table): Stores data using exclusively columnar
|
||||
file formats (e.g parquet). Updates simply version & rewrite the files by performing a synchronous merge during write.
|
||||
* [Merge On Read](https://hudi.apache.org/docs/table_types#merge-on-read-table): Stores data using a combination of
|
||||
columnar (e.g parquet) + row based (e.g avro) file formats. Updates are logged to delta files & later compacted to
|
||||
produce new versions of columnar files synchronously or asynchronously.
|
||||
|
||||
Hudi maintains multiple versions of the Parquet files and tracks the latest version using Hudi metadata (Cow), since
|
||||
BigQuery doesn’t support Hudi yet, when you sync the Hudi’s parquet files to BigQuery and query it without Hudi’s
|
||||
metadata layer, it will query all the versions of the parquet files which might cause duplicate rows.
|
||||
|
||||
To avoid the above scenario, this proposal is to implement a BigQuery sync tool which will use the Hudi metadata to know
|
||||
which files are latest and filter only the latest version of parquet files to BigQuery external table so that users can
|
||||
query the Hudi tables without any duplicate records.
|
||||
|
||||
## Implementation
|
||||
|
||||
This new feature will implement
|
||||
the [AbstractSyncTool](https://github.com/apache/hudi/blob/master/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java)
|
||||
similar to
|
||||
the [HiveSyncTool](https://github.com/apache/hudi/blob/master/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java)
|
||||
named BigQuerySyncTool with sync methods for CoW tables. The sync implementation will identify the latest parquet files
|
||||
for each .commit file and keep these manifests synced with the BigQuery manifest table. Spark datasource & DeltaStreamer
|
||||
can already take a list of such classes to keep these manifests synced.
|
||||
|
||||
###
|
||||
|
||||

|
||||
|
||||
To avoid duplicate records on the Hudi CoW table, we need to generate the list of latest snapshot files and create a BQ
|
||||
table for it, then use that table to filter the duplicate records from the history table.
|
||||
|
||||
### Steps to create Hudi table on BigQuery
|
||||
|
||||
1. Let's say you have a Hudi table data on google cloud storage (GCS).
|
||||
|
||||
```
|
||||
CREATE TABLE dwh.bq_demo_partitioned_cow (
|
||||
id bigint,
|
||||
name string,
|
||||
price double,
|
||||
ts bigint,
|
||||
dt string
|
||||
)
|
||||
using hudi
|
||||
partitioned by (dt)
|
||||
options (
|
||||
type = 'cow',
|
||||
primaryKey = 'id',
|
||||
preCombineField = 'ts',
|
||||
hoodie.datasource.write.drop.partition.columns = 'true'
|
||||
)
|
||||
location 'gs://hudi_datasets/bq_demo_partitioned_cow/';
|
||||
```
|
||||
|
||||
BigQuery doesn't accept the partition column in the parquet schema, hence we need to drop the partition columns from the
|
||||
schema by enabling this flag:
|
||||
|
||||
```
|
||||
hoodie.datasource.write.drop.partition.columns = 'true'
|
||||
```
|
||||
|
||||
2. As part of the BigQuerySync, the sync tool will generate/update the manifest files inside the .hoodie metadata files.
|
||||
For tables which already exist, you can generate a manifest file for the Hudi table which has the list of the latest
|
||||
snapshot parquet file names in a CSV format with only one column the file name. The location of the manifest file
|
||||
will be on the .hoodie metadata folder (`gs://bucket_name/table_name/.hoodie/manifest/latest_snapshot_files.csv`)
|
||||
|
||||
```
|
||||
// this command is coming soon.
|
||||
// the alternative for this command could be a JAVA API to generate the manifest.
|
||||
GENERATE symlink_format_manifest FOR TABLE dwh.bq_demo_partitioned_cow;
|
||||
```
|
||||
|
||||
3. Create a BQ table named `hudi_table_name_manifest` with only one column filename with this location gs:
|
||||
//bucket_name/table_name/.hoodie/manifest/latest_snapshot_files.csv.
|
||||
|
||||
```
|
||||
CREATE EXTERNAL TABLE `my-first-project.dwh.bq_demo_partitioned_cow_manifest`
|
||||
(
|
||||
filename STRING
|
||||
)
|
||||
OPTIONS(
|
||||
format="CSV",
|
||||
uris=["gs://hudi_datasets/bq_demo_partitioned_cow/.hoodie/manifest/latest_snapshot_files.csv"]
|
||||
);
|
||||
```
|
||||
|
||||
4. Create another BQ table named `hudi_table_name_history` with this location `gs://bucket_name/table_name`, don't use
|
||||
this table to query the data, this table will have duplicate records since it scans all the versions of parquet files
|
||||
in the table/partition folders.
|
||||
|
||||
```
|
||||
CREATE EXTERNAL TABLE `my-first-project.dwh.bq_demo_partitioned_cow_history`
|
||||
WITH
|
||||
PARTITION COLUMNS
|
||||
OPTIONS(
|
||||
ignore_unknown_values=true,
|
||||
format="PARQUET",
|
||||
hive_partition_uri_prefix="gs://hudi_datasets/bq_demo_partitioned_cow/",
|
||||
uris=["gs://hudi_snowflake/bq_demo_partitioned_cow/dt=*"]
|
||||
);
|
||||
```
|
||||
|
||||
5. Create a BQ view with the same hudi table name with this query, this view you created has the data from the Hudi
|
||||
table without any duplicates, you can use that table to query the data.
|
||||
|
||||
```
|
||||
CREATE VIEW `my-first-project.dwh.bq_demo_partitioned_cow` AS
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
`my-first-project.dwh.bq_demo_partitioned_cow_history`
|
||||
WHERE
|
||||
_hoodie_file_name IN (
|
||||
SELECT
|
||||
filename
|
||||
FROM
|
||||
`my-first-project.dwh.bq_demo_partitioned_cow_manifest`
|
||||
);
|
||||
```
|
||||
|
||||
BigQuerySync tool will
|
||||
use [HoodieTableMetaClient](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java)
|
||||
methods to get the list of latest set of parquet data files to generate the manifest csv file, then will invoke
|
||||
the [BigQuery Java Client](https://github.com/googleapis/java-bigquery/blob/main/samples/snippets/src/main/java/com/example/bigquery/CreateTableExternalHivePartitioned.java)
|
||||
to create the manifest table, history table and hudi table views.
|
||||
|
||||
**All the steps described here will be automated, all you have to do is to supply a bunch of configs to enable the
|
||||
BigQuery sync.**
|
||||
|
||||
## Rollout/Adoption Plan
|
||||
|
||||
There are no impacts to existing users since this is entirely a new feature to support a new use case hence there are no
|
||||
migrations/behavior changes required.
|
||||
|
||||
After the BigQuery sync tool has been implemented, I will reach out to Uber's Hudi/BigQuery team to rollout this feature
|
||||
for their BigQuery ingestion service.
|
||||
|
||||
## Test Plan
|
||||
|
||||
This RFC aims to implement a new SyncTool to sync the Hudi table to BigQuery, to test this feature, there will be some
|
||||
test tables created and updated on to the BigQuery along with unit tests for the code. Since this is an entirely new
|
||||
feature, I am confident that this will not cause any regressions during and after roll out.
|
||||
|
||||
## Future Plans
|
||||
|
||||
After this feature has been rolled out, the same model can be applied to sync the Hudi tables to other external data
|
||||
warehouses like Snowflake.
|
||||
Reference in New Issue
Block a user