From c7a8e15c7880c09edf3d7e4ab0882dc5209029cd Mon Sep 17 00:00:00 2001
From: vinoth chandar
Date: Fri, 17 Feb 2017 08:25:17 -0800
Subject: [PATCH] Docs for impl & comparison (#79)

* Initial version of comparison, implementation
* Finished doc for comparison to other systems
---
 docs/comparison.md     | 48 +++++++++++++++++++++++-
 docs/implementation.md | 84 ++++++++++++++++++++++++++++++++++++++++--
 docs/powered_by.md     |  3 +-
 3 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/docs/comparison.md b/docs/comparison.md
index 0898417d0..b5aa0d1d0 100644
--- a/docs/comparison.md
+++ b/docs/comparison.md
@@ -3,13 +3,59 @@
title: Comparison
keywords: usecases
sidebar: mydoc_sidebar
permalink: comparison.html
+toc: false
---
-Work In Progress

Hoodie fills a big void for processing data on top of HDFS, and thus mostly co-exists nicely with these technologies. However, it would be useful to understand how Hoodie fits into the current big data ecosystem, contrasting it with a few related systems, and bringing out the different tradeoffs these systems have accepted in their design.

## Kudu

[Apache Kudu](https://kudu.apache.org) is a storage system with goals similar to Hoodie's, which is to bring real-time analytics on petabytes of data via first-class support for `upserts`. A key differentiator is that Kudu also attempts to serve as a datastore for OLTP workloads, something that Hoodie does not aspire to be. Consequently, Kudu does not support incremental pulling (as of early 2017), something Hoodie does to enable incremental processing use cases.

Kudu diverges from a distributed file system abstraction and HDFS altogether, with its own set of storage servers talking to each other via RAFT. Hoodie, on the other hand, is designed to work with an underlying Hadoop-compatible filesystem (HDFS, S3 or Ceph) and does not have its own fleet of storage servers, instead relying on Apache Spark to do the heavy lifting.
Thus, Hoodie can be scaled easily, just like other Spark jobs, while Kudu would require hardware and operational support typical of datastores like HBase or Vertica. We have not, at this point, done any head-to-head benchmarks against Kudu (given RTTable is WIP). But, if we were to go with the results shared by [CERN](https://db-blog.web.cern.ch/blog/zbigniew-baranowski/2017-01-performance-comparison-different-file-formats-and-storage-engines), we expect Hoodie to be positioned as something that ingests parquet with superior performance.

## Hive Transactions

[Hive Transactions/ACID](https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions) is another similar effort, which tries to implement storage like `merge-on-read`, on top of the ORC file format. Understandably, this feature is heavily tied to Hive and other efforts like [LLAP](https://cwiki.apache.org/confluence/display/Hive/LLAP). Hive transactions do not offer the read-optimized storage option or the incremental pulling that Hoodie does. In terms of implementation choices, Hoodie leverages the full power of a processing framework like Spark, while the Hive transactions feature is implemented underneath by Hive tasks/queries kicked off by the user or the Hive metastore. Based on our production experience, embedding Hoodie as a library into existing Spark pipelines was much easier and less operationally heavy, compared with the other approach. Hoodie is also designed to work with non-Hive engines like Presto/Spark and will incorporate file formats other than parquet over time.

## HBase

Even though [HBase](https://hbase.apache.org) is ultimately a key-value store for OLTP workloads, users often tend to associate HBase with analytics given its proximity to Hadoop. Given HBase is heavily write-optimized, it supports sub-second upserts out of the box, and Hive-on-HBase lets users query that data.
However, in terms of actual performance for analytical workloads, hybrid columnar storage formats like Parquet/ORC handily beat HBase, since these workloads are predominantly read-heavy. Hoodie bridges this gap between faster data and analytical storage formats. From an operational perspective, arming users with a library that provides faster data is more scalable than managing a big farm of HBase region servers just for analytics. Finally, HBase does not support incremental processing primitives like `commit times` and `incremental pull` as first-class citizens, as Hoodie does.

## Stream Processing

A popular question we get is: "How does Hoodie relate to stream processing systems?", which we will try to answer here. Simply put, Hoodie can integrate with batch (`copy-on-write storage`) and streaming (`merge-on-read storage`) jobs of today, to store the computed results in Hadoop. For Spark apps, this can happen via direct integration of the Hoodie library with Spark/Spark Streaming DAGs. In the case of non-Spark processing systems (e.g. Flink, Hive), the processing can be done in the respective systems and later sent into a Hoodie table via a Kafka topic/HDFS intermediate file. (TODO: Need link to SQLStreamer/DeltaStreamer after reworking). At a more conceptual level, data processing pipelines just consist of three components: `source`, `processing`, `sink`, with users ultimately running queries against the sink to use the results of the pipeline. Hoodie can act as either a source or a sink that stores data on HDFS. The applicability of Hoodie to a given stream processing pipeline ultimately boils down to the suitability of Presto/SparkSQL/Hive for your queries.

More advanced use cases revolve around the concepts of [incremental processing](https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop), which effectively uses Hoodie even inside the `processing` engine to speed up typical batch pipelines.
For example, Hoodie can be used as a state store inside a processing DAG (similar to how [rocksDB](https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html#the-rocksdbstatebackend) is used by Flink). This is an item on the roadmap and will eventually materialize as a [Beam Runner](https://github.com/uber/hoodie/issues/8).

diff --git a/docs/implementation.md b/docs/implementation.md
index b1222e78b..fcbd39c2f 100644
--- a/docs/implementation.md
+++ b/docs/implementation.md
@@ -1,14 +1,90 @@
---
-title: Performance
-keywords: performance
+title: Implementation
+keywords: implementation
sidebar: mydoc_sidebar
+toc: false
permalink: implementation.html
---
-Work In Progress

Hoodie is implemented as a Spark library, which makes it easy to integrate into existing data pipelines or ingestion libraries (which we will refer to as `hoodie clients`). Hoodie clients prepare an `RDD[HoodieRecord]` that contains the data to be upserted, and a Hoodie upsert/insert is merely a Spark DAG that can be broken into two big pieces.

 - **Indexing** : A big part of Hoodie's efficiency comes from indexing the mapping from record keys to the file ids they belong to. This index also helps the `HoodieWriteClient` separate upserted records into inserts and updates, so they can be treated differently. `HoodieReadClient` supports operations such as `filterExists` (used for de-duplication of a table) and an efficient batch `read(keys)` api, which can read out the records corresponding to the keys using the index, much more quickly than a typical scan via a query. The index is also atomically updated each commit, and is rolled back when commits are rolled back.

 - **Storage** : The storage part of the DAG is responsible for taking an `RDD[HoodieRecord]`, tagged as an insert or update via index lookup, and writing it out efficiently onto storage.
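Conceptually, the first phase of this DAG can be sketched as follows (plain Python, illustrative only: `tag_records` and the dict-based index are hypothetical stand-ins for the real index lookup, not the Hoodie API):

```python
# Illustrative sketch: use an index (here, a plain dict mapping
# record key -> file id) to tag an incoming batch of records as
# updates (key already belongs to a file) or inserts (key unseen).
# Names and shapes are hypothetical, not the actual Hoodie API.

def tag_records(records, index):
    """Split (key, value) pairs into updates and inserts via index lookup."""
    updates, inserts = [], []
    for key, value in records:
        file_id = index.get(key)
        if file_id is not None:
            # update: the storage phase rewrites this file id's latest version
            updates.append((key, value, file_id))
        else:
            # insert: the storage phase packs these into new/small files
            inserts.append((key, value))
    return updates, inserts

index = {"key1": "file-0"}                   # existing key -> file id mapping
records = [("key1", "v2"), ("key2", "v1")]   # incoming upsert batch
updates, inserts = tag_records(records, index)
```

In the real DAG this lookup is a distributed join against the index rather than a dict probe, but the outcome is the same: each record arrives at the storage phase already tagged as an insert or an update.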
## Index

-## Data Storage

Hoodie currently provides two index choices, `BloomIndex` and `HBaseIndex`, to map a record key to the file id it belongs to. This enables us to speed up upserts significantly, without scanning over every record in the dataset.

#### HBase Index

Here, we just use HBase in a straightforward way to store the mapping above. The challenge with using HBase (or any external key-value store for that matter) is performing rollback of a commit and handling partial index updates. Since the HBase table is indexed by record key and not commit time, we would have to scan all the entries, which would be prohibitively expensive. Instead, we store the commit time with the value and discard the value if it does not belong to a valid commit.

#### Bloom Index

This index is built by adding bloom filters with a very high false positive tolerance (e.g. 1/10^9) to the parquet file footers. The advantage of this index over HBase is the obvious removal of a big external dependency, and also the nicer handling of rollbacks & partial updates, since the index is part of the data file itself.

At runtime, checking the Bloom Index for a given set of record keys effectively amounts to checking all the bloom filters within a given partition against the incoming records, using a Spark join. Much of the engineering effort towards the Bloom Index has gone into scaling this join, by caching the incoming `RDD[HoodieRecord]` and dynamically tuning join parallelism, to avoid hitting Spark limitations like the 2GB maximum partition size. As a result, the Bloom Index implementation has been able to handle single upserts up to 5TB in a reliable manner.

## Storage

The implementation specifics of the two storage types, introduced in the [concepts](concepts.html) section, are detailed below.

#### Copy On Write

The Spark DAG for this storage is relatively simple.
The key goal here is to group the tagged hoodie record RDD into a series of updates and inserts, by using a partitioner. To achieve the goal of maintaining file sizes, we first sample the input to obtain a `workload profile` that understands the spread of inserts vs updates, their distribution among the partitions, etc. With this information, we bin-pack the records such that

 - For updates, the latest version of that file id is rewritten once, with new values for all records that have changed.
 - For inserts, the records are first packed onto the smallest file in each partition path, until it reaches the configured maximum size. Any remaining records after that are again packed into new file id groups, again meeting the size requirements.

In this storage, updating the index is a no-op, since the bloom filters are already written as a part of committing data.

#### Merge On Read

Work in Progress ..

## Performance

In this section, we go over some real-world performance numbers for Hoodie upserts and incremental pull, and compare them against the conventional alternatives for achieving these tasks.

#### Upsert vs Bulk Loading

#### Incremental Scan vs Full Scan

#### Scalability of Upserts

#### Copy On Write Regular Query Performance

diff --git a/docs/powered_by.md b/docs/powered_by.md
index f4e8e7d2d..6164cabdd 100644
--- a/docs/powered_by.md
+++ b/docs/powered_by.md
@@ -8,4 +8,5 @@
permalink: powered_by.html

## Uber

Hoodie was originally developed at [Uber](https://uber.com), to achieve [low latency database ingestion, with high efficiency](http://www.slideshare.net/vinothchandar/hadoop-strata-talk-uber-your-hadoop-has-arrived/32).
-It has been in production since Aug 2016, powering highly business critical (7/10 most used including trips,riders,partners totalling 100s of TBs) tables on Hadoop.
It also powers several incremental Hive ETL pipelines and being currently integrated into Uber's data dispersal system.
+It has been in production since Aug 2016, powering ~100 highly business-critical tables on Hadoop, worth 100s of TBs (including the top 10, such as trips, riders and partners).
+It also powers several incremental Hive ETL pipelines and is currently being integrated into Uber's data dispersal system.