1
0
Files
hudi/hoodie-spark/src/main/scala/com/uber/hoodie/AvroConversionUtils.scala
Vinoth Chandar 85dd265b7b Improving out of box experience for data source
- Fixes #246
 - Bump up default parallelism to 1500, to handle large upserts
 - Add docs on s3 configuration & tuning tips with tested spark knobs
 - Fix bug to not duplicate hoodie metadata fields when input dataframe is another hoodie dataset
 - Improve speed of ROTablePathFilter by removing directory check
 - Move to spark-avro 4.0 to handle issue with nested fields with same name
 - Keep AvroConversionUtils in sync with spark-avro 4.0
2018-06-10 19:16:44 -07:00

147 lines
5.3 KiB
Scala

/*
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.uber.hoodie
import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.util
import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
/**
 * Helpers for converting between Spark SQL rows/schemas and Avro records/schemas.
 */
object AvroConversionUtils {

  /**
   * Converts every row of a data frame into an Avro [[GenericRecord]].
   *
   * @param df              data frame to convert; its schema drives the conversion
   * @param structName      name given to the top-level Avro record
   * @param recordNamespace namespace given to the top-level Avro record
   * @return an RDD holding one generic record per input row
   */
  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
    val dataType = df.schema
    df.rdd.mapPartitions { records =>
      if (records.isEmpty) {
        Iterator.empty
      } else {
        // Build the converter once per partition instead of once per row.
        val convertor = createConverterToAvro(dataType, structName, recordNamespace)
        records.map { row => convertor(row).asInstanceOf[GenericRecord] }
      }
    }
  }

  /**
   * Computes the namespace to use for a nested element.
   *
   * @param elementDataType        Spark SQL type of the nested element
   * @param currentRecordNamespace namespace of the enclosing record
   * @param elementName            name of the nested element
   * @return `currentRecordNamespace.elementName` when the element is a struct,
   *         otherwise `currentRecordNamespace` unchanged
   */
  def getNewRecordNamespace(elementDataType: DataType,
                            currentRecordNamespace: String,
                            elementName: String): String = {
    elementDataType match {
      case _: StructType => s"$currentRecordNamespace.$elementName"
      case _ => currentRecordNamespace
    }
  }

  /**
   * Builds a function that converts a value of the given Spark SQL type into the
   * object representation placed into Avro generic records.
   *
   * Every returned function maps `null` to `null`. Non-null values are converted as follows:
   *  - binary: wrapped in a `ByteBuffer`
   *  - byte / short: widened to `Int`
   *  - int, long, float, double, string, boolean: passed through unchanged
   *  - decimal: `toString` of the value
   *  - timestamp / date: the `getTime` value (milliseconds since the epoch) as a `Long`
   *  - array: `java.util.ArrayList` of converted elements
   *  - map with string keys: `java.util.HashMap` of converted values
   *  - struct: Avro `Record` whose fields are converted recursively
   *
   * @param dataType        Spark SQL type of the values to convert
   * @param structName      record name used when `dataType` is (or contains) a struct
   * @param recordNamespace Avro namespace used for generated record schemas
   * @return the conversion function for `dataType`
   * @throws IllegalArgumentException if `dataType` has no Avro mapping here
   *                                  (for example a map with non-string keys)
   */
  def createConverterToAvro(dataType: DataType,
                            structName: String,
                            recordNamespace: String): (Any) => Any = {
    dataType match {
      case BinaryType => (item: Any) =>
        item match {
          case null => null
          case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
        }
      // Avro has no 8/16-bit integer types and spark-avro maps both to "int". A boxed
      // Byte/Short is not recognised as an int when Avro resolves a nullable union, so
      // widen the value to Int to make it match the generated schema.
      case ByteType => (item: Any) =>
        if (item == null) null else item.asInstanceOf[Byte].toInt
      case ShortType => (item: Any) =>
        if (item == null) null else item.asInstanceOf[Short].toInt
      case IntegerType | LongType |
           FloatType | DoubleType | StringType | BooleanType => identity
      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
      case TimestampType => (item: Any) =>
        if (item == null) null else item.asInstanceOf[Timestamp].getTime
      case DateType => (item: Any) =>
        if (item == null) null else item.asInstanceOf[Date].getTime
      case ArrayType(elementType, _) =>
        val elementConverter = createConverterToAvro(
          elementType,
          structName,
          getNewRecordNamespace(elementType, recordNamespace, structName))
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val sourceArray = item.asInstanceOf[Seq[Any]]
            val targetList = new util.ArrayList[Any](sourceArray.size)
            // Iterate rather than index so that linear Seq implementations stay O(n).
            sourceArray.foreach(element => targetList.add(elementConverter(element)))
            targetList
          }
        }
      case MapType(StringType, valueType, _) =>
        val valueConverter = createConverterToAvro(
          valueType,
          structName,
          getNewRecordNamespace(valueType, recordNamespace, structName))
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val javaMap = new util.HashMap[String, Any]()
            // Cast to the general collection trait so any Scala map implementation is accepted.
            item.asInstanceOf[scala.collection.Map[String, Any]].foreach { case (key, value) =>
              javaMap.put(key, valueConverter(value))
            }
            javaMap
          }
        }
      case structType: StructType =>
        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
        val schema: Schema = SchemaConverters.convertStructToAvro(
          structType, builder, recordNamespace)
        // Resolved once here so the per-row function below does no schema work.
        val fieldNames = structType.fieldNames
        val fieldConverters = structType.fields.map(field =>
          createConverterToAvro(
            field.dataType,
            field.name,
            getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
        val fieldCount = fieldConverters.length
        (item: Any) => {
          if (item == null) {
            null
          } else {
            val row = item.asInstanceOf[Row]
            val record = new Record(schema)
            var idx = 0
            while (idx < fieldCount) {
              record.put(fieldNames(idx), fieldConverters(idx)(row.get(idx)))
              idx += 1
            }
            record
          }
        }
      case other =>
        // Fail with a readable message instead of an opaque scala.MatchError.
        throw new IllegalArgumentException(
          s"Unsupported Spark SQL type for Avro conversion: $other")
    }
  }

  /**
   * Converts a Spark SQL struct type into an Avro record schema.
   *
   * @param structType      Spark SQL struct to convert
   * @param structName      name of the resulting Avro record
   * @param recordNamespace namespace of the resulting Avro record
   * @return the Avro schema produced by `SchemaConverters.convertStructToAvro`
   */
  def convertStructTypeToAvroSchema(structType: StructType,
                                    structName: String,
                                    recordNamespace: String): Schema = {
    val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
    SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
  }

  /**
   * Converts an Avro schema into the corresponding Spark SQL struct type.
   *
   * @param avroSchema Avro schema to convert
   * @return the Spark SQL type produced by `SchemaConverters.toSqlType`, cast to a struct
   * @throws ClassCastException if the converted type is not a `StructType`
   */
  def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  }
}