diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala index e92043490..0dd03ee15 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala @@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericData.Record import org.apache.avro.generic.GenericRecord import org.apache.avro.{Schema, SchemaBuilder} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row} @@ -34,7 +35,9 @@ object AvroConversionUtils { def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = { val dataType = df.schema - df.rdd.mapPartitions { records => + val encoder = RowEncoder.apply(dataType).resolveAndBind() + df.queryExecution.toRdd.map(encoder.fromRow) + .mapPartitions { records => if (records.isEmpty) Iterator.empty else { val convertor = createConverterToAvro(dataType, structName, recordNamespace) diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala index 63e1d19c5..7e4f8f03e 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DataSourceOptions.scala @@ -152,6 +152,28 @@ object DataSourceWriteOptions { val INSERT_DROP_DUPS_OPT_KEY = "hoodie.datasource.write.insert.drop.duplicates" val DEFAULT_INSERT_DROP_DUPS_OPT_VAL = "false" + /** + * Flag to indicate how many times streaming job should retry for a failed microbatch + * By default 3 + */ + val STREAMING_RETRY_CNT_OPT_KEY = "hoodie.datasource.write.streaming.retry.count" + val DEFAULT_STREAMING_RETRY_CNT_OPT_VAL = "3" + + /** + * Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch + * By default 2000 and it will be doubled by every retry + */ + val STREAMING_RETRY_INTERVAL_MS_OPT_KEY = "hoodie.datasource.write.streaming.retry.interval.ms" + val DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL = "2000" + + /** + * Flag to indicate whether to ignore any non exception error (e.g. writestatus error) + * within a streaming microbatch + * By default true (in favor of streaming progressing over data integrity) + */ + val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch" + val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true" + // HIVE SYNC SPECIFIC CONFIGS //NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes // unexpected issues with config getting reset diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala index 608974baf..dcf7628bd 100644 --- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala @@ -18,30 +18,20 @@ package com.uber.hoodie -import java.util -import java.util.Optional import java.util.concurrent.ConcurrentHashMap import com.uber.hoodie.DataSourceReadOptions._ -import com.uber.hoodie.DataSourceWriteOptions._ -import com.uber.hoodie.common.table.HoodieTableMetaClient -import com.uber.hoodie.common.util.{FSUtils, TypedProperties} -import com.uber.hoodie.config.HoodieWriteConfig import com.uber.hoodie.exception.HoodieException -import com.uber.hoodie.hive.{HiveSyncConfig, HiveSyncTool} -import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.conf.HiveConf import org.apache.log4j.LogManager -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.sources._ +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer +import scala.collection.mutable /** * Hoodie Spark Datasource, for reading and writing hoodie datasets @@ -51,6 +41,7 @@ class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider with DataSourceRegister + with StreamSinkProvider with Serializable { private val log = LogManager.getLogger(classOf[DefaultSource]) @@ -66,7 +57,7 @@ class DefaultSource extends RelationProvider * @param parameters * @return */ - def parametersWithReadDefaults(parameters: Map[String, String]) = { + def parametersWithReadDefaults(parameters: Map[String, String]): mutable.Map[String, String] = { val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters)) defaultsMap.putIfAbsent(VIEW_TYPE_OPT_KEY, DEFAULT_VIEW_TYPE_OPT_VAL) mapAsScalaMap(defaultsMap) @@ -106,216 +97,27 @@ class DefaultSource extends RelationProvider } } - /** - * Add default options for unspecified write options keys. - * - * @param parameters - * @return - */ - def parametersWithWriteDefaults(parameters: Map[String, String]) = { - val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters)) - defaultsMap.putIfAbsent(OPERATION_OPT_KEY, DEFAULT_OPERATION_OPT_VAL) - defaultsMap.putIfAbsent(STORAGE_TYPE_OPT_KEY, DEFAULT_STORAGE_TYPE_OPT_VAL) - defaultsMap.putIfAbsent(PRECOMBINE_FIELD_OPT_KEY, DEFAULT_PRECOMBINE_FIELD_OPT_VAL) - defaultsMap.putIfAbsent(PAYLOAD_CLASS_OPT_KEY, DEFAULT_PAYLOAD_OPT_VAL) - defaultsMap.putIfAbsent(RECORDKEY_FIELD_OPT_KEY, DEFAULT_RECORDKEY_FIELD_OPT_VAL) - defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL) - defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL) - defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL) - defaultsMap.putIfAbsent(INSERT_DROP_DUPS_OPT_KEY, DEFAULT_INSERT_DROP_DUPS_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_SYNC_ENABLED_OPT_KEY, DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_DATABASE_OPT_KEY, DEFAULT_HIVE_DATABASE_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_TABLE_OPT_KEY, DEFAULT_HIVE_TABLE_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_USER_OPT_KEY, DEFAULT_HIVE_USER_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_PASS_OPT_KEY, DEFAULT_HIVE_PASS_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_URL_OPT_KEY, DEFAULT_HIVE_URL_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_PARTITION_FIELDS_OPT_KEY, DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL) - defaultsMap.putIfAbsent(HIVE_ASSUME_DATE_PARTITION_OPT_KEY, DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL) - mapAsScalaMap(defaultsMap) - } - - def toProperties(params: Map[String, String]): TypedProperties = { - val props = new TypedProperties() - params.foreach(kv => props.setProperty(kv._1, kv._2)) - props - } - - override def createRelation(sqlContext: SQLContext, mode: SaveMode, optParams: Map[String, String], df: DataFrame): BaseRelation = { - val parameters = parametersWithWriteDefaults(optParams).toMap - val sparkContext = sqlContext.sparkContext - val path = parameters.get("path") - val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME) - if (path.isEmpty || tblName.isEmpty) { - throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.") - } - val serializer = sparkContext.getConf.get("spark.serializer") - if (!serializer.equals("org.apache.spark.serializer.KryoSerializer")) { - throw new HoodieException(s"${serializer} serialization is not supported by hoodie. Please use kryo.") - } - - val storageType = parameters(STORAGE_TYPE_OPT_KEY) - val operation = - // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true - // Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly - // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) . - if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean && - parameters(OPERATION_OPT_KEY) == UPSERT_OPERATION_OPT_VAL) { - - log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " + - s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " + - s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL") - - INSERT_OPERATION_OPT_VAL - } else { - parameters(OPERATION_OPT_KEY) - } - - // register classes & schemas - val structName = s"${tblName.get}_record" - val nameSpace = s"hoodie.${tblName.get}" - sparkContext.getConf.registerKryoClasses( - Array(classOf[org.apache.avro.generic.GenericData], - classOf[org.apache.avro.Schema])) - val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - sparkContext.getConf.registerAvroSchemas(schema) - log.info(s"Registered avro schema : ${schema.toString(true)}"); - - // Convert to RDD[HoodieRecord] - val keyGenerator = DataSourceUtils.createKeyGenerator( - parameters(KEYGENERATOR_CLASS_OPT_KEY), - toProperties(parameters) - ) - val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) - val hoodieAllIncomingRecords = genericRecords.map(gr => { - val orderingVal = DataSourceUtils.getNestedFieldValAsString( - gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]] - DataSourceUtils.createHoodieRecord(gr, - orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) - }).toJavaRDD(); - - val jsc = new JavaSparkContext(sparkContext) - - val hoodieRecords = - if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { - DataSourceUtils.dropDuplicates( - jsc, - hoodieAllIncomingRecords, - mapAsJavaMap(parameters)) - } else { - hoodieAllIncomingRecords - } - - if (!hoodieRecords.isEmpty()) { - val basePath = new Path(parameters.get("path").get) - val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) - var exists = fs.exists(basePath) - - // Handle various save modes - if (mode == SaveMode.ErrorIfExists && exists) { - throw new HoodieException(s"basePath ${basePath} already exists.") - } - if (mode == SaveMode.Ignore && exists) { - log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.") - return createRelation(sqlContext, parameters, df.schema) - } - if (mode == SaveMode.Overwrite && exists) { - log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.") - fs.delete(basePath, true) - exists = false - } - - // Create the dataset if not present (APPEND mode) - if (!exists) { - HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType, - tblName.get, "archived") - } - - // Create a HoodieWriteClient & issue the write. - val client = DataSourceUtils.createHoodieClient(jsc, - schema.toString, - path.get, - tblName.get, - mapAsJavaMap(parameters) - ) - val commitTime = client.startCommit(); - - val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation) - // Check for errors and commit the write. - val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count() - if (errorCount == 0) { - log.info("No errors. Proceeding to commit the write."); - val metaMap = parameters.filter(kv => - kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY))) - val success = if (metaMap.isEmpty) { - client.commit(commitTime, writeStatuses) - } else { - client.commit(commitTime, writeStatuses, - Optional.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) - } - - if (success) { - log.info("Commit " + commitTime + " successful!") - } - else { - log.info("Commit " + commitTime + " failed!") - } - - val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false) - if (hiveSyncEnabled) { - log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") - val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration) - syncHive(basePath, fs, parameters) - } - client.close - } else { - log.error(s"$operation failed with ${errorCount} errors :"); - if (log.isTraceEnabled) { - log.trace("Printing out the top 100 errors") - writeStatuses.rdd.filter(ws => ws.hasErrors) - .take(100) - .foreach(ws => { - log.trace("Global error :", ws.getGlobalError) - if (ws.getErrors.size() > 0) { - ws.getErrors.foreach(kt => - log.trace(s"Error for key: ${kt._1}", kt._2)) - } - }) - } - } - } else { - log.info("new batch has no new records, skipping...") - } + val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams).toMap + HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df) createRelation(sqlContext, parameters, df.schema) } - private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = { - val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters) - val hiveConf: HiveConf = new HiveConf() - hiveConf.addResource(fs.getConf) - new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable() - true + override def createSink(sqlContext: SQLContext, + optParams: Map[String, String], + partitionColumns: Seq[String], + outputMode: OutputMode): Sink = { + val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams).toMap + new HoodieStreamingSink( + sqlContext, + parameters, + partitionColumns, + outputMode) } - private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = { - val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig() - hiveSyncConfig.basePath = basePath.toString - hiveSyncConfig.assumeDatePartitioning = - parameters.get(HIVE_ASSUME_DATE_PARTITION_OPT_KEY).exists(r => r.toBoolean) - hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY) - hiveSyncConfig.tableName = parameters(HIVE_TABLE_OPT_KEY) - hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY) - hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY) - hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY) - hiveSyncConfig.partitionFields = - ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).toList: _*) - hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY) - hiveSyncConfig - } override def shortName(): String = "hoodie" } diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala new file mode 100644 index 000000000..93ad54878 --- /dev/null +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ +package com.uber.hoodie + +import java.util +import java.util.concurrent.ConcurrentHashMap +import java.util.Optional + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ListBuffer + +import com.uber.hoodie.DataSourceWriteOptions._ +import com.uber.hoodie.common.table.HoodieTableMetaClient +import com.uber.hoodie.common.util.{FSUtils, TypedProperties} +import com.uber.hoodie.config.HoodieWriteConfig +import com.uber.hoodie.exception.HoodieException +import com.uber.hoodie.hive.{HiveSyncConfig, HiveSyncTool} +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} + +import scala.collection.mutable + +private[hoodie] object HoodieSparkSqlWriter { + + private val log = LogManager.getLogger("HoodieSparkSQLWriter") + + def write(sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + df: DataFrame): (Boolean, Option[String]) = { + + val sparkContext = sqlContext.sparkContext + val path = parameters.get("path") + val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME) + if (path.isEmpty || tblName.isEmpty) { + throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.") + } + val serializer = sparkContext.getConf.get("spark.serializer") + if (!serializer.equals("org.apache.spark.serializer.KryoSerializer")) { + throw new HoodieException(s"${serializer} serialization is not supported by hoodie. Please use kryo.") + } + + val storageType = parameters(STORAGE_TYPE_OPT_KEY) + val operation = + // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true + // Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly + // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) . + if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean && + parameters(OPERATION_OPT_KEY) == UPSERT_OPERATION_OPT_VAL) { + + log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " + + s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " + + s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL") + + INSERT_OPERATION_OPT_VAL + } else { + parameters(OPERATION_OPT_KEY) + } + + // register classes & schemas + val structName = s"${tblName.get}_record" + val nameSpace = s"hoodie.${tblName.get}" + sparkContext.getConf.registerKryoClasses( + Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + sparkContext.getConf.registerAvroSchemas(schema) + log.info(s"Registered avro schema : ${schema.toString(true)}") + + // Convert to RDD[HoodieRecord] + val keyGenerator = DataSourceUtils.createKeyGenerator( + parameters(KEYGENERATOR_CLASS_OPT_KEY), + toProperties(parameters) + ) + val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) + val hoodieAllIncomingRecords = genericRecords.map(gr => { + val orderingVal = DataSourceUtils.getNestedFieldValAsString( + gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]] + DataSourceUtils.createHoodieRecord(gr, + orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) + }).toJavaRDD(); + + val jsc = new JavaSparkContext(sparkContext) + + val hoodieRecords = + if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { + DataSourceUtils.dropDuplicates( + jsc, + hoodieAllIncomingRecords, + mapAsJavaMap(parameters)) + } else { + hoodieAllIncomingRecords + } + + if (hoodieRecords.isEmpty()) { + log.info("new batch has no new records, skipping...") + return (true, None) + } + + val basePath = new Path(parameters.get("path").get) + val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) + var exists = fs.exists(basePath) + + // Handle various save modes + if (mode == SaveMode.ErrorIfExists && exists) { + throw new HoodieException(s"basePath ${basePath} already exists.") + } + if (mode == SaveMode.Ignore && exists) { + log.warn(s" basePath ${basePath} already exists. Ignoring & not performing actual writes.") + return (true, None) + } + if (mode == SaveMode.Overwrite && exists) { + log.warn(s" basePath ${basePath} already exists. Deleting existing data & overwriting with new data.") + fs.delete(basePath, true) + exists = false + } + + // Create the dataset if not present (APPEND mode) + if (!exists) { + HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType, + tblName.get, "archived") + } + + // Create a HoodieWriteClient & issue the write. + val client = DataSourceUtils.createHoodieClient(jsc, + schema.toString, + path.get, + tblName.get, + mapAsJavaMap(parameters) + ) + val commitTime = client.startCommit() + + val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation) + // Check for errors and commit the write. + val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count() + val writeSuccessful = + if (errorCount == 0) { + log.info("No errors. Proceeding to commit the write.") + val metaMap = parameters.filter(kv => + kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY))) + val commitSuccess = if (metaMap.isEmpty) { + client.commit(commitTime, writeStatuses) + } else { + client.commit(commitTime, writeStatuses, + Optional.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + } + + if (commitSuccess) { + log.info("Commit " + commitTime + " successful!") + } + else { + log.info("Commit " + commitTime + " failed!") + } + + val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false) + val syncHiveSucess = if (hiveSyncEnabled) { + log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") + val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration) + syncHive(basePath, fs, parameters) + } else { + true + } + client.close() + commitSuccess && syncHiveSucess + } else { + log.error(s"$operation failed with ${errorCount} errors :"); + if (log.isTraceEnabled) { + log.trace("Printing out the top 100 errors") + writeStatuses.rdd.filter(ws => ws.hasErrors) + .take(100) + .foreach(ws => { + log.trace("Global error :", ws.getGlobalError) + if (ws.getErrors.size() > 0) { + ws.getErrors.foreach(kt => + log.trace(s"Error for key: ${kt._1}", kt._2)) + } + }) + } + false + } + (writeSuccessful, Some(commitTime)) + } + + /** + * Add default options for unspecified write options keys. + * + * @param parameters + * @return + */ + def parametersWithWriteDefaults(parameters: Map[String, String]): mutable.Map[String, String] = { + val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters)) + defaultsMap.putIfAbsent(OPERATION_OPT_KEY, DEFAULT_OPERATION_OPT_VAL) + defaultsMap.putIfAbsent(STORAGE_TYPE_OPT_KEY, DEFAULT_STORAGE_TYPE_OPT_VAL) + defaultsMap.putIfAbsent(PRECOMBINE_FIELD_OPT_KEY, DEFAULT_PRECOMBINE_FIELD_OPT_VAL) + defaultsMap.putIfAbsent(PAYLOAD_CLASS_OPT_KEY, DEFAULT_PAYLOAD_OPT_VAL) + defaultsMap.putIfAbsent(RECORDKEY_FIELD_OPT_KEY, DEFAULT_RECORDKEY_FIELD_OPT_VAL) + defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL) + defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL) + defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL) + defaultsMap.putIfAbsent(INSERT_DROP_DUPS_OPT_KEY, DEFAULT_INSERT_DROP_DUPS_OPT_VAL) + defaultsMap.putIfAbsent(STREAMING_RETRY_CNT_OPT_KEY, DEFAULT_STREAMING_RETRY_CNT_OPT_VAL) + defaultsMap.putIfAbsent(STREAMING_RETRY_INTERVAL_MS_OPT_KEY, DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL) + defaultsMap.putIfAbsent(STREAMING_IGNORE_FAILED_BATCH_OPT_KEY, DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_SYNC_ENABLED_OPT_KEY, DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_DATABASE_OPT_KEY, DEFAULT_HIVE_DATABASE_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_TABLE_OPT_KEY, DEFAULT_HIVE_TABLE_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_USER_OPT_KEY, DEFAULT_HIVE_USER_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_PASS_OPT_KEY, DEFAULT_HIVE_PASS_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_URL_OPT_KEY, DEFAULT_HIVE_URL_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_PARTITION_FIELDS_OPT_KEY, DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL) + defaultsMap.putIfAbsent(HIVE_ASSUME_DATE_PARTITION_OPT_KEY, DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL) + mapAsScalaMap(defaultsMap) + } + + def toProperties(params: Map[String, String]): TypedProperties = { + val props = new TypedProperties() + params.foreach(kv => props.setProperty(kv._1, kv._2)) + props + } + + private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = { + val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters) + val hiveConf: HiveConf = new HiveConf() + hiveConf.addResource(fs.getConf) + new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable() + true + } + + private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = { + val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig() + hiveSyncConfig.basePath = basePath.toString + hiveSyncConfig.assumeDatePartitioning = + parameters.get(HIVE_ASSUME_DATE_PARTITION_OPT_KEY).exists(r => r.toBoolean) + hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY) + hiveSyncConfig.tableName = parameters(HIVE_TABLE_OPT_KEY) + hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY) + hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY) + hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY) + hiveSyncConfig.partitionFields = + ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).toList: _*) + hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY) + hiveSyncConfig + } + +} \ No newline at end of file diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieStreamingSink.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieStreamingSink.scala new file mode 100644 index 000000000..f8109ccd2 --- /dev/null +++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieStreamingSink.scala @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ +package com.uber.hoodie + +import com.uber.hoodie.exception.HoodieCorruptedDataException +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} +import org.apache.spark.sql.execution.streaming.Sink +import org.apache.spark.sql.streaming.OutputMode +import org.apache.log4j.LogManager + +import scala.util.{Failure, Success, Try} + +class HoodieStreamingSink(sqlContext: SQLContext, + options: Map[String, String], + partitionColumns: Seq[String], + outputMode: OutputMode) + extends Sink + with Serializable { + @volatile private var latestBatchId = -1L + + private val log = LogManager.getLogger(classOf[HoodieStreamingSink]) + + private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY).toInt + private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong + private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean + + private val mode = + if (outputMode == OutputMode.Append()) { + SaveMode.Append + } else { + SaveMode.Overwrite + } + + override def addBatch(batchId: Long, data: DataFrame): Unit = { + retry(retryCnt, retryIntervalMs)( + Try( + HoodieSparkSqlWriter.write( + sqlContext, + mode, + options, + data) + ) match { + case Success((true, commitOps)) => + log.info(s"Micro batch id=$batchId succeeded" + + commitOps.map(commit => s" for commit=$commit").getOrElse(" with no new commits")) + Success((true, commitOps)) + case Failure(e) => + // clean up persist rdds in the write process + data.sparkSession.sparkContext.getPersistentRDDs + .foreach { + case (id, rdd) => + rdd.unpersist() + } + log.error(s"Micro batch id=$batchId threw following expection: ", e) + if (ignoreFailedBatch) { + log.info(s"Ignore the exception and move on streaming as per " + + s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration") + Success((true, None)) + } else { + if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...") + Failure(e) + } + case Success((false, commitOps)) => + log.error(s"Micro batch id=$batchId ended up with errors" + + commitOps.map(commit => s" for commit=$commit").getOrElse("")) + if (ignoreFailedBatch) { + log.info(s"Ignore the errors and move on streaming as per " + + s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration") + Success((true, None)) + } else { + if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...") + Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors")) + } + } + ) match { + case Failure(e) => + if (!ignoreFailedBatch) { + log.error(s"Micro batch id=$batchId threw following expections," + + s"aborting streaming app to avoid data loss: ", e) + // spark sometimes hangs upon exceptions and keep on hold of the executors + // this is to force exit upon errors / exceptions and release all executors + // will require redeployment / supervise mode to restart the streaming + System.exit(1) + } + case Success(_) => + log.info(s"Micro batch id=$batchId succeeded") + } + } + + override def toString: String = s"HoodieStreamingSink[${options("path")}]" + + @annotation.tailrec + private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = { + fn match { + case x: util.Success[T] => x + case _ if n > 1 => + Thread.sleep(waitInMillis) + retry(n - 1, waitInMillis * 2)(fn) + case f => f + } + } +} \ No newline at end of file diff --git a/hoodie-spark/src/test/java/HoodieJavaStreamingApp.java b/hoodie-spark/src/test/java/HoodieJavaStreamingApp.java new file mode 100644 index 000000000..dd5586373 --- /dev/null +++ b/hoodie-spark/src/test/java/HoodieJavaStreamingApp.java @@ -0,0 +1,279 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.uber.hoodie.DataSourceReadOptions; +import com.uber.hoodie.DataSourceWriteOptions; +import com.uber.hoodie.HoodieDataSourceHelpers; +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.hive.MultiPartKeysValueExtractor; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.*; +import org.apache.spark.sql.streaming.DataStreamWriter; +import org.apache.spark.sql.streaming.OutputMode; +import org.apache.spark.sql.streaming.ProcessingTime; + +/** + * Sample program that writes & reads hoodie datasets via the Spark datasource streaming + */ +public class HoodieJavaStreamingApp { + + @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table") + private String tablePath = "file:///tmp/hoodie/streaming/sample-table"; + + @Parameter(names = {"--streaming-source-path", "-ssp"}, description = "path for streaming source file folder") + private String streamingSourcePath = "file:///tmp/hoodie/streaming/source"; + + @Parameter(names = {"--streaming-checkpointing-path", "-scp"}, + description = "path for streaming checking pointing folder") + private String streamingCheckpointingPath = "file:///tmp/hoodie/streaming/checkpoint"; + + @Parameter(names = {"--streaming-duration-in-ms", "-sdm"}, + description = "time in millisecond for the streaming duration") + private Long streamingDurationInMs = 15000L; + + @Parameter(names = {"--table-name", "-n"}, description = "table name for Hoodie sample table") + private String tableName = "hoodie_test"; + + @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ") + private String tableType = HoodieTableType.MERGE_ON_READ.name(); + + @Parameter(names = {"--hive-sync", "-hv"}, description = "Enable syncing to hive") + private Boolean enableHiveSync = false; + + @Parameter(names = {"--hive-db", "-hd"}, description = "hive database") + private String hiveDB = "default"; + + @Parameter(names = {"--hive-table", "-ht"}, description = "hive table") + private String hiveTable = "hoodie_sample_test"; + + @Parameter(names = {"--hive-user", "-hu"}, description = "hive username") + private String hiveUser = "hive"; + + @Parameter(names = {"--hive-password", "-hp"}, description = "hive password") + private String hivePass = "hive"; + + @Parameter(names = {"--hive-url", "-hl"}, description = "hive JDBC URL") + private String hiveJdbcUrl = "jdbc:hive2://localhost:10000"; + + @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use Multiple Partition Keys") + private Boolean useMultiPartitionKeys = false; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + + private static Logger logger = LogManager.getLogger(HoodieJavaStreamingApp.class); + + public static void main(String[] args) throws Exception { + HoodieJavaStreamingApp cli = new HoodieJavaStreamingApp(); + JCommander cmd = new JCommander(cli, args); + + if (cli.help) { + cmd.usage(); + System.exit(1); + } + cli.run(); + } + + /** + * + * @throws Exception + */ + public void run() throws Exception { + // Spark session setup.. + SparkSession spark = SparkSession.builder().appName("Hoodie Spark Streaming APP") + .config("spark.serializer", + "org.apache.spark.serializer.KryoSerializer").master("local[1]") + .getOrCreate(); + JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext()); + + // folder path clean up and creation, preparing the environment + FileSystem fs = FileSystem.get(jssc.hadoopConfiguration()); + fs.delete(new Path(streamingSourcePath), true); + fs.delete(new Path(streamingCheckpointingPath), true); + fs.delete(new Path(tablePath), true); + fs.mkdirs(new Path(streamingSourcePath)); + + // Generator of some records to be loaded in. + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + List records1 = DataSourceTestUtils.convertToStringList( + dataGen.generateInserts("001", 100)); + Dataset inputDF1 = spark.read().json(jssc.parallelize(records1, 2)); + + List records2 = DataSourceTestUtils.convertToStringList( + dataGen.generateUpdates("002", 100)); + + Dataset inputDF2 = spark.read().json(jssc.parallelize(records2, 2)); + + // setup the input for streaming + Dataset streamingInput = spark.readStream().schema(inputDF1.schema()) + .json(streamingSourcePath); + + + // start streaming and showing + ExecutorService executor = Executors.newFixedThreadPool(2); + + // thread for spark strucutured streaming + Future streamFuture = executor.submit(new Callable() { + public Void call() throws Exception { + logger.info("===== Streaming Starting ====="); + stream(streamingInput); + logger.info("===== Streaming Ends ====="); + return null; + } + }); + + // thread for adding data to the streaming source and showing results over time + Future showFuture = executor.submit(new Callable() { + public Void call() throws Exception { + logger.info("===== Showing Starting ====="); + show(spark, fs, inputDF1, inputDF2); + logger.info("===== Showing Ends ====="); + return null; + } + }); + + // let the threads run + streamFuture.get(); + showFuture.get(); + + executor.shutdown(); + } + + /** + * Adding data to the streaming source and showing results over time + * @param spark + * @param fs + * @param inputDF1 + * @param inputDF2 + * @throws Exception + */ + public void show(SparkSession spark, + FileSystem fs, + Dataset inputDF1, + Dataset inputDF2) throws Exception { + inputDF1.write().mode(SaveMode.Append).json(streamingSourcePath); + // wait for spark streaming to process one microbatch + Thread.sleep(3000); + String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + logger.info("First commit at instant time :" + commitInstantTime1); + + inputDF2.write().mode(SaveMode.Append).json(streamingSourcePath); + // wait for spark streaming to process one microbatch + Thread.sleep(3000); + String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + logger.info("Second commit at instant time :" + commitInstantTime1); + + /** + * Read & do some queries + */ + Dataset hoodieROViewDF = spark.read().format("com.uber.hoodie") + // pass any path glob, can include hoodie & non-hoodie + // datasets + .load(tablePath + "/*/*/*/*"); + hoodieROViewDF.registerTempTable("hoodie_ro"); + spark.sql("describe hoodie_ro").show(); + // all trips whose fare was greater than 2. + spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_ro where fare > 2.0") + .show(); + + if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) { + /** + * Consume incrementally, only changes in commit 2 above. Currently only supported for COPY_ON_WRITE TABLE + */ + Dataset hoodieIncViewDF = spark.read().format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), + DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL()) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), + commitInstantTime1) // Only changes in write 2 above + .load( + tablePath); // For incremental view, pass in the root/base path of dataset + + logger.info("You will only see records from : " + commitInstantTime2); + hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show(); + } + } + + /** + * Hoodie spark streaming job + * @param streamingInput + * @throws Exception + */ + public void stream(Dataset streamingInput) throws Exception { + + DataStreamWriter writer = streamingInput + .writeStream() + .format("com.uber.hoodie") + .option("hoodie.insert.shuffle.parallelism", "2") + .option("hoodie.upsert.shuffle.parallelism", "2") + .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType) + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") + .option(HoodieWriteConfig.TABLE_NAME, tableName) + .option("checkpointLocation", streamingCheckpointingPath) + .outputMode(OutputMode.Append()); + + updateHiveSyncConfig(writer); + writer + .trigger(new ProcessingTime(500)) + .start(tablePath) + .awaitTermination(streamingDurationInMs); + } + + /** + * Setup configs for syncing to hive + * @param writer + * @return + */ + private DataStreamWriter updateHiveSyncConfig(DataStreamWriter writer) { + if (enableHiveSync) { + logger.info("Enabling Hive sync to " + hiveJdbcUrl); + writer = writer.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), hiveTable) + .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), hiveDB) + .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveJdbcUrl) + .option(DataSourceWriteOptions.HIVE_USER_OPT_KEY(), hiveUser) + .option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(), hivePass) + .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true"); + if (useMultiPartitionKeys) { + writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "year,month,day") + .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), + MultiPartKeysValueExtractor.class.getCanonicalName()); + } else { + writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "dateStr"); + } + } + return writer; + } +} diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala b/hoodie-spark/src/test/scala/DataSourceTest.scala index 1ad42e777..2f34beb4e 100644 --- a/hoodie-spark/src/test/scala/DataSourceTest.scala +++ b/hoodie-spark/src/test/scala/DataSourceTest.scala @@ -20,14 +20,18 @@ import com.uber.hoodie.common.HoodieTestDataGenerator import com.uber.hoodie.common.util.FSUtils import com.uber.hoodie.config.HoodieWriteConfig import com.uber.hoodie.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql._ +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime} import org.junit.Assert._ import org.junit.rules.TemporaryFolder import org.junit.{Before, Test} import org.scalatest.junit.AssertionsForJUnit import scala.collection.JavaConversions._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global /** * Basic tests on the spark datasource @@ -62,7 +66,7 @@ class DataSourceTest extends AssertionsForJUnit { @Test def testCopyOnWriteStorage() { // Insert Operation - val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001", 100)).toList + val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100)).toList val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("com.uber.hoodie") .options(commonOpts) @@ -182,4 +186,92 @@ class DataSourceTest extends AssertionsForJUnit { .load(basePath) assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt) } + + @Test def testStructuredStreaming(): Unit = { + fs.delete(new Path(basePath), true) + val sourcePath = basePath + "/source" + val destPath = basePath + "/dest" + fs.mkdirs(new Path(sourcePath)) + + // First chunk of data + val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100)).toList + val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + + // Second chunk of data + val records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("001", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() + + // define the source of streaming + val streamingInput = + spark.readStream + .schema(inputDF1.schema) + .json(sourcePath) + + val f1 = Future { + println("streaming starting") + //'writeStream' can be called only on streaming Dataset/DataFrame + streamingInput + .writeStream + .format("com.uber.hoodie") + .options(commonOpts) + .trigger(new ProcessingTime(100)) + .option("checkpointLocation", basePath + "/checkpoint") + .outputMode(OutputMode.Append) + .start(destPath) + .awaitTermination(10000) + println("streaming ends") + } + + val f2 = Future { + inputDF1.write.mode(SaveMode.Append).json(sourcePath) + // wait for spark streaming to process one microbatch + Thread.sleep(3000) + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000")) + val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath) + // Read RO View + val hoodieROViewDF1 = spark.read.format("com.uber.hoodie") + .load(destPath + "/*/*/*/*") + assert(hoodieROViewDF1.count() == 100) + + inputDF2.write.mode(SaveMode.Append).json(sourcePath) + // wait for spark streaming to process one microbatch + Thread.sleep(3000) + val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath) + assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size()) + // Read RO View + val hoodieROViewDF2 = spark.read.format("com.uber.hoodie") + .load(destPath + "/*/*/*/*") + assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated + + + // Read Incremental View + // we have 2 commits, try pulling the first commit (which is not the latest) + val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0) + val hoodieIncViewDF1 = spark.read.format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit) + .load(destPath) + assertEquals(100, hoodieIncViewDF1.count()) + // 100 initial inserts must be pulled + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(firstCommit, countsPerCommit(0).get(0)) + + // pull the latest commit + val hoodieIncViewDF2 = spark.read.format("com.uber.hoodie") + .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(destPath) + + assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime2, countsPerCommit(0).get(0)) + } + + Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf) + + } }