From 48aa026dc4887eaf78eacb8ceb428f8534c6d03b Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Tue, 25 Sep 2018 16:04:50 -0700 Subject: [PATCH] Adding documentation for migration guide and COW vs MOR tradeoffs, moving some docs around for more clarity --- docs/_data/sidebars/mydoc_sidebar.yml | 12 +++-- docs/concepts.md | 55 ++++++++++++++------- docs/configurations.md | 3 +- docs/incremental_processing.md | 2 +- docs/migration_guide.md | 71 +++++++++++++++++++++++++++ docs/powered_by.md | 6 +++ docs/quickstart.md | 26 ++++++---- 7 files changed, 143 insertions(+), 32 deletions(-) create mode 100644 docs/migration_guide.md diff --git a/docs/_data/sidebars/mydoc_sidebar.yml b/docs/_data/sidebars/mydoc_sidebar.yml index e790c982f..c4f6c46f2 100644 --- a/docs/_data/sidebars/mydoc_sidebar.yml +++ b/docs/_data/sidebars/mydoc_sidebar.yml @@ -35,18 +35,22 @@ entries: url: /concepts.html output: web - - title: Configurations - url: /configurations.html - output: web - - title: Implementation url: /implementation.html output: web + - title: Configurations + url: /configurations.html + output: web + - title: SQL Queries url: /sql_queries.html output: web + - title: Migration Guide + url: /migration_guide.html + output: web + - title: Incremental Processing url: /incremental_processing.html output: web diff --git a/docs/concepts.md b/docs/concepts.md index 4097efbb2..e757a8944 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -20,6 +20,8 @@ Such key activities include * `COMMITS` - A single commit captures information about an **atomic write** of a batch of records into a dataset. Commits are identified by a monotonically increasing timestamp, denoting the start of the write operation. * `CLEANS` - Background activity that gets rid of older versions of files in the dataset, that are no longer needed. 
+ * `DELTA_COMMITS` - A single delta commit captures information about an **atomic write** of a batch of records into a
+ MergeOnRead storage type dataset
 * `COMPACTIONS` - Background activity to reconcile differential data structures within Hoodie e.g: moving updates from row based log files to columnar formats. @@ -33,6 +35,24 @@ When there is late arriving data (data intended for 9:00 arriving >1 hr late at
With the help of the timeline, an incremental query attempting to get all new data that was committed successfully since 10:00 hours, is able to very efficiently consume only the changed files without say scanning all the time buckets > 07:00.
+## Terminologies
+
+ * `Hudi Dataset`
+ A structured hive/spark dataset managed by Hudi. Hudi supports both partitioned and non-partitioned Hive tables.
+ * `Commit`
+ A commit marks a new batch of data applied to a dataset. Hudi maintains monotonically increasing timestamps to track commits and guarantees that a commit is atomically
+ published.
+ * `Commit Timeline`
+ Commit Timeline refers to the sequence of commits applied in order to a dataset over its lifetime.
+ * `File Slice`
+ Hudi provides efficient handling of updates by maintaining a fixed mapping from a record key to a logical file id.
+ Hudi uses MVCC to provide atomicity and isolation of readers from a writer. This means that a logical file id will
+ have many physical versions of it. Each of these physical versions of a file represents a complete view of the
+ file as of a commit and is called a File Slice.
+ * `File Group`
+ A file-group is a file-slice timeline. It is a list of file-slices in commit order.
It is identified by `file id` + + ## Storage Types Hoodie storage types capture how data is indexed & laid out on the filesystem, and how the above primitives and timeline activities are implemented on top of @@ -62,23 +82,6 @@ Each record is uniquely identified by a `record key` and mapped to a file id for and file id, never changes once the first version of a record has been written to a file. In short, the `file id` identifies a group of files, that contain all versions of a group of records. -## Terminologies - - * `Hudi Dataset` - A structured hive/spark table managed by Hudi. Hudi supports both partitioned and non-partitioned Hive tables. - * `Commit` - A commit marks a new batch of data applied to a dataset. Hudi maintains monotonically increasing timestamps to track commits and guarantees that a commit is atomically - published. - * `Commit Timeline` - Commit Timeline refers to the sequence of Commits that was applied in order on a dataset over its lifetime. - * `File Slice` - Hudi provides efficient handling of updates by having a fixed mapping between record key to a logical file Id. - Hudi uses MVCC to provide atomicity and isolation of readers from a writer. This means that a logical fileId will - have many physical versions of it. Each of these physical version of a file represents a complete view of the - file as of a commit and is called File Slice - * `File Group` - A file-group is a file-slice timeline. It is a list of file-slices in commit order. It is identified by `file id` - ## Copy On Write @@ -136,3 +139,21 @@ There are lot of interesting things happening in this example, which bring out t The intention of merge on read storage, is to enable near real-time processing directly on top of Hadoop, as opposed to copying data out to specialized systems, which may not be able to handle the data volume. 
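The file-slice / file-group model described in the concepts above can be sketched as a toy Scala model. This is illustrative only — the class names and the selection logic here are a simplification for exposition, not Hudi's internal API:

```scala
// Toy model of the MVCC layout: a file group is a list of file slices,
// one slice per commit that wrote to the group (names are illustrative).
case class FileSlice(commitTime: String, files: Seq[String])
case class FileGroup(fileId: String, slices: Seq[FileSlice])

// A reader isolated at commit `asOf` sees the latest slice committed at or
// before that time; writers add new slices without disturbing readers.
def sliceAsOf(group: FileGroup, asOf: String): Option[FileSlice] =
  group.slices.filter(_.commitTime <= asOf).sortBy(_.commitTime).lastOption

val group = FileGroup("fileId1", Seq(
  FileSlice("20180901103000", Seq("fileId1_20180901103000.parquet")),
  FileSlice("20180925160000", Seq("fileId1_20180925160000.parquet"))))

// A query as of 20180910... sees only the 20180901... slice.
println(sliceAsOf(group, "20180910000000").map(_.commitTime))
```

Because each slice is a complete view of the file as of its commit, selecting one slice per file group is all a snapshot read needs to do.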
+
+## Trade-offs when choosing different storage types and views
+
+### Storage Types
+
+| Trade-off | CopyOnWrite | MergeOnRead |
+|-------------- |------------------| ------------------|
+| Data Latency | Higher | Lower |
+| Update cost (I/O) | Higher (rewrite entire parquet) | Lower (append to delta file) |
+| Parquet File Size | Smaller (high update (I/O) cost) | Larger (low update cost) |
+| Write Amplification | Higher | Lower (depending on compaction strategy) |
+
+### Hudi Views
+
+| Trade-off | ReadOptimized | RealTime |
+|-------------- |------------------| ------------------|
+| Data Latency | Higher | Lower |
+| Query Latency | Lower (raw columnar performance) | Higher (merge columnar + row based delta) |
\ No newline at end of file
diff --git a/docs/configurations.md
index 74d360854..ec3caf9e9 100644
--- a/docs/configurations.md
+++ b/docs/configurations.md
@@ -160,7 +160,8 @@ summary: "Here we list all possible configurations and what they mean"
Writing data via Hoodie happens as a Spark job and thus general rules of spark debugging applies here too. Below is a list of things to keep in mind, if you are looking to improving performance or reliability.
- - **Right operations** : Use `bulkinsert` to load new data into a table, and there on use `upsert`/`insert`. Difference between them is that bulk insert uses a disk based write path to scale to load large inputs without need to cache it.
+ - **Write operations** : Use `bulkinsert` to load new data into a table, and thereafter use `upsert`/`insert`.
+ The difference between them is that bulk insert uses a disk-based write path that scales to large inputs without needing to cache them.
 - **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e `withParallelism(1500)`), to ensure each Spark partition stays within the 2GB limit for inputs upto 500GB. Bump this up accordingly if you have larger inputs.
We recommend having shuffle parallelism `hoodie.[insert|upsert|bulkinsert].shuffle.parallelism` such that its atleast input_data_size/500MB - **Off-heap memory** : Hoodie writes parquet files and that needs good amount of off-heap memory proportional to schema width. Consider setting something like `spark.yarn.executor.memoryOverhead` or `spark.yarn.driver.memoryOverhead`, if you are running into such failures. - **Spark Memory** : Typically, hoodie needs to be able to read a single file into memory to perform merges or compactions and thus the executor memory should be sufficient to accomodate this. In addition, Hoodie caches the input to be able to intelligently place data and thus leaving some `spark.storage.memoryFraction` will generally help boost performance. diff --git a/docs/incremental_processing.md b/docs/incremental_processing.md index 3ea9b82de..973875a0b 100644 --- a/docs/incremental_processing.md +++ b/docs/incremental_processing.md @@ -11,7 +11,7 @@ As discussed in the concepts section, the two basic primitives needed for [incre data using Hoodie are `upserts` (to apply changes to a dataset) and `incremental pulls` (to obtain a change stream/log from a dataset). This section discusses a few tools that can be used to achieve these on different contexts. -## Ingesting/Writing Data +## Incremental Ingestion Following means can be used to apply a delta or an incremental change to a Hoodie dataset. For e.g, the incremental changes could be from a Kafka topic or files uploaded to HDFS or even changes pulled from another Hoodie dataset. 
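The shuffle parallelism rule of thumb in the tuning notes above (at least input_data_size / 500MB) is plain arithmetic; a minimal sketch of the calculation, assuming the 500MB target partition size stated there:

```scala
// Rule of thumb from the tuning notes: shuffle parallelism should be at
// least input_data_size / 500MB, so each Spark partition stays comfortably
// under the 2GB partition limit.
def recommendedShuffleParallelism(inputSizeBytes: Long): Long = {
  val targetPartitionBytes = 500L * 1024 * 1024 // ~500MB per Spark partition
  math.max(1L, math.ceil(inputSizeBytes.toDouble / targetPartitionBytes).toLong)
}

// e.g. a 500GB input suggests a parallelism of at least 1024
println(recommendedShuffleParallelism(500L * 1024 * 1024 * 1024))
```

The resulting value would be set via `hoodie.[insert|upsert|bulkinsert].shuffle.parallelism`.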
diff --git a/docs/migration_guide.md b/docs/migration_guide.md
new file mode 100644
index 000000000..a5d5506c4
--- /dev/null
+++ b/docs/migration_guide.md
@@ -0,0 +1,71 @@
+---
+title: Migration Guide
+keywords: migration guide
+sidebar: mydoc_sidebar
+permalink: migration_guide.html
+toc: false
+summary: In this page, we will discuss some available tools for migrating your existing dataset into a Hudi managed dataset
+---
+
+Hudi maintains metadata such as the commit timeline and indexes to manage a dataset. The commit timeline helps to understand the actions happening on a dataset as well as its current state. Indexes are used by Hudi to maintain a record key to file id mapping that efficiently locates a record. At the moment, Hudi supports writing only the parquet columnar format.
+To start using Hudi on your existing dataset, you will need to migrate it into a Hudi managed dataset. There are a couple of ways to achieve this.
+
+
+## Approaches
+
+
+### Approach 1
+
+Hudi can be used to manage an existing dataset without affecting/altering the historical data already present in the
+dataset. Hudi has been implemented to be compatible with such a mixed dataset, with the caveat that each Hive
+partition is either completely Hudi managed or not at all. Thus the lowest granularity at which Hudi manages a dataset is a Hive
+partition. Start using the datasource API or the WriteClient to write to the dataset, and make sure you start writing
+to a new partition, or convert your last N partitions into Hudi instead of the entire table. Note that since the historical
+partitions are not managed by Hudi, none of the primitives provided by Hudi work on the data in those partitions. More concretely, one cannot perform upserts or incremental pull on such older partitions not managed by Hudi.
+Take this approach if your dataset is append-only and you do not expect to perform any updates to existing (or non Hudi managed) partitions.
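A pseudocode sketch of Approach 1 — routing only new partitions through the Hudi datasource while historical partitions stay untouched. The paths are made up for illustration, and the write options are deliberately elided (see the quickstart for concrete configs):

```
// Pseudocode: only the new partition's data goes through Hudi;
// older partitions remain plain (non-Hudi) Hive partitions.
val newPartitionDF = spark.read.format("parquet").load("/data/table/2018/09/25")
newPartitionDF.write
  .format("com.uber.hoodie")   // Hudi datasource, as used in the quickstart
  .option(...)                 // record key / partition path configs elided
  .mode("append")
  .save("/data/hudi_table")
```

From that point on, upserts and incremental pulls work on the new partition, but not on the historical ones.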
+
+
+### Approach 2
+
+Import your existing dataset into a Hudi managed dataset. Since all the data is Hudi managed, none of the limitations
+of Approach 1 apply here. Updates spanning any partitions can be applied to this dataset and Hudi will efficiently
+make the updates available to queries. Note that not only do you get to use all the Hoodie primitives on this dataset,
+there are additional advantages to doing this. Hudi automatically manages file sizes of a Hudi managed dataset.
+You can define the desired file size when converting this dataset and Hudi will ensure it writes out files
+adhering to the config. It will also ensure that smaller files later get corrected by routing some new inserts into
+small files rather than writing new small ones, thus maintaining the health of your cluster.
+
+There are a few options when choosing this approach.
+
+#### Option 1
+Use the HDFSParquetImporter tool. As the name suggests, this only works if your existing dataset is in the parquet file
+format. This tool essentially starts a Spark job to read the existing parquet dataset and converts it into a Hudi managed dataset by re-writing all the data.
+
+#### Option 2
+For huge datasets, this could be as simple as:
+```
+for partition in [list of partitions in source dataset] {
+  val inputDF = spark.read.format("any_input_format").load("partition_path")
+  inputDF.write.format("com.uber.hoodie").option()....save("basePath")
+}
+```
+
+#### Option 3
+Write your own custom logic of how to load an existing dataset into a Hudi managed one. Please read about the RDD API
+[here](quickstart.html).
+
+Using the HDFSParquetImporter tool: once hoodie has been built via `mvn clean install -DskipTests`, the CLI shell can be
+fired up via `cd hoodie-cli && ./hoodie-cli.sh`.
+
+```
+
+hoodie->hdfsparquetimport
+        --upsert false
+        --srcPath /user/parquet/dataset/basepath
+        --targetPath /user/hoodie/dataset/basepath
+        --tableName hoodie_table
+        --tableType COPY_ON_WRITE
+        --rowKeyField _row_key
+        --partitionPathField partitionStr
+        --parallelism 1500
+        --schemaFilePath /user/table/schema
+        --format parquet
+        --sparkMemory 6g
+        --retry 2
+```
\ No newline at end of file
diff --git a/docs/powered_by.md
index 63abc226a..d4d6fe1bf 100644
--- a/docs/powered_by.md
+++ b/docs/powered_by.md
@@ -26,7 +26,13 @@ It also powers several incremental Hive ETL pipelines and being currently integr
3. ["Incremental Processing on Large Analytical Datasets"](https://spark-summit.org/2017/events/incremental-processing-on-large-analytical-datasets/) - By Prasanna Rajaperumal June 2017, Spark Summit 2017, San Francisco, CA. [Slides](https://www.slideshare.net/databricks/incremental-processing-on-large-analytical-datasets-with-prasanna-rajaperumal-and-vinoth-chandar) [Video](https://www.youtube.com/watch?v=3HS0lQX-cgo&feature=youtu.be)
+4. ["Hudi: Unifying storage and serving for batch and near-real-time analytics"](https://conferences.oreilly.com/strata/strata-ny/public/schedule/detail/70937) - By Nishith Agarwal & Balaji Varadarajan
+   September 2018, Strata Data Conference, New York, NY
+5. ["Hudi: Large-Scale, Near Real-Time Pipelines at Uber"](https://databricks.com/session/hudi-near-real-time-spark-pipelines-at-petabyte-scale) - By Vinoth Chandar & Nishith Agarwal
+   October 2018, Spark+AI Summit Europe, London, UK
+
## Articles
1.
["The Case for incremental processing on Hadoop"](https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop) - O'reilly Ideas article by Vinoth Chandar
diff --git a/docs/quickstart.md
index d025f3e0d..41ec9a96d 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -27,7 +27,7 @@ $ mvn clean install -DskipTests -DskipITs -Dhive11
{% include note.html content="Setup your local hadoop/hive test environment, so you can play with entire ecosystem. See [this](http://www.bytearray.io/2016/05/setting-up-hadoopyarnsparkhive-on-mac.html) for reference" %}
-## Supported Versions
+## Version Compatibility
Hoodie requires Java 8 to be installed. Hoodie works with Spark-2.x versions. We have verified that hoodie works with the following combination of Hadoop/Hive/Spark.
@@ -58,7 +58,13 @@ export SPARK_CONF_DIR=$SPARK_HOME/conf
export PATH=$JAVA_HOME/bin:$HIVE_HOME/bin:$HADOOP_HOME/bin:$SPARK_INSTALL/bin:$PATH
```
-### DataSource API
+### Supported APIs
+
+Use the DataSource API to quickly start reading or writing hoodie datasets in a few lines of code. Ideal for most
+ingestion use-cases.
+Use the RDD API to perform more involved actions on a hoodie dataset.
+
+#### DataSource API
Run __hoodie-spark/src/test/java/HoodieJavaApp.java__ class, to place a two commits (commit 1 => 100 inserts, commit 2 => 100 updates to previously inserted 100 records) onto your HDFS/local filesystem. Use the wrapper script to run from command-line
@@ -86,14 +92,16 @@ Usage:
[options] The class lets you choose table names, output paths and one of the storage types. In your own applications, be sure to include the `hoodie-spark` module as dependency and follow a similar pattern to write/read datasets via the datasource. -### RDD API +#### RDD API RDD level APIs give you more power and control over things, via the `hoodie-client` module . Refer to __hoodie-client/src/test/java/HoodieClientExample.java__ class for an example. -## Register Dataset to Hive Metastore +## Query a Hoodie dataset + +### Register Dataset to Hive Metastore Now, lets see how we can publish this data into Hive. @@ -215,11 +223,11 @@ ALTER TABLE `hoodie_rt` ADD IF NOT EXISTS PARTITION (datestr='2015-03-17') LOCAT -## Querying The Dataset +### Using different query engines Now, we can proceed to query the dataset, as we would normally do across all the three query engines supported. -### HiveQL +#### HiveQL Let's first perform a query on the latest committed snapshot of the table @@ -232,7 +240,7 @@ Time taken: 18.05 seconds, Fetched: 1 row(s) hive> ``` -### SparkSQL +#### SparkSQL Spark is super easy, once you get Hive working as above. Just spin up a Spark Shell as below @@ -249,7 +257,7 @@ scala> sqlContext.sql("select count(*) from hoodie_test").show(10000) You can also use the sample queries in __hoodie-utilities/src/test/java/HoodieSparkSQLExample.java__ for running on `hoodie_rt` -### Presto +#### Presto Checkout the 'master' branch on OSS Presto, build it, and place your installation somewhere. @@ -263,7 +271,7 @@ select count(*) from hive.default.hoodie_test -## Incremental Queries +## Incremental Queries of a Hoodie dataset Let's now perform a query, to obtain the __ONLY__ changed rows since a commit in the past.
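Conceptually, the incremental query above just filters records by the commit time that last wrote them. A toy sketch of that idea — not the actual query path, and `Record`/`incrementalPull` are illustrative names:

```scala
// Toy sketch: every Hudi record carries the commit time that last wrote it
// (real tables expose this as the _hoodie_commit_time meta column).
case class Record(key: String, commitTime: String)

// An incremental pull returns only records committed strictly after beginTime.
def incrementalPull(records: Seq[Record], beginTime: String): Seq[Record] =
  records.filter(_.commitTime > beginTime)

val table = Seq(
  Record("r1", "20180901103000"),
  Record("r2", "20180925160000"),
  Record("r3", "20180925160000"))

// Only records written after the first commit come back.
println(incrementalPull(table, "20180901103000").map(_.key))
```

Because commit timestamps are monotonically increasing, the begin time from one pull can be fed into the next to consume changes exactly once.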