diff --git a/docs/admin_guide.md b/docs/admin_guide.md index 42b2185d6..9b8c9981e 100644 --- a/docs/admin_guide.md +++ b/docs/admin_guide.md @@ -21,6 +21,42 @@ Once hoodie has been built via `mvn clean install -DskipTests`, the shell can be A hoodie dataset resides on HDFS, in a location referred to as the **basePath** and we would need this location in order to connect to a Hoodie dataset. Hoodie library effectively manages this HDFS dataset internally, using .hoodie subfolder to track all metadata +To initialize a hoodie table, use the following command. + +``` +18/09/06 15:56:52 INFO annotation.AutowiredAnnotationBeanPostProcessor: JSR-330 'javax.inject.Inject' annotation found and supported for autowiring +============================================ +* * +* _ _ _ _ * +* | | | | | (_) * +* | |__| | ___ ___ __| |_ ___ * +* | __ |/ _ \ / _ \ / _` | |/ _ \ * +* | | | | (_) | (_) | (_| | | __/ * +* |_| |_|\___/ \___/ \__,_|_|\___| * +* * +============================================ + +Welcome to Hoodie CLI. Please type help if you are looking for help. +hoodie->create --path /user/hive/warehouse/table1 --tableName hoodie_table_1 --tableType COPY_ON_WRITE +..... +18/09/06 15:57:15 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE from ... +``` + +To see the description of hoodie table, use the command: +``` +hoodie:hoodie_table_1->desc +18/09/06 15:57:19 INFO timeline.HoodieActiveTimeline: Loaded instants [] + _________________________________________________________ + | Property | Value | + |========================================================| + | basePath | ... | + | metaPath | ... | + | fileSystem | hdfs | + | hoodie.table.name | hoodie_table_1 | + | hoodie.table.type | COPY_ON_WRITE | + | hoodie.archivelog.folder| | +``` + Following is a sample command to connect to a Hoodie dataset contains uber trips. ``` @@ -135,7 +171,7 @@ hoodie:trips->stats filesizes --partitionPath 2016/09/01 --sortBy "95th" --desc ________________________________________________________________________________________________ | CommitTime | Min | 10th | 50th | avg | 95th | Max | NumFiles| StdDev | |===============================================================================================| - | 20161004211210| 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 2 | 2.3 KB | + | | 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 2 | 2.3 KB | .... .... ``` @@ -158,6 +194,117 @@ hoodie:trips->stats wa In order to limit the amount of growth of .commit files on HDFS, Hoodie archives older .commit files (with due respect to the cleaner policy) into a commits.archived file. This is a sequence file that contains a mapping from commitNumber => json with raw information about the commit (same that is nicely rolled up above). + +#### Compactions + +To get an idea of the lag between compaction and writer applications, use the below command to list down all +pending compactions. 
+ +``` +hoodie:trips->compactions show all + ___________________________________________________________________ + | Compaction Instant Time| State | Total FileIds to be Compacted| + |==================================================================| + | | REQUESTED| 35 | + | | INFLIGHT | 27 | + +``` + +To inspect a specific compaction plan, use + +``` +hoodie:trips->compaction show --instant + _________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________ + | Partition Path| File Id | Base Instant | Data File Path | Total Delta Files| getMetrics | + |================================================================================================================================================================================================================================================ + | 2018/07/17 | | | viewfs://ns-default/.../../UUID_.parquet | 1 | {TOTAL_LOG_FILES=1.0, TOTAL_IO_READ_MB=1230.0, TOTAL_LOG_FILES_SIZE=2.51255751E8, TOTAL_IO_WRITE_MB=991.0, TOTAL_IO_MB=2221.0}| + +``` + +To manually schedule or run a compaction, use the below command. This command uses spark launcher to perform compaction +operations. NOTE : Make sure no other application is scheduling compaction for this dataset concurrently + +``` +hoodie:trips->help compaction schedule +Keyword: compaction schedule +Description: Schedule Compaction + Keyword: sparkMemory + Help: Spark executor memory + Mandatory: false + Default if specified: '__NULL__' + Default if unspecified: '1G' + +* compaction schedule - Schedule Compaction +``` + +``` +hoodie:trips->help compaction run +Keyword: compaction run +Description: Run Compaction for given instant time + Keyword: tableName + Help: Table name + Mandatory: true + Default if specified: '__NULL__' + Default if unspecified: '__NULL__' + + Keyword: parallelism + Help: Parallelism for hoodie compaction + Mandatory: true + Default if specified: '__NULL__' + Default if unspecified: '__NULL__' + + Keyword: schemaFilePath + Help: Path for Avro schema file + Mandatory: true + Default if specified: '__NULL__' + Default if unspecified: '__NULL__' + + Keyword: sparkMemory + Help: Spark executor memory + Mandatory: true + Default if specified: '__NULL__' + Default if unspecified: '__NULL__' + + Keyword: retry + Help: Number of retries + Mandatory: true + Default if specified: '__NULL__' + Default if unspecified: '__NULL__' + + Keyword: compactionInstant + Help: Base path for the target hoodie dataset + Mandatory: true + Default if specified: '__NULL__' + Default if unspecified: '__NULL__' + +* compaction run - Run Compaction for given instant time +``` + +##### Up-Coming CLI for Compaction + +In the next release, more useful CLI to revert/repair compaction schedules will be added. Here is a preview of them: + +Validating a compaction plan : Check if all the files necessary for compactions are present and are valid + +``` +hoodie:trips->compaction validate --compactionInstant +``` + +The following commands must be executed without any other writer/ingestion application running. + +Sometimes, it becomes necessary to remove a fileId from a compaction-plan inorder to speed-up or unblock compaction +operation. Any new log-files that happened on this file after the compaction got scheduled will be safely renamed +so that are preserved. 
Hudi provides the following CLI to support it:
+
+```
+hoodie:trips->compaction unscheduleFileId --fileId
+```
+
+In other cases, an entire compaction plan needs to be reverted. This is supported by the following CLI:
+```
+hoodie:trips->compaction unschedule --compactionInstant
+```
+
 ## Metrics
 
 Once the Hoodie Client is configured with the right datasetname and environment for metrics, it produces the following graphite metrics, that aid in debugging hoodie datasets
diff --git a/docs/concepts.md b/docs/concepts.md
index 8de1915a5..4097efbb2 100644
--- a/docs/concepts.md
+++ b/docs/concepts.md
@@ -62,6 +62,23 @@ Each record is uniquely identified by a `record key` and mapped to a file id for
 and file id, never changes once the first version of a record has been written to a file. In short, the `file id` identifies a group of files, that contain all versions of a group of records.
 
+## Terminologies
+
+ * `Hudi Dataset`
+   A structured hive/spark table managed by Hudi. Hudi supports both partitioned and non-partitioned Hive tables.
+ * `Commit`
+   A commit marks a new batch of data applied to a dataset. Hudi maintains monotonically increasing timestamps to track commits and guarantees that a commit is atomically
+   published.
+ * `Commit Timeline`
+   Commit Timeline refers to the sequence of Commits that were applied in order on a dataset over its lifetime.
+ * `File Slice`
+   Hudi provides efficient handling of updates by having a fixed mapping between a record key and a logical file id.
+   Hudi uses MVCC to provide atomicity and isolation of readers from a writer. This means that a logical fileId will
+   have many physical versions of it. Each of these physical versions of a file represents a complete view of the
+   file as of a commit and is called a File Slice.
+ * `File Group`
+   A file-group is a file-slice timeline. It is a list of file-slices in commit order. It is identified by `file id`.
+
 ## Copy On Write
diff --git a/docs/images/async_compac_1.png b/docs/images/async_compac_1.png
new file mode 100644
index 000000000..2bfbff901
Binary files /dev/null and b/docs/images/async_compac_1.png differ
diff --git a/docs/images/async_compac_2.png b/docs/images/async_compac_2.png
new file mode 100644
index 000000000..368628de9
Binary files /dev/null and b/docs/images/async_compac_2.png differ
diff --git a/docs/images/async_compac_3.png b/docs/images/async_compac_3.png
new file mode 100644
index 000000000..0e535fcb2
Binary files /dev/null and b/docs/images/async_compac_3.png differ
diff --git a/docs/images/async_compac_4.png b/docs/images/async_compac_4.png
new file mode 100644
index 000000000..537b2ddd3
Binary files /dev/null and b/docs/images/async_compac_4.png differ
diff --git a/docs/implementation.md b/docs/implementation.md
index 937878d26..0e1508380 100644
--- a/docs/implementation.md
+++ b/docs/implementation.md
@@ -22,24 +22,26 @@ Hoodie upsert/insert is merely a Spark DAG, that can be broken into two big piec
 ## Index
 
 Hoodie currently provides two choices for indexes : `BloomIndex` and `HBaseIndex` to map a record key into the file id to which it belongs to. This enables
-us to speed up upserts significantly, without scanning over every record in the dataset.
+us to speed up upserts significantly, without scanning over every record in the dataset. Hoodie Indices can be classified based on
+their ability to look up records across partitions. A `global` index does not need partition information for finding the file-id for a record key
+but a `non-global` does.
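For orientation, the index implementation used by the sections below is selected through the client's write configuration. The snippet is a minimal sketch only and is not part of this change; the builder methods and package names (`HoodieWriteConfig`, `HoodieIndexConfig`, `HoodieIndex.IndexType` from the `com.uber.hoodie` client) are assumptions to be checked against the client version in use.

```
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;

// Sketch: pick the index implementation when building the write config.
// BLOOM keeps the index in parquet footers (non-global); HBASE uses an external
// HBase table (global). The basePath below is a placeholder.
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/path/to/hoodie/dataset")
    .withIndexConfig(HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BLOOM)
        .build())
    .build();
```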
-#### HBase Index
+#### HBase Index (global)
 
 Here, we just use HBase in a straightforward way to store the mapping above. The challenge with using HBase (or any external key-value store for that matter)
 is performing rollback of a commit and handling partial index updates. Since the HBase table is indexed by record key and not commit Time, we would have to scan
 all the entries which will be prohibitively expensive. Insteead, we store the commit time with the value and discard its value if it does not belong to a valid commit.
 
-#### Bloom Index
+#### Bloom Index (non-global)
 
 This index is built by adding bloom filters with a very high false positive tolerance (e.g: 1/10^9), to the parquet file footers. The advantage of this index over HBase
 is the obvious removal of a big external dependency, and also nicer handling of rollbacks & partial updates since the index is part of the data file itself.
 
-At runtime, checking the Bloom Index for a given set of record keys effectively ammonts to checking all the bloom filters within a given
+At runtime, checking the Bloom Index for a given set of record keys effectively amounts to checking all the bloom filters within a given
 partition, against the incoming records, using a Spark join. Much of the engineering effort towards the Bloom index has gone into scaling this join
-by caching the incoming RDD[HoodieRecord] to be able and dynamically tuning join parallelism, to avoid hitting Spark limitations like 2GB maximum
+by caching the incoming RDD[HoodieRecord] and dynamically tuning join parallelism, to avoid hitting Spark limitations like 2GB maximum
 for partition size. As a result, Bloom Index implementation has been able to handle single upserts upto 5TB, in a reliable manner.
@@ -61,9 +63,175 @@ records such that
 In this storage, index updation is a no-op, since the bloom filters are already written as a part of committing data.
+In the case of Copy-On-Write, a single parquet file constitutes one `file slice` which contains one complete version of
+the file.
+
+{% include image.html file="hoodie_log_format_v2.png" alt="hoodie_log_format_v2.png" max-width="1000" %}
+
 #### Merge On Read
 
-Work in Progress .. .. .. .. ..
+In the Merge-On-Read storage model, there are 2 logical components - one for ingesting data (both inserts/updates) into the dataset
+ and another for creating compacted views. The former is hereby referred to as `Writer` while the latter
+ is referred to as `Compactor`.
+
+##### Merge On Read Writer
+
+ At a high level, the Merge-On-Read Writer goes through the same stages as the Copy-On-Write writer in ingesting data.
+ The key difference here is that updates are appended to the latest log (delta) file belonging to the latest file slice
+ without merging. For inserts, Hudi supports 2 modes:
+
+ 1. Inserts to Log Files - This is done for datasets that have indexable log files (e.g. with a global index)
+ 2. Inserts to parquet files - This is done for datasets that do not have indexable log files, e.g. with a bloom index
+ embedded in parquet files. Hudi treats writing new records in the same way as inserting to Copy-On-Write files.
+
+As in the case of Copy-On-Write, the input tagged records are partitioned such that all upserts destined to
+a `file id` are grouped together. This upsert-batch is written as one or more log-blocks written to log-files.
+Hudi allows clients to control log file sizes (see [Storage Configs](../configurations)).
+
+The WriteClient API is the same for both Copy-On-Write and Merge-On-Read writers.
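As a rough illustration of that shared API, a single ingestion round could look like the sketch below. This is not part of this change: `jsc` (a `JavaSparkContext`), `writeConfig` (a `HoodieWriteConfig`) and `records` (a `JavaRDD` of `HoodieRecord`s) are assumed to be prepared elsewhere, and the method names follow the `com.uber.hoodie.HoodieWriteClient` API as an assumption to be verified against the client version in use.

```
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import org.apache.spark.api.java.JavaRDD;

// Sketch: the same write sequence drives both storage types; the table type recorded
// for the dataset decides whether an upsert merges into new parquet files (Copy-On-Write)
// or appends log blocks to the latest file slice (Merge-On-Read).
HoodieWriteClient client = new HoodieWriteClient(jsc, writeConfig);
String commitTime = client.startCommit();                     // open a new commit on the timeline
JavaRDD<WriteStatus> statuses = client.upsert(records, commitTime);
client.commit(commitTime, statuses);                          // atomically publish the commit
```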
+
+With Merge-On-Read, several rounds of data-writes would have resulted in accumulation of one or more log-files.
+All these log-files along with the base parquet file (if it exists) constitute a `file slice` which represents one complete version
+of the file.
+
+##### Compactor
+
+Realtime Readers will perform in-situ merge of these delta log-files to provide the most recent (committed) view of
+the dataset. To keep the query-performance in check and eventually achieve read-optimized performance, Hudi supports
+compacting these log-files asynchronously to create read-optimized views.
+
+Asynchronous Compaction involves 2 steps:
+
+  * `Compaction Schedule` : The Hudi Write Client exposes an API to create compaction plans, each of which contains the list of `file slices`
+    to be compacted atomically in a single compaction commit. Hudi allows pluggable strategies for choosing
+    file slices for each compaction run. This step is typically done inline by the Writer process, as Hudi expects
+    only one schedule to be generated at a time, which allows Hudi to enforce the constraint that pending compaction
+    plans do not step on each other's file-slices. This constraint allows multiple concurrent `Compactors` to run at
+    the same time. Some of the common strategies used for choosing `file slices` for compaction are:
+    * BoundedIO - Limit the number of file slices chosen for a compaction plan by the expected total IO (read + write)
+      needed to complete the compaction run
+    * Log File Size - Prefer file-slices with larger amounts of delta log data to be merged
+    * Day Based - Prefer file slices belonging to the latest day partitions
+    ```
+    API for scheduling compaction
+    /**
+     * Schedules a new compaction instant
+     * @param extraMetadata
+     * @return Compaction Instant timestamp if a new compaction plan is scheduled
+     */
+    Optional scheduleCompaction(Optional> extraMetadata) throws IOException;
+    ```
+  * `Compactor` : Hudi provides a separate API in the Write Client to execute a compaction plan. The compaction
+    plan (just like a commit) is identified by a timestamp. Most of the design and implementation complexity for Async
+    Compaction is for guaranteeing snapshot isolation to readers and the writer when
+    multiple concurrent compactors are running. A typical compactor deployment involves launching a separate
+    spark application which executes pending compactions when they become available. The core logic of compacting
+    file slices in the Compactor is very similar to that of merging updates in a Copy-On-Write table. The only
+    difference is that in the case of compaction, there is an additional step of merging the records in delta log-files.
+
+    Here are the main APIs to look up and execute a compaction plan.
+    ```
+    Main API in HoodieWriteClient for running Compaction:
+    /**
+     * Performs Compaction corresponding to instant-time
+     * @param compactionInstantTime Compaction Instant Time
+     * @return
+     * @throws IOException
+     */
+    public JavaRDD compact(String compactionInstantTime) throws IOException;
+
+    To look up all pending compactions, use the API defined in HoodieReadClient
+
+    /**
+     * Return all pending compactions with instant time for clients to decide what to compact next.
+     * @return
+     */
+    public List> getPendingCompactions();
+    ```
+
+Refer to the __hoodie-client/src/test/java/HoodieClientExample.java__ class for an example of how compaction
+is scheduled and executed.
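Putting the two calls above together, a writer that schedules compaction inline and a compactor that later executes it could look roughly like the sketch below. This is not part of this change; the generic types are assumed where they were elided in the snippets above, and `writeClient` is a `HoodieWriteClient` configured as usual.

```
import java.util.Optional;

// Sketch: schedule a compaction plan right after an ingestion round completes ...
Optional<String> compactionInstant = writeClient.scheduleCompaction(Optional.empty());

// ... and execute it later, possibly from a dedicated compactor Spark application.
if (compactionInstant.isPresent()) {
    JavaRDD<WriteStatus> statuses = writeClient.compact(compactionInstant.get());
    // inspect statuses for write errors, just as with a regular commit
}
```

A dedicated compactor application would instead discover pending instants (for example via the `getPendingCompactions()` API above) and execute them one by one.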
+
+##### Deployment Models
+
+These are the typical Hoodie Writer and Compaction deployment models:
+
+  * `Inline Compaction` : At each round, a single spark application ingests a new batch into the dataset. It then optionally decides to schedule
+    a compaction run and executes it in sequence.
+  * `Single Dedicated Async Compactor` : The Spark application which brings in new changes to the dataset (the writer) periodically
+    schedules compaction. The Writer application does not run compaction inline. A separate spark application periodically
+    probes for pending compactions and executes them.
+  * `Multi Async Compactors` : This mode is similar to the `Single Dedicated Async Compactor` mode. The main difference is that
+    there can now be more than one spark application picking different compactions and executing them in parallel.
+    In order to ensure compactors do not step on each other, they use a coordination service like Zookeeper to pick up unique
+    pending compaction instants and run them.
+
+The Compaction process requires one executor per file-slice in the compaction plan. So, the best resource allocation
+strategy (both in terms of speed and resource usage) for clusters supporting dynamic allocation is to look up the compaction
+plan to be run, figure out the number of file slices being compacted, and request that many executors.
+
+## Async Compaction Design Deep-Dive (Optional)
+
+For the purpose of this section, it is important to distinguish between 2 types of commits as they pertain to a file-group:
+
+A commit which generates a merged and read-optimized file-slice is called a `snapshot commit` (SC) with respect to that file-group.
+A commit which merely appends the new/updated records assigned to the file-group into a new log block is called a `delta commit` (DC)
+with respect to that file-group.
+
+### Algorithm
+
+The algorithm is described with an illustration. Let us assume a scenario where there are commits SC1, DC2, DC3 that have
+already completed on a data-set. Commit DC4 is currently ongoing with the writer (ingestion) process using it to upsert data.
+Let us also imagine there are a set of file-groups (FG1 … FGn) in the data-set whose latest version (`File-Slice`)
+contains the base file created by commit SC1 (snapshot-commit in columnar format) and a log file containing row-based
+log blocks of 2 delta-commits (DC2 and DC3).
+
+{% include image.html file="async_compac_1.png" alt="async_compac_1.png" max-width="1000" %}
+
+  * The Writer (Ingestion) that is going to commit "DC4" starts. The record updates in this batch are grouped by file-groups
+    and appended in row format to the corresponding log file as a delta commit. Let us imagine a subset of file-groups has
+    this new log block (delta commit) DC4 added.
+  * Before the writer job completes, it runs the compaction strategy to decide which file-groups are to be compacted by the compactor
+    and creates a new compaction-request commit SC5. This commit file is marked as “requested” with metadata denoting
+    which fileIds to compact (based on the selection policy). The writer completes without running compaction (it will be run async).
+
+  {% include image.html file="async_compac_2.png" alt="async_compac_2.png" max-width="1000" %}
+
+  * The writer job runs again, ingesting the next batch. It starts with commit DC6. It reads the earliest inflight compaction
+    request marker commit in timeline order and collects the (fileId, Compaction Commit Id “CcId”) pairs from the metadata.
+    Ingestion DC6 ensures a new file-slice with base-commit “CcId” gets allocated for the file-group.
+    The Writer will simply append records in row-format to the first log-file (as a delta-commit) assuming the
+    base-file (“Phantom-Base-File”) will be created eventually by the compactor.
+
+  {% include image.html file="async_compac_3.png" alt="async_compac_3.png" max-width="1000" %}
+
+  * The Compactor runs at some time and commits at “Tc” (concurrently or before/after Ingestion DC6). It reads the commit-timeline
+    and finds the first unprocessed compaction request marker commit. The Compactor reads the commit’s metadata to find the
+    file-slices to be compacted. It compacts the file-slice and creates the missing base-file (“Phantom-Base-File”)
+    with “CcId” as the commit-timestamp. The Compactor then marks the compaction commit timestamp as completed.
+    It is important to realize that at the data-set level, there could be different file-groups requesting compaction at
+    different commit timestamps.
+
+  {% include image.html file="async_compac_4.png" alt="async_compac_4.png" max-width="1000" %}
+
+  * A Near Real-time reader interested in getting the latest snapshot will have 2 cases. Let us assume that the
+    incremental ingestion (writer at DC6) happened before the compaction (at some time “Tc”).
+    The description below is with regard to compaction from a file-group perspective.
+    * `Reader querying at a time between ingestion completion time for DC6 and compaction finish “Tc”`:
+      Hoodie’s implementation will be changed to become aware of file-groups currently waiting for compaction and
+      merge log-files corresponding to DC2-DC6 with the base-file corresponding to SC1. In essence, Hoodie will create
+      a pseudo file-slice by combining the 2 file-slices starting at base-commits SC1 and SC5 into one.
+      For file-groups not waiting for compaction, the reader behavior is essentially the same - read the latest file-slice
+      and merge on the fly.
+    * `Reader querying at a time after compaction finished (> “Tc”)` : In this case, the reader will not find any pending
+      compactions in the timeline and will simply have the current behavior of reading the latest file-slice and
+      merging on-the-fly.
+
+  * Read-Optimized View readers will query against the latest columnar base-file for each file-group.
+
+The above algorithm explains Async compaction w.r.t. a single compaction run on a single file-group. It is important
+to note that multiple compaction plans can be run concurrently as they are essentially operating on different
+file-groups.
 
 ## Performance