From 8f1d362015aeff281c7327d657b0dd7d89ff3083 Mon Sep 17 00:00:00 2001 From: vinothchandar Date: Sun, 10 Jun 2018 18:54:58 -0700 Subject: [PATCH] Fixing deps & serialization for RTView - hoodie-hadoop-mr now needs objectsize bundled - Also updated docs with additional tuning tips --- docs/configurations.md | 5 ++++- hoodie-hadoop-mr/pom.xml | 6 ++++++ .../hadoop/realtime/HoodieRealtimeFileSplit.java | 16 ++++++++-------- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/docs/configurations.md b/docs/configurations.md index f8c946698..4639fb2e5 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -159,13 +159,16 @@ summary: "Here we list all possible configurations and what they mean" Writing data via Hoodie happens as a Spark job and thus general rules of spark debugging applies here too. Below is a list of things to keep in mind, if you are looking to improving performance or reliability. - **Right operations** : Use `bulkinsert` to load new data into a table, and there on use `upsert`/`insert`. Difference between them is that bulk insert uses a disk based write path to scale to load large inputs without need to cache it. - - **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e `withParallelism(1500)`), to ensure each Spark partition stays within the 2GB limit for inputs upto 500GB. Bump this up accordingly if you have larger inputs + - **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e `withParallelism(1500)`), to ensure each Spark partition stays within the 2GB limit for inputs upto 500GB. Bump this up accordingly if you have larger inputs. We recommend having shuffle parallelism `hoodie.[insert|upsert|bulkinsert].shuffle.parallelism` such that its atleast input_data_size/500MB - **Off-heap memory** : Hoodie writes parquet files and that needs good amount of off-heap memory proportional to schema width. Consider setting something like `spark.yarn.executor.memoryOverhead` or `spark.yarn.driver.memoryOverhead`, if you are running into such failures. - **Spark Memory** : Typically, hoodie needs to be able to read a single file into memory to perform merges or compactions and thus the executor memory should be sufficient to accomodate this. In addition, Hoodie caches the input to be able to intelligently place data and thus leaving some `spark.storage.memoryFraction` will generally help boost performance. - **Sizing files** : Set `limitFileSize` above judiciously, to balance ingest/write latency vs number of files & consequently metadata overhead associated with it. - **Timeseries/Log data** : Default configs are tuned for database/nosql changelogs where individual record sizes are large. Another very popular class of data is timeseries/event/log data that tends to be more volumnious with lot more records per partition. In such cases - Consider tuning the bloom filter accuracy via `.bloomFilterFPP()/bloomFilterNumEntries()` to achieve your target index look up time - Consider making a key that is prefixed with time of the event, which will enable range pruning & significantly speeding up index lookup. + - **GC Tuning** : Please be sure to follow garbage collection tuning tips from Spark tuning guide to avoid OutOfMemory errors + - [Must] Use G1/CMS Collector. Sample CMS Flags to add to spark.executor.extraJavaOptions : ``-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof` + - If it keeps OOMing still, reduce spark memory conservatively: `spark.memory.fraction=0.2, spark.memory.storageFraction=0.2` allowing it to spill rather than OOM. (reliably slow vs crashing intermittently) Below is a full working production config diff --git a/hoodie-hadoop-mr/pom.xml b/hoodie-hadoop-mr/pom.xml index 7bf618011..eae2eb8d7 100644 --- a/hoodie-hadoop-mr/pom.xml +++ b/hoodie-hadoop-mr/pom.xml @@ -79,6 +79,11 @@ com.twitter parquet-avro + + com.twitter.common + objectsize + 0.0.12 + org.apache.avro avro @@ -114,6 +119,7 @@ com.uber.hoodie:hoodie-common com.twitter:parquet-avro + com.twitter.common:objectsize diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java index 5ba7545b7..36f6a54b7 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java @@ -63,22 +63,22 @@ public class HoodieRealtimeFileSplit extends FileSplit { } private static void writeString(String str, DataOutput out) throws IOException { - byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8); - out.writeInt(pathBytes.length); - out.write(pathBytes); + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + out.writeInt(bytes.length); + out.write(bytes); } private static String readString(DataInput in) throws IOException { - byte[] pathBytes = new byte[in.readInt()]; - in.readFully(pathBytes); - return new String(pathBytes, StandardCharsets.UTF_8); + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + return new String(bytes, StandardCharsets.UTF_8); } @Override public void write(DataOutput out) throws IOException { super.write(out); - + writeString(basePath, out); writeString(maxCommitTime, out); out.writeInt(deltaFilePaths.size()); for (String logFilePath : deltaFilePaths) { @@ -89,7 +89,7 @@ public class HoodieRealtimeFileSplit extends FileSplit { @Override public void readFields(DataInput in) throws IOException { super.readFields(in); - + basePath = readString(in); maxCommitTime = readString(in); int totalLogFiles = in.readInt(); deltaFilePaths = new ArrayList<>(totalLogFiles);