diff --git a/hoodie-spark/pom.xml b/hoodie-spark/pom.xml
index 25d1fa7ab..c4869f3b2 100644
--- a/hoodie-spark/pom.xml
+++ b/hoodie-spark/pom.xml
@@ -130,6 +130,24 @@
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>TestSuite.txt</filereports>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
index dcf7628bd..c6fbd82e0 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
@@ -18,20 +18,15 @@
 
 package com.uber.hoodie
 
-import java.util.concurrent.ConcurrentHashMap
-
 import com.uber.hoodie.DataSourceReadOptions._
 import com.uber.hoodie.exception.HoodieException
 import org.apache.log4j.LogManager
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
 
 /**
   * Hoodie Spark Datasource, for reading and writing hoodie datasets
@@ -51,22 +46,12 @@ class DefaultSource extends RelationProvider
     createRelation(sqlContext, parameters, null)
   }
 
-  /**
-    * Add default options for unspecified read options keys.
-    *
-    * @param parameters
-    * @return
-    */
-  def parametersWithReadDefaults(parameters: Map[String, String]): mutable.Map[String, String] = {
-    val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters))
-    defaultsMap.putIfAbsent(VIEW_TYPE_OPT_KEY, DEFAULT_VIEW_TYPE_OPT_VAL)
-    mapAsScalaMap(defaultsMap)
-  }
-
   override def createRelation(sqlContext: SQLContext,
                               optParams: Map[String, String],
                               schema: StructType): BaseRelation = {
-    val parameters = parametersWithReadDefaults(optParams)
+    // Add default options for unspecified read options keys.
+    val parameters = Map(VIEW_TYPE_OPT_KEY -> DEFAULT_VIEW_TYPE_OPT_VAL) ++: optParams
+
     val path = parameters.get("path")
     if (path.isEmpty) {
       throw new HoodieException("'path' must be specified.")
@@ -92,7 +77,7 @@ class DefaultSource extends RelationProvider
         sparkSession = sqlContext.sparkSession,
         userSpecifiedSchema = Option(schema),
         className = "parquet",
-        options = parameters.toMap)
+        options = parameters)
         .resolveRelation()
     }
   }
@@ -102,7 +87,7 @@ class DefaultSource extends RelationProvider
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
-    val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams).toMap
+    val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
     HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
     createRelation(sqlContext, parameters, df.schema)
   }
 
@@ -111,7 +96,7 @@ class DefaultSource extends RelationProvider
                           optParams: Map[String, String],
                           partitionColumns: Seq[String],
                           outputMode: OutputMode): Sink = {
-    val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams).toMap
+    val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
     new HoodieStreamingSink(
       sqlContext,
       parameters,
diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index fe3507e08..4092fe3c1 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -18,12 +18,8 @@
 package com.uber.hoodie
 
 import java.util
-import java.util.concurrent.ConcurrentHashMap
 import java.util.Optional
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
-
 import com.uber.hoodie.DataSourceWriteOptions._
 import com.uber.hoodie.common.table.HoodieTableMetaClient
 import com.uber.hoodie.common.util.{FSUtils, TypedProperties}
@@ -36,9 +32,10 @@
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
 
-import scala.collection.mutable
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
 
 private[hoodie] object HoodieSparkSqlWriter {
@@ -98,7 +95,7 @@ private[hoodie] object HoodieSparkSqlWriter {
           gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
         DataSourceUtils.createHoodieRecord(gr,
           orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
-      }).toJavaRDD();
+      }).toJavaRDD()
 
       val jsc = new JavaSparkContext(sparkContext)
 
@@ -117,7 +114,7 @@ private[hoodie] object HoodieSparkSqlWriter {
       return (true, None)
     }
 
-    val basePath = new Path(parameters.get("path").get)
+    val basePath = new Path(parameters("path"))
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(basePath)
 
@@ -172,7 +169,7 @@ private[hoodie] object HoodieSparkSqlWriter {
       log.info("Commit " + commitTime + " failed!")
     }
 
-    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).map(r => r.toBoolean).getOrElse(false)
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
     val syncHiveSucess = if (hiveSyncEnabled) {
       log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
       val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
@@ -207,30 +204,29 @@ private[hoodie] object HoodieSparkSqlWriter {
    * @param parameters
    * @return
    */
-  def parametersWithWriteDefaults(parameters: Map[String, String]): mutable.Map[String, String] = {
-    val defaultsMap = new ConcurrentHashMap[String, String](mapAsJavaMap(parameters))
-    defaultsMap.putIfAbsent(OPERATION_OPT_KEY, DEFAULT_OPERATION_OPT_VAL)
-    defaultsMap.putIfAbsent(STORAGE_TYPE_OPT_KEY, DEFAULT_STORAGE_TYPE_OPT_VAL)
-    defaultsMap.putIfAbsent(PRECOMBINE_FIELD_OPT_KEY, DEFAULT_PRECOMBINE_FIELD_OPT_VAL)
-    defaultsMap.putIfAbsent(PAYLOAD_CLASS_OPT_KEY, DEFAULT_PAYLOAD_OPT_VAL)
-    defaultsMap.putIfAbsent(RECORDKEY_FIELD_OPT_KEY, DEFAULT_RECORDKEY_FIELD_OPT_VAL)
-    defaultsMap.putIfAbsent(PARTITIONPATH_FIELD_OPT_KEY, DEFAULT_PARTITIONPATH_FIELD_OPT_VAL)
-    defaultsMap.putIfAbsent(KEYGENERATOR_CLASS_OPT_KEY, DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
-    defaultsMap.putIfAbsent(COMMIT_METADATA_KEYPREFIX_OPT_KEY, DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL)
-    defaultsMap.putIfAbsent(INSERT_DROP_DUPS_OPT_KEY, DEFAULT_INSERT_DROP_DUPS_OPT_VAL)
-    defaultsMap.putIfAbsent(STREAMING_RETRY_CNT_OPT_KEY, DEFAULT_STREAMING_RETRY_CNT_OPT_VAL)
-    defaultsMap.putIfAbsent(STREAMING_RETRY_INTERVAL_MS_OPT_KEY, DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL)
-    defaultsMap.putIfAbsent(STREAMING_IGNORE_FAILED_BATCH_OPT_KEY, DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_SYNC_ENABLED_OPT_KEY, DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_DATABASE_OPT_KEY, DEFAULT_HIVE_DATABASE_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_TABLE_OPT_KEY, DEFAULT_HIVE_TABLE_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_USER_OPT_KEY, DEFAULT_HIVE_USER_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_PASS_OPT_KEY, DEFAULT_HIVE_PASS_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_URL_OPT_KEY, DEFAULT_HIVE_URL_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_PARTITION_FIELDS_OPT_KEY, DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL)
-    defaultsMap.putIfAbsent(HIVE_ASSUME_DATE_PARTITION_OPT_KEY, DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL)
-    mapAsScalaMap(defaultsMap)
+  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
+    Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
+      STORAGE_TYPE_OPT_KEY -> DEFAULT_STORAGE_TYPE_OPT_VAL,
+      PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
+      PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
+      RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
+      PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
+      KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
+      COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
+      INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
+      STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
+      STREAMING_RETRY_INTERVAL_MS_OPT_KEY -> DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL,
+      STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL,
+      HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
+      HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
+      HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
+      HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
+      HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
+      HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
+      HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
+      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
+      HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL
+    ) ++: parameters
   }
 
   def toProperties(params: Map[String, String]): TypedProperties = {
diff --git a/hoodie-spark/src/test/scala/com/uber/hoodie/HoodieSparkSqlWriterSuite.scala b/hoodie-spark/src/test/scala/com/uber/hoodie/HoodieSparkSqlWriterSuite.scala
new file mode 100644
index 000000000..cd3c21343
--- /dev/null
+++ b/hoodie-spark/src/test/scala/com/uber/hoodie/HoodieSparkSqlWriterSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package com.uber.hoodie
+
+import org.scalatest.{FunSuite, Matchers}
+import DataSourceWriteOptions._
+
+class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
+
+  test("Parameters With Write Defaults") {
+    val originals = HoodieSparkSqlWriter.parametersWithWriteDefaults(Map.empty)
+    val rhsKey = "hoodie.right.hand.side.key"
+    val rhsVal = "hoodie.right.hand.side.val"
+    val modifier = Map(OPERATION_OPT_KEY -> INSERT_OPERATION_OPT_VAL, STORAGE_TYPE_OPT_KEY -> MOR_STORAGE_TYPE_OPT_VAL, rhsKey -> rhsVal)
+    val modified = HoodieSparkSqlWriter.parametersWithWriteDefaults(modifier)
+    val matcher = (k: String, v: String) => modified(k) should be(v)
+
+    originals foreach {
+      case (OPERATION_OPT_KEY, _) => matcher(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL)
+      case (STORAGE_TYPE_OPT_KEY, _) => matcher(STORAGE_TYPE_OPT_KEY, MOR_STORAGE_TYPE_OPT_VAL)
+      case (`rhsKey`, _) => matcher(rhsKey, rhsVal)
+      case (k, v) => matcher(k, v)
+    }
+  }
+
+}
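
A note on the merge operator this patch leans on: `++:` is right-associative, so `Map(defaults) ++: params` desugars to `params.++:(Map(defaults))`, which (in Scala 2.11/2.12) builds a new map from the default entries first and the caller's entries second; on key collisions the caller's value therefore wins, exactly the `putIfAbsent` behavior being replaced. A minimal self-contained sketch of that semantics, with illustrative key strings rather than the real DataSourceWriteOptions constants:

object DefaultsMergeSketch extends App {
  // Hypothetical defaults and user-supplied options (keys are made up for illustration).
  val defaults = Map("hoodie.example.operation" -> "upsert",
                     "hoodie.example.storage"   -> "COPY_ON_WRITE")
  val userOpts = Map("hoodie.example.operation" -> "insert",
                     "path"                     -> "/tmp/hoodie")

  // Right-associative: equivalent to userOpts.++:(defaults).
  val merged = defaults ++: userOpts

  assert(merged("hoodie.example.operation") == "insert")       // user value overrides the default
  assert(merged("hoodie.example.storage") == "COPY_ON_WRITE")  // default kept when unspecified
  assert(merged("path") == "/tmp/hoodie")                      // user-only key preserved
  println(merged)
}

This mirrors what HoodieSparkSqlWriterSuite asserts above: defaults survive for unspecified keys, while caller-supplied overrides and extra keys pass through untouched.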