diff --git a/docs/configurations.md b/docs/configurations.md
index bd4f4a27a..f8c946698 100644
--- a/docs/configurations.md
+++ b/docs/configurations.md
@@ -7,6 +7,8 @@ toc: false
summary: "Here we list all possible configurations and what they mean"
---
+### Configuration
+
* [HoodieWriteConfig](#HoodieWriteConfig)
Top Level Config which is passed in when HoodieWriteClent is created.
- [withPath](#withPath) (hoodie_base_path)
@@ -152,4 +154,44 @@ summary: "Here we list all possible configurations and what they mean"
`instant_time <= END_INSTANTTIME` are fetched out.
-{% include callout.html content="Hoodie is a young project. A lot of pluggable interfaces and configurations to support diverse workloads need to be created. Get involved [here](https://github.com/uber/hoodie)" type="info" %}
+### Tuning
+
+Writing data via Hoodie happens as a Spark job and thus general rules of spark debugging applies here too. Below is a list of things to keep in mind, if you are looking to improving performance or reliability.
+
+ - **Right operations** : Use `bulkinsert` to load new data into a table, and there on use `upsert`/`insert`. Difference between them is that bulk insert uses a disk based write path to scale to load large inputs without need to cache it.
+ - **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e `withParallelism(1500)`), to ensure each Spark partition stays within the 2GB limit for inputs upto 500GB. Bump this up accordingly if you have larger inputs
+ - **Off-heap memory** : Hoodie writes parquet files and that needs good amount of off-heap memory proportional to schema width. Consider setting something like `spark.yarn.executor.memoryOverhead` or `spark.yarn.driver.memoryOverhead`, if you are running into such failures.
+ - **Spark Memory** : Typically, hoodie needs to be able to read a single file into memory to perform merges or compactions and thus the executor memory should be sufficient to accomodate this. In addition, Hoodie caches the input to be able to intelligently place data and thus leaving some `spark.storage.memoryFraction` will generally help boost performance.
+ - **Sizing files** : Set `limitFileSize` above judiciously, to balance ingest/write latency vs number of files & consequently metadata overhead associated with it.
+ - **Timeseries/Log data** : Default configs are tuned for database/nosql changelogs where individual record sizes are large. Another very popular class of data is timeseries/event/log data that tends to be more volumnious with lot more records per partition. In such cases
+ - Consider tuning the bloom filter accuracy via `.bloomFilterFPP()/bloomFilterNumEntries()` to achieve your target index look up time
+ - Consider making a key that is prefixed with time of the event, which will enable range pruning & significantly speeding up index lookup.
+
+ Below is a full working production config
+
+ ```
+ spark.driver.extraClassPath /etc/hive/conf
+ spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
+ spark.driver.maxResultSize 2g
+ spark.driver.memory 4g
+ spark.executor.cores 1
+ spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
+ spark.executor.id driver
+ spark.executor.instances 300
+ spark.executor.memory 6g
+ spark.rdd.compress true
+
+ spark.kryoserializer.buffer.max 512m
+ spark.serializer org.apache.spark.serializer.KryoSerializer
+ spark.shuffle.memoryFraction 0.2
+ spark.shuffle.service.enabled true
+ spark.sql.hive.convertMetastoreParquet false
+ spark.storage.memoryFraction 0.6
+ spark.submit.deployMode cluster
+ spark.task.cpus 1
+ spark.task.maxFailures 4
+
+ spark.yarn.driver.memoryOverhead 1024
+ spark.yarn.executor.memoryOverhead 3072
+ spark.yarn.max.executor.failures 100
+ ```
diff --git a/docs/s3_filesystem.md b/docs/s3_filesystem.md
index adb1cefcb..09d70e625 100644
--- a/docs/s3_filesystem.md
+++ b/docs/s3_filesystem.md
@@ -17,7 +17,9 @@ There are two configurations required for Hoodie-S3 compatibility:
### AWS Credentials
-Add the required configs in your core-site.xml from where Hoodie can fetch them. Replace the `fs.defaultFS` with your S3 bucket name and Hoodie should be able to read/write from the bucket.
+Simplest way to use Hoodie with S3, is to configure your `SparkSession` or `SparkContext` with S3 credentials. Hoodie will automatically pick this up and talk to S3.
+
+Alternatively, add the required configs in your core-site.xml from where Hoodie can fetch them. Replace the `fs.defaultFS` with your S3 bucket name and Hoodie should be able to read/write from the bucket.
```
@@ -51,6 +53,22 @@ Add the required configs in your core-site.xml from where Hoodie can fetch them.
```
+
+Utilities such as hoodie-cli or deltastreamer tool, can pick up s3 creds via environmental variable prefixed with `HOODIE_ENV_`. For e.g below is a bash snippet to setup
+such variables and then have cli be able to work on datasets stored in s3
+
+```
+export HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key=$accessKey
+export HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key=$secretKey
+export HOODIE_ENV_fs_DOT_s3_DOT_awsAccessKeyId=$accessKey
+export HOODIE_ENV_fs_DOT_s3_DOT_awsSecretAccessKey=$secretKey
+export HOODIE_ENV_fs_DOT_s3n_DOT_awsAccessKeyId=$accessKey
+export HOODIE_ENV_fs_DOT_s3n_DOT_awsSecretAccessKey=$secretKey
+export HOODIE_ENV_fs_DOT_s3n_DOT_impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+```
+
+
+
### AWS Libs
AWS hadoop libraries to add to our classpath
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index e0fe99379..ebec1dd7b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -41,7 +41,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String TABLE_NAME = "hoodie.table.name";
private static final String BASE_PATH_PROP = "hoodie.base.path";
private static final String AVRO_SCHEMA = "hoodie.avro.schema";
- private static final String DEFAULT_PARALLELISM = "200";
+ private static final String DEFAULT_PARALLELISM = "1500";
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
index 09d1a2cab..45b69e0b6 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
@@ -74,6 +74,13 @@ public class HoodieAvroUtils {
return reader.read(null, decoder);
}
+ public static boolean isMetadataField(String fieldName) {
+ return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
+ || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
+ || HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
+ || HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
+ || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
+ }
/**
* Adds the Hoodie metadata fields to the given schema
@@ -98,7 +105,9 @@ public class HoodieAvroUtils {
parentFields.add(partitionPathField);
parentFields.add(fileNameField);
for (Schema.Field field : schema.getFields()) {
- parentFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), null));
+ if (!isMetadataField(field.name())) {
+ parentFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), null));
+ }
}
Schema mergedSchema = Schema
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java
index 44672ec4b..158aa0039 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieROTablePathFilter.java
@@ -20,7 +20,6 @@ import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
-import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import java.io.Serializable;
@@ -61,6 +60,9 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
private HashSet nonHoodiePathCache;
+ private transient FileSystem fs;
+
+
public HoodieROTablePathFilter() {
hoodiePathCache = new HashMap<>();
nonHoodiePathCache = new HashSet<>();
@@ -79,7 +81,6 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
return null;
}
-
@Override
public boolean accept(Path path) {
@@ -88,9 +89,8 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
}
Path folder = null;
try {
- FileSystem fs = path.getFileSystem(FSUtils.prepareHadoopConf(new Configuration()));
- if (fs.isDirectory(path)) {
- return true;
+ if (fs == null) {
+ fs = path.getFileSystem(new Configuration());
}
// Assumes path is a file
diff --git a/hoodie-spark/pom.xml b/hoodie-spark/pom.xml
index 3e8878ea9..634dab06b 100644
--- a/hoodie-spark/pom.xml
+++ b/hoodie-spark/pom.xml
@@ -142,7 +142,7 @@
com.databricks
spark-avro_2.11
- 3.2.0
+ 4.0.0
com.fasterxml.jackson.core
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala
index 4312636fa..e92043490 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala
@@ -43,6 +43,16 @@ object AvroConversionUtils {
}
}
+ def getNewRecordNamespace(elementDataType: DataType,
+ currentRecordNamespace: String,
+ elementName: String): String = {
+
+ elementDataType match {
+ case StructType(_) => s"$currentRecordNamespace.$elementName"
+ case _ => currentRecordNamespace
+ }
+ }
+
def createConverterToAvro(dataType: DataType,
structName: String,
recordNamespace: String): (Any) => Any = {
@@ -60,7 +70,10 @@ object AvroConversionUtils {
case DateType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Date].getTime
case ArrayType(elementType, _) =>
- val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
+ val elementConverter = createConverterToAvro(
+ elementType,
+ structName,
+ getNewRecordNamespace(elementType, recordNamespace, structName))
(item: Any) => {
if (item == null) {
null
@@ -77,7 +90,10 @@ object AvroConversionUtils {
}
}
case MapType(StringType, valueType, _) =>
- val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
+ val valueConverter = createConverterToAvro(
+ valueType,
+ structName,
+ getNewRecordNamespace(valueType, recordNamespace, structName))
(item: Any) => {
if (item == null) {
null
@@ -94,7 +110,10 @@ object AvroConversionUtils {
val schema: Schema = SchemaConverters.convertStructToAvro(
structType, builder, recordNamespace)
val fieldConverters = structType.fields.map(field =>
- createConverterToAvro(field.dataType, field.name, recordNamespace))
+ createConverterToAvro(
+ field.dataType,
+ field.name,
+ getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
(item: Any) => {
if (item == null) {
null
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
index dec9160e2..dd744da6d 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
@@ -134,11 +134,16 @@ class DefaultSource extends RelationProvider
df: DataFrame): BaseRelation = {
val parameters = parametersWithWriteDefaults(optParams).toMap
+ val sparkContext = sqlContext.sparkContext
val path = parameters.get("path")
val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME)
if (path.isEmpty || tblName.isEmpty) {
throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
}
+ val serializer = sparkContext.getConf.get("spark.serializer")
+ if (!serializer.equals("org.apache.spark.serializer.KryoSerializer")) {
+ throw new HoodieException(s"${serializer} serialization is not supported by hoodie. Please use kryo.")
+ }
val storageType = parameters(STORAGE_TYPE_OPT_KEY)
val operation = parameters(OPERATION_OPT_KEY)
@@ -146,11 +151,12 @@ class DefaultSource extends RelationProvider
// register classes & schemas
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
- sqlContext.sparkContext.getConf.registerKryoClasses(
+ sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
- sqlContext.sparkContext.getConf.registerAvroSchemas(schema)
+ sparkContext.getConf.registerAvroSchemas(schema)
+ log.info(s"Registered avro schema : ${schema.toString(true)}");
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(
@@ -167,7 +173,7 @@ class DefaultSource extends RelationProvider
val basePath = new Path(parameters.get("path").get)
- val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(basePath)
// Handle various save modes
@@ -190,12 +196,11 @@ class DefaultSource extends RelationProvider
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType);
properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
- HoodieTableMetaClient.initializePathAsHoodieDataset(
- sqlContext.sparkContext.hadoopConfiguration, path.get, properties);
+ HoodieTableMetaClient.initializePathAsHoodieDataset(sparkContext.hadoopConfiguration, path.get, properties);
}
// Create a HoodieWriteClient & issue the write.
- val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sqlContext.sparkContext),
+ val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sparkContext),
schema.toString,
path.get,
tblName.get,