diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 5774c896d..3ba34cb26 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -52,10 +52,11 @@ private[hudi] object HoodieSparkSqlWriter { val sparkContext = sqlContext.sparkContext val path = parameters.get("path") - val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME) - if (path.isEmpty || tblName.isEmpty) { + val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME) + if (path.isEmpty || tblNameOp.isEmpty) { throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.") } + val tblName = tblNameOp.get.trim sparkContext.getConf.getOption("spark.serializer") match { case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") => case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer") @@ -85,7 +86,7 @@ private[hudi] object HoodieSparkSqlWriter { if (exists && mode == SaveMode.Append) { val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName - if (!existingTableName.equals(tblName.get)) { + if (!existingTableName.equals(tblName)) { throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath") } } @@ -93,8 +94,8 @@ private[hudi] object HoodieSparkSqlWriter { val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) = if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) { // register classes & schemas - val structName = s"${tblName.get}_record" - val nameSpace = s"hoodie.${tblName.get}" + val structName = s"${tblName}_record" + val nameSpace = s"hoodie.${tblName}" sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) @@ -129,11 +130,11 @@ private[hudi] object HoodieSparkSqlWriter { // Create the table if not present if (!exists) { HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, tableType, - tblName.get, "archived", parameters(PAYLOAD_CLASS_OPT_KEY)) + tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY)) } // Create a HoodieWriteClient & issue the write. - val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get, + val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName, mapAsJavaMap(parameters) ) @@ -158,8 +159,8 @@ private[hudi] object HoodieSparkSqlWriter { throw new HoodieException(s"Append is the only save mode applicable for $operation operation") } - val structName = s"${tblName.get}_record" - val nameSpace = s"hoodie.${tblName.get}" + val structName = s"${tblName}_record" + val nameSpace = s"hoodie.${tblName}" sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) @@ -175,7 +176,7 @@ private[hudi] object HoodieSparkSqlWriter { // Create a HoodieWriteClient & issue the delete. val client = DataSourceUtils.createHoodieClient(jsc, - Schema.create(Schema.Type.NULL).toString, path.get, tblName.get, + Schema.create(Schema.Type.NULL).toString, path.get, tblName, mapAsJavaMap(parameters) )