The streaming reader should only monitor the delta log files, if there are parquet commits but we recognize as logs, the reader would report FileNotFound exception.
Currently we assign the buckets by record partition path which could
cause hotspot if the partition field is datetime type. Changes to assign
buckets by grouping the record whth their key first, the assignment is
valid if only there is no conflict(two task write to the same bucket).
This patch also changes the coordinator execution to be asynchronous.
Current we did a soft delete for DELETE row data when writes into hoodie
table. For streaming read of MOR table, the Flink reader detects the
delete records and still emit them if the record key semantics are still
kept.
This is useful and actually a must for streaming ETL pipeline
incremental computation.
Read optimized query returns the records from:
* COW table: the latest parquet files
* MOR table: parquet file records from the latest compaction committed
The SQL PRIMARY KEY semantics is very same with Hoodie record key, using
PRIMARY KEY is more straight-forward way instead of a table option:
hoodie.datasource.write.recordkey.field.
After this change, both PRIMARY KEY and table option can define hoodie
record key, while the PRIMARY KEY has higher priority if both are
defined.
Note: a column with PRIMARY KEY constraint is forced to be non-nullable.
We should implement the interface HoodieTableSource.explainSource to
track the table source signature diff for all kinds of pushing down,
such as filter pushing or limit pushing.
In order to support object storage, we need these changes:
* Use the Hadoop filesystem so that we can find the plugin filesystem
* Do not fetch file size until the file handle is closed
* Do not close the opened filesystem because we want to use the
filesystem cache
A Flink SQL table has DDL that defines the table schema, we can use that
to infer the Avro schema and there is no need to declare a Avro schema
explicitly anymore.
But we still keep the config option for explicit Avro schema in case
there is corner cases that the inferred schema is not correct
(especially for the nullability).
Supports two read modes:
* Read the full data set starting from the latest commit instant and
subsequent incremental data set
* Read data set that starts from a specified commit instant
- The #initializeState executes before #open, thus, the
#checkPartitionsLoaded may see null `initialPartitionsToLoad`
- Only load the existing partitions
This is the #step 2 of RFC-24:
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal
This PR introduce a BucketAssigner that assigns bucket ID (partition
path & fileID) for each stream record.
There is no need to look up index and partition the records anymore in
the following pipeline for these records,
we actually decide the write target location before the write and each
record computes its location when the BucketAssigner receives it, thus,
the indexing is with streaming style.
Computing locations for a batch of records all at a time is resource
consuming so a pressure to the engine,
we should avoid that in streaming system.