diff --git a/docs/_includes/footer.html b/docs/_includes/footer.html index 1682c9617..9847670a3 100755 --- a/docs/_includes/footer.html +++ b/docs/_includes/footer.html @@ -3,7 +3,7 @@ diff --git a/docs/admin_guide.md b/docs/admin_guide.md index 5143aa227..950a0b7f5 100644 --- a/docs/admin_guide.md +++ b/docs/admin_guide.md @@ -3,49 +3,223 @@ title: Admin Guide keywords: admin sidebar: mydoc_sidebar permalink: admin_guide.html +toc: false +summary: This section offers an overview of tools available to operate an ecosystem of Hoodie datasets --- -## Hoodie Admin CLI -### Launching Command Line +Admins/ops can gain visibility into Hoodie datasets/pipelines in the following ways - + - Administering via the Admin CLI + - Graphite metrics + - Spark UI of the Hoodie Application -* mvn clean install in hoodie-cli -* ./hoodie-cli +This section provides a glimpse into each of these, with some general guidance on troubleshooting + +## Admin CLI + +Once hoodie has been built via `mvn clean install -DskipTests`, the shell can be fired by via `cd hoodie-cli && ./hoodie-cli.sh`. +A hoodie dataset resides on HDFS, in a location referred to as the **basePath** and we would need this location in order to connect to a Hoodie dataset. +Hoodie library effectively manages this HDFS dataset internally, using .hoodie subfolder to track all metadata + +Following is a sample command to connect to a Hoodie dataset contains uber trips. -If all is good you should get a command prompt similar to this one ``` -prasanna@:~/hoodie/hoodie-cli$ ./hoodie-cli.sh -16/07/13 21:27:47 INFO xml.XmlBeanDefinitionReader: Loading XML bean definitions from URL [jar:file:/home/prasanna/hoodie/hoodie-cli/target/hoodie-cli-0.1-SNAPSHOT.jar!/META-INF/spring/spring-shell-plugin.xml] -16/07/13 21:27:47 INFO support.GenericApplicationContext: Refreshing org.springframework.context.support.GenericApplicationContext@372688e8: startup date [Wed Jul 13 21:27:47 UTC 2016]; root of context hierarchy -16/07/13 21:27:47 INFO annotation.AutowiredAnnotationBeanPostProcessor: JSR-330 'javax.inject.Inject' annotation found and supported for autowiring -============================================ -* \* -* _ _ _ _ \* -* | | | | | (_) * -* | |__| | ___ ___ __| |_ ___ * -* | __ |/ _ \ / _ \ / _` | |/ _ \ * -* | | | | (_) | (_) | (_| | | __/ * -* |_| |_|\___/ \___/ \__,_|_|\___| * -* * -============================================ +hoodie:trips->connect --path /app/uber/trips -Welcome to Hoodie CLI. Please type help if you are looking for help. -hoodie-> +16/10/05 23:20:37 INFO model.HoodieTableMetadata: Attempting to load the commits under /app/uber/trips/.hoodie with suffix .commit +16/10/05 23:20:37 INFO model.HoodieTableMetadata: Attempting to load the commits under /app/uber/trips/.hoodie with suffix .inflight +16/10/05 23:20:37 INFO model.HoodieTableMetadata: All commits :HoodieCommits{commitList=[20161002045850, 20161002052915, 20161002055918, 20161002065317, 20161002075932, 20161002082904, 20161002085949, 20161002092936, 20161002105903, 20161002112938, 20161002123005, 20161002133002, 20161002155940, 20161002165924, 20161002172907, 20161002175905, 20161002190016, 20161002192954, 20161002195925, 20161002205935, 20161002215928, 20161002222938, 20161002225915, 20161002232906, 20161003003028, 20161003005958, 20161003012936, 20161003022924, 20161003025859, 20161003032854, 20161003042930, 20161003052911, 20161003055907, 20161003062946, 20161003065927, 20161003075924, 20161003082926, 20161003085925, 20161003092909, 20161003100010, 20161003102913, 20161003105850, 20161003112910, 20161003115851, 20161003122929, 20161003132931, 20161003142952, 20161003145856, 20161003152953, 20161003155912, 20161003162922, 20161003165852, 20161003172923, 20161003175923, 20161003195931, 20161003210118, 20161003212919, 20161003215928, 20161003223000, 20161003225858, 20161004003042, 20161004011345, 20161004015235, 20161004022234, 20161004063001, 20161004072402, 20161004074436, 20161004080224, 20161004082928, 20161004085857, 20161004105922, 20161004122927, 20161004142929, 20161004163026, 20161004175925, 20161004194411, 20161004203202, 20161004211210, 20161004214115, 20161004220437, 20161004223020, 20161004225321, 20161004231431, 20161004233643, 20161005010227, 20161005015927, 20161005022911, 20161005032958, 20161005035939, 20161005052904, 20161005070028, 20161005074429, 20161005081318, 20161005083455, 20161005085921, 20161005092901, 20161005095936, 20161005120158, 20161005123418, 20161005125911, 20161005133107, 20161005155908, 20161005163517, 20161005165855, 20161005180127, 20161005184226, 20161005191051, 20161005193234, 20161005203112, 20161005205920, 20161005212949, 20161005223034, 20161005225920]} +Metadata for table trips loaded +hoodie:trips-> ``` -### Commands - - * connect --path [dataset_path] : Connect to the specific dataset by its path - * commits show : Show all details about the commits - * commits refresh : Refresh the commits from HDFS - * commit rollback --commit [commitTime] : Rollback a commit - * commit showfiles --commit [commitTime] : Show details of a commit (lists all the files modified along with other metrics) - * commit showpartitions --commit [commitTime] : Show details of a commit (lists statistics aggregated at partition level) - - * commits compare --path [otherBasePath] : Compares the current dataset commits with the path provided and tells you how many commits behind or ahead - * stats wa : Calculate commit level and overall write amplification factor (total records written / total records upserted) - * help +Once connected to the dataset, a lot of other commands become available. The shell has contextual autocomplete help (press TAB) and below is a list of all commands, few of which are reviewed in this section +are reviewed +``` +hoodie:trips->help +* ! - Allows execution of operating system (OS) commands +* // - Inline comment markers (start of line only) +* ; - Inline comment markers (start of line only) +* addpartitionmeta - Add partition metadata to a dataset, if not present +* clear - Clears the console +* cls - Clears the console +* commit rollback - Rollback a commit +* commits compare - Compare commits with another Hoodie dataset +* commit showfiles - Show file level details of a commit +* commit showpartitions - Show partition level details of a commit +* commits refresh - Refresh the commits +* commits show - Show the commits +* commits sync - Compare commits with another Hoodie dataset +* connect - Connect to a hoodie dataset +* date - Displays the local date and time +* exit - Exits the shell +* help - List all commands usage +* quit - Exits the shell +* records deduplicate - De-duplicate a partition path contains duplicates & produce repaired files to replace with +* script - Parses the specified resource file and executes its commands +* stats filesizes - File Sizes. Display summary stats on sizes of files +* stats wa - Write Amplification. Ratio of how many records were upserted to how many records were actually written +* sync validate - Validate the sync by counting the number of records +* system properties - Shows the shell's properties +* utils loadClass - Load a class +* version - Displays shell version +hoodie:trips-> +``` + + +#### Inspecting Commits + +The task of upserting or inserting a batch of incoming records is known as a **commit** in Hoodie. A commit provides basic atomicity guarantees such that only commited data is available for querying. +Each commit has a monotonically increasing string/number called the **commit number**. Typically, this is the time at which we started the commit. + +To view some basic information about the last 10 commits, + + +``` +hoodie:trips->commits show + ________________________________________________________________________________________________________________________________________________________________________ + | CommitTime | Total Written (B)| Total Files Added| Total Files Updated| Total Partitions Written| Total Records Written| Total Update Records Written| Total Errors| + |=======================================================================================================================================================================| + .... + .... + .... +hoodie:trips-> + +``` + +At the start of each write, Hoodie also writes a .inflight commit to the .hoodie folder. You can use the timestamp there to estimate how long the commit has been inflight + + +``` +$ hdfs dfs -ls /app/uber/trips/.hoodie/*.inflight +-rw-r--r-- 3 vinoth supergroup 321984 2016-10-05 23:18 /app/uber/trips/.hoodie/20161005225920.inflight +``` + + +#### Drilling Down to a specific Commit + +To understand how the writes spread across specific partiions, + + +``` +hoodie:trips->commit showpartitions --commit 20161005165855 + __________________________________________________________________________________________________________________________________________ + | Partition Path| Total Files Added| Total Files Updated| Total Records Inserted| Total Records Updated| Total Bytes Written| Total Errors| + |=========================================================================================================================================| + .... + .... +``` + +If you need file level granularity , we can do the following + + +``` +hoodie:trips->commit showfiles --commit 20161005165855 + ________________________________________________________________________________________________________________________________________________________ + | Partition Path| File ID | Previous Commit| Total Records Updated| Total Records Written| Total Bytes Written| Total Errors| + |=======================================================================================================================================================| + .... + .... +``` + +#### Statistics + +Since Hoodie directly manages file sizes for HDFS dataset, it might be good to get an overall picture + + +``` +hoodie:trips->stats filesizes --partitionPath 2016/09/01 + ________________________________________________________________________________________________ + | CommitTime | Min | 10th | 50th | avg | 95th | Max | NumFiles| StdDev | + |===============================================================================================| + | 20161004211210| 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 93.9 MB | 2 | 2.3 KB | + .... + .... +``` + +In case of Hoodie write taking much longer, it might be good to see the write amplification for any sudden increases + + +``` +hoodie:trips->stats wa + __________________________________________________________________________ + | CommitTime | Total Upserted| Total Written| Write Amplifiation Factor| + |=========================================================================| + .... + .... +``` + + +#### Archived Commits + +In order to limit the amount of growth of .commit files on HDFS, Hoodie archives older .commit files (with due respect to the cleaner policy) into a commits.archived file. +This is a sequence file that contains a mapping from commitNumber => json with raw information about the commit (same that is nicely rolled up above). + +## Metrics + +Once the Hoodie Client is configured with the right datasetname and environment for metrics, it produces the following graphite metrics, that aid in debugging hoodie datasets + + - **Commit Duration** - This is amount of time it took to successfully commit a batch of records + - **Rollback Duration** - Similarly, amount of time taken to undo partial data left over by a failed commit (happens everytime automatically after a failing write) + - **File Level metrics** - Shows the amount of new files added, versions, deleted (cleaned) in each commit + - **Record Level Metrics** - Total records inserted/updated etc per commit + - **Partition Level metrics** - number of partitions upserted (super useful to understand sudden spikes in commit duration) + +These metrics can then be plotted on a standard tool like grafana. Below is a sample commit duration chart. + +{% include image.html file="hoodie_commit_duration.png" alt="hoodie_commit_duration.png" max-width="1000" %} + + +## Troubleshooting Failures + +Section below generally aids in debugging Hoodie failures. Off the bat, the following metadata is added to every record to help triage issues easily using standard Hadoop SQL engines (Hive/Presto/Spark) + + - **_hoodie_record_key** - Treated as a primary key within each HDFS partition, basis of all updates/inserts + - **_hoodie_commit_time** - Last commit that touched this record + - **_hoodie_file_name** - Actual file name containing the record (super useful to triage duplicates) + - **_hoodie_partition_path** - Path from basePath that identifies the partition containing this record + +{% include callout.html content="Note that as of now, Hoodie assumes the application passes in the same deterministic partitionpath for a given recordKey. i.e the uniqueness of record key is only enforced within each partition" type="warning" %} + + +#### Missing records + +Please check if there were any write errors using the admin commands above, during the window at which the record could have been written. +If you do find errors, then the record was not actually written by Hoodie, but handed back to the application to decide what to do with it. + +#### Duplicates + +First of all, please confirm if you do indeed have duplicates **AFTER** ensuring the query is accessing the Hoodie datasets [properly](sql_queries.html) . + + - If confirmed, please use the metadata fields above, to identify the physical files & partition files containing the records . + - If duplicates span files across partitionpath, then this means your application is generating different partitionPaths for same recordKey, Please fix your app + - if duplicates span multiple files within the same partitionpath, please engage with mailing list. This should not happen. You can use the `records deduplicate` command to fix your data. + +#### Spark failures + +Typical upsert() DAG looks like below. Note that Hoodie client also caches intermediate RDDs to intelligently profile workload and size files and spark parallelism. +Also Spark UI shows sortByKey twice due to the probe job also being shown, nonetheless its just a single sort. + + +{% include image.html file="hoodie_upsert_dag.png" alt="hoodie_upsert_dag.png" max-width="1000" %} + + +At a high level, there are two steps + +**Index Lookup to identify files to be changed** + + - Job 1 : Triggers the input data read, converts to HoodieRecord object and then stops at obtaining a spread of input records to target partition paths + - Job 2 : Load the set of file names which we need check against + - Job 3 & 4 : Actual lookup after smart sizing of spark join parallelism, by joining RDDs in 1 & 2 above + - Job 5 : Have a tagged RDD of recordKeys with locations + +**Performing the actual writing of data** + + - Job 6 : Lazy join of incoming records against recordKey, location to provide a final set of HoodieRecord which now contain the information about which file/partitionpath they are found at (or null if insert). Then also profile the workload again to determine sizing of files + - Job 7 : Actual writing of data (update + insert + insert turned to updates to maintain file size) + +Depending on the exception source (Hoodie/Spark), the above knowledge of the DAG can be used to pinpoint the actual issue. The most often encountered failures result from YARN/HDFS temporary failures. +In the future, a more sophisticated debug/management UI would be added to the project, that can help automate some of this debugging. diff --git a/docs/community.md b/docs/community.md index 83502964c..6238dcb39 100644 --- a/docs/community.md +++ b/docs/community.md @@ -2,6 +2,7 @@ title: Community keywords: usecases sidebar: mydoc_sidebar +toc: false permalink: community.html --- diff --git a/docs/images/hoodie_commit_duration.png b/docs/images/hoodie_commit_duration.png new file mode 100644 index 000000000..2445dcbed Binary files /dev/null and b/docs/images/hoodie_commit_duration.png differ diff --git a/docs/images/hoodie_upsert_dag.png b/docs/images/hoodie_upsert_dag.png new file mode 100644 index 000000000..474050afb Binary files /dev/null and b/docs/images/hoodie_upsert_dag.png differ diff --git a/docs/incremental_processing.md b/docs/incremental_processing.md index 0764058b4..ec90056b8 100644 --- a/docs/incremental_processing.md +++ b/docs/incremental_processing.md @@ -3,8 +3,88 @@ title: Incremental Processing keywords: incremental processing sidebar: mydoc_sidebar permalink: incremental_processing.html +toc: false +summary: In this page, we will discuss incremental processing primitives that Hoodie has to offer. --- -Work In Progress +As discussed in the concepts section, the two basic primitives needed for [incrementally processing](https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop), +data using Hoodie are `upserts` (to apply changes to a dataset) and `incremental pulls` (to obtain a change stream/log from a dataset). This section +discusses a few tools that can be used to achieve these on different contexts. + +{% include callout.html content="Instructions are currently only for Copy-on-write storage. When merge-on-read storage is added, these tools would be revised to add that support" type="info" %} +## Upserts + +Upserts can be used to apply a delta or an incremental change to a Hoodie dataset. For e.g, the incremental changes could be from a Kafka topic or files uploaded to HDFS or +even changes pulled from another Hoodie dataset. The `HoodieDeltaStreamer` utility provides the way to achieve all of these, by using the capabilities of `HoodieWriteClient`. + +{% include callout.html content="Get involved in rewriting this tool [here](https://github.com/uber/hoodie/issues/20)" type="info" %} + +The tool is a spark job (part of hoodie-utilities), that provides the following functionality + + - Ability to consume new events from Kafka, incremental imports from Sqoop or output of `HiveIncrementalPuller` or files under a folder on HDFS + - Support json, avro or a custom payload types for the incoming data + - New data is written to a Hoodie dataset, with support for checkpointing & schemas and registered onto Hive + + +## Incremental Pull + +Hoodie datasets can be pulled incrementally, which means you can get ALL and ONLY the updated & new rows since a specified commit timestamp. +This, together with upserts, are particularly useful for building data pipelines where 1 or more source hoodie tables are incrementally pulled (streams/facts), +joined with other tables (datasets/dimensions), to produce deltas to a target hoodie dataset. Then, using the delta streamer tool these deltas can be upserted into the +target hoodie dataset to complete the pipeline. + +#### Pulling through Hive + +`HiveIncrementalPuller` allows the above to be done via HiveQL, combining the benefits of Hive (reliably process complex SQL queries) and incremental primitives +(speed up query by pulling tables incrementally instead of scanning fully). The tool uses Hive JDBC to run the Hive query saving its results in a temp table. +that can later be upserted. Upsert utility (`HoodieDeltaStreamer`) has all the state it needs from the directory structure to know what should be the commit time on the target table. +e.g: `/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}`.The Delta Hive table registered will be of the form `{tmpdb}.{source_table}_{last_commit_included}`. + +The following are the configuration options for HiveIncrementalPuller + +| **Config** | **Description** | **Default** | +|hiveUrl| Hive Server 2 URL to connect to | jdbc:hive2://hadoophiveserver.com:10000/;transportMode=http;httpPath=hs2 | +|hiveUser| Hive Server 2 Username | | +|hivePass| Hive Server 2 Password | | +|queue| YARN Queue name | | +|tmp| Directory where the temporary delta data is stored in HDFS. The directory structure will follow conventions. Please see the below section. | /app/incremental-hql/intermediate | +|extractSQLFile| The SQL to execute on the source table to extract the data. The data extracted will be all the rows that changed since a particular point in time. | | +|sourceTable| Source Table Name. Needed to set hive environment properties. | | +|targetTable| Target Table Name. Needed for the intermediate storage directory structure. | | +|sourceDataPath| Source HDFS Base Path. This is where the hoodie metadata will be read. | | +|targetDataPath| Target HDFS Base path. This is needed to compute the fromCommitTime. This is not needed if fromCommitTime is specified explicitly. | | +|tmpdb| The database to which the intermediate temp delta table will be created | hoodie_temp | +|fromCommitTime| This is the most important parameter. This is the point in time from which the changed records are pulled from. | | +|maxCommits| Number of commits to include in the pull. Setting this to -1 will include all the commits from fromCommitTime. Setting this to a value > 0, will include records that ONLY changed in the specified number of commits after fromCommitTime. This may be needed if you need to catch up say 2 commits at a time. | 3 | +|help| Utility Help | | + + +Setting the fromCommitTime=0 and maxCommits=-1 will pull in the entire source dataset and can be used to initiate backfills. If the target dataset is a hoodie dataset, +then the utility can determine if the target dataset has no commits or is behind more than 24 hour (this is configurable), +it will automatically use the backfill configuration, since applying the last 24 hours incrementally could take more time than doing a backfill. The current limitation of the tool +is the lack of support for self-joining the same table in mixed mode (normal and incremental modes). + + +#### Pulling through Spark + +`HoodieReadClient` (inside hoodie-client) offers a more elegant way to pull data from Hoodie dataset (plus more) and process it via Spark. +This class can be used within existing Spark jobs and offers the following functionality. + +| **API** | **Description** | +| listCommitsSince(),latestCommit() | Obtain commit times to pull data from | +| readSince(commitTime),readCommit(commitTime) | Provide the data from the commit time as a DataFrame, to process further on | +| read(keys) | Read out the data corresponding to the keys as a DataFrame, using Hoodie's own index for faster lookup | +| read(paths) | Read out the data under specified path, with the functionality of HoodieInputFormat. An alternative way to do SparkSQL on Hoodie datasets | +| filterExists() | Filter out already existing records from the provided RDD[HoodieRecord]. Useful for de-duplication | +| checkExists(keys) | Check if the provided keys exist in a Hoodie dataset | + + +## SQL Streamer + +work in progress, tool being refactored out into open source Hoodie + + +{% include callout.html content="Get involved in building this tool [here](https://github.com/uber/hoodie/issues/20)" type="info" %} + diff --git a/docs/sql_queries.md b/docs/sql_queries.md index 43d4eef46..45ebb4cf8 100644 --- a/docs/sql_queries.md +++ b/docs/sql_queries.md @@ -9,7 +9,7 @@ summary: In this page, we go over how to enable SQL queries on Hoodie built tabl Hoodie registers the dataset into the Hive metastore backed by `HoodieInputFormat`. This makes the data accessible to Hive & Spark & Presto automatically. To be able to perform normal SQL queries on such a dataset, we need to get the individual query engines -to call `HoodieInputFormat.getSplits()`, during query planning. +to call `HoodieInputFormat.getSplits()`, during query planning such that the right versions of files are exposed to it. In the following sections, we cover the configs needed across different query engines to achieve this.