diff --git a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index 4092fe3c1..e7ae8e068 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -52,11 +52,11 @@ private[hoodie] object HoodieSparkSqlWriter {
     if (path.isEmpty || tblName.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
-    val serializer = sparkContext.getConf.get("spark.serializer")
-    if (!serializer.equals("org.apache.spark.serializer.KryoSerializer")) {
-      throw new HoodieException(s"${serializer} serialization is not supported by hoodie. Please use kryo.")
+    // Hoodie requires Kryo serialization; fail fast if spark.serializer is unset or anything else.
+    sparkContext.getConf.getOption("spark.serializer") match {
+      case Some("org.apache.spark.serializer.KryoSerializer") => // supported serializer, proceed
+      case _ => throw new HoodieException("Hoodie only supports org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
-
     val storageType = parameters(STORAGE_TYPE_OPT_KEY)
     val operation =
     // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
diff --git a/hoodie-spark/src/test/scala/com/uber/hoodie/HoodieSparkSqlWriterSuite.scala b/hoodie-spark/src/test/scala/com/uber/hoodie/HoodieSparkSqlWriterSuite.scala
index cd3c21343..eb528dce8 100644
--- a/hoodie-spark/src/test/scala/com/uber/hoodie/HoodieSparkSqlWriterSuite.scala
+++ b/hoodie-spark/src/test/scala/com/uber/hoodie/HoodieSparkSqlWriterSuite.scala
@@ -19,6 +19,9 @@ package com.uber.hoodie
 
 import org.scalatest.{FunSuite, Matchers}
 import DataSourceWriteOptions._
+import com.uber.hoodie.config.HoodieWriteConfig
+import com.uber.hoodie.exception.HoodieException
+import org.apache.spark.sql.{SaveMode, SparkSession}
 
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
@@ -38,4 +41,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
 }
+  // Verifies that write() rejects a session whose spark.serializer is not Kryo.
+  test("throw hoodie exception when invalid serializer") {
+    val session = SparkSession.builder().appName("hoodie_test").master("local").getOrCreate()
+    try {
+      val sqlContext = session.sqlContext
+      val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
+      val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame))
+      assert(e.getMessage.contains("spark.serializer"))
+    } finally {
+      // Always release the local SparkSession so later tests in the suite start clean.
+      session.stop()
+    }
+  }