[HUDI-91][HUDI-12] Migrate to Spark 2.4.4, migrate to the spark-avro library instead of databricks-avro, add support for Decimal/Date types

- Upgrade Spark to 2.4.4, Parquet to 1.10.1, Avro to 1.8.2
- Remove spark-avro from hudi-spark-bundle; users now need to pass --packages org.apache.spark:spark-avro:2.4.4 when running spark-shell or spark-submit
- Replace com.databricks:spark-avro with org.apache.spark:spark-avro
- Shade Avro in hudi-hadoop-mr-bundle so that it does not conflict with Hive's Avro version
Author: Udit Mehrotra (2020-01-12 15:03:11 -08:00), committed by Balaji Varadarajan
Parent: d9675c4ec0
Commit: ad50008a59
11 changed files with 128 additions and 79 deletions
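As the second bullet above notes, jobs must now supply spark-avro themselves at launch time. A sketch of such an invocation (the artifact id may need a Scala-version suffix such as spark-avro_2.11 depending on the build, and the bundle jar name is illustrative):

spark-shell \
  --jars hudi-spark-bundle.jar \
  --packages org.apache.spark:spark-avro:2.4.4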

AvroConversionHelper.scala

@@ -1,13 +1,12 @@
 /*
- * This code is copied from com.databricks:spark-avro with following license
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
- * Copyright 2014 Databricks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,14 +21,15 @@ import java.nio.ByteBuffer
 import java.sql.{Date, Timestamp}
 import java.util
 
-import com.databricks.spark.avro.SchemaConverters
-import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException
-import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
+import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Schema.Type._
 import org.apache.avro.generic.GenericData.{Fixed, Record}
-import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
 import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.types._
@@ -37,6 +37,16 @@ import scala.collection.JavaConverters._
 object AvroConversionHelper {
+
+  private def createDecimal(decimal: java.math.BigDecimal, precision: Int, scale: Int): Decimal = {
+    if (precision <= Decimal.MAX_LONG_DIGITS) {
+      // Constructs a `Decimal` with an unscaled `Long` value if possible.
+      Decimal(decimal.unscaledValue().longValue(), precision, scale)
+    } else {
+      // Otherwise, resorts to an unscaled `BigInteger` instead.
+      Decimal(decimal, precision, scale)
+    }
+  }
 
   /**
    *
    * Returns a converter function to convert row in avro format to GenericRow of catalyst.
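The threshold in the createDecimal helper above is Spark's Decimal.MAX_LONG_DIGITS (18): a decimal with at most 18 digits of precision has an unscaled value that is guaranteed to fit in a Long, avoiding a BigInteger allocation per value. A minimal standalone illustration (values made up):

import org.apache.spark.sql.types.Decimal

// precision 10 <= 18, so the unscaled value (1234567) fits in a Long
val longBacked = Decimal(new java.math.BigDecimal("12345.67").unscaledValue().longValue(), 10, 2)
// precision 38 > 18, so the Decimal keeps the BigDecimal representation
val bigBacked = Decimal(new java.math.BigDecimal("123456789012345678901234567890.12345678"), 38, 8)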
@@ -76,7 +86,50 @@ object AvroConversionHelper {
           byteBuffer.get(bytes)
           bytes
         }
+      case (d: DecimalType, FIXED) =>
+        (item: AnyRef) =>
+          if (item == null) {
+            null
+          } else {
+            val decimalConversion = new DecimalConversion
+            val bigDecimal = decimalConversion.fromFixed(item.asInstanceOf[GenericFixed], avroSchema,
+              LogicalTypes.decimal(d.precision, d.scale))
+            createDecimal(bigDecimal, d.precision, d.scale)
+          }
+      case (d: DecimalType, BYTES) =>
+        (item: AnyRef) =>
+          if (item == null) {
+            null
+          } else {
+            val decimalConversion = new DecimalConversion
+            val bigDecimal = decimalConversion.fromBytes(item.asInstanceOf[ByteBuffer], avroSchema,
+              LogicalTypes.decimal(d.precision, d.scale))
+            createDecimal(bigDecimal, d.precision, d.scale)
+          }
+      case (DateType, INT) =>
+        (item: AnyRef) =>
+          if (item == null) {
+            null
+          } else {
+            new Date(item.asInstanceOf[Long])
+          }
+      case (TimestampType, LONG) =>
+        (item: AnyRef) =>
+          if (item == null) {
+            null
+          } else {
+            avroSchema.getLogicalType match {
+              case _: TimestampMillis =>
+                new Timestamp(item.asInstanceOf[Long])
+              case _: TimestampMicros =>
+                new Timestamp(item.asInstanceOf[Long] / 1000)
+              case null =>
+                new Timestamp(item.asInstanceOf[Long])
+              case other =>
+                throw new IncompatibleSchemaException(
+                  s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
+            }
+          }
       case (struct: StructType, RECORD) =>
         val length = struct.fields.length
         val converters = new Array[AnyRef => AnyRef](length)
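One detail worth noting in the TimestampType branch above: timestamp-micros values are divided by 1000 because java.sql.Timestamp's constructor takes milliseconds, so sub-millisecond precision is dropped. A standalone sketch with an arbitrary instant:

import java.sql.Timestamp

val micros = 1578870191000000L                 // hypothetical epoch time in microseconds
val fromMicros = new Timestamp(micros / 1000)  // Timestamp's constructor takes milliseconds
val fromMillis = new Timestamp(1578870191000L) // timestamp-millis values need no scaling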
@@ -216,7 +269,8 @@ object AvroConversionHelper {
     createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
   }
 
-  def createConverterToAvro(dataType: DataType,
+  def createConverterToAvro(avroSchema: Schema,
+                            dataType: DataType,
                             structName: String,
                             recordNamespace: String): Any => Any = {
     dataType match {
@@ -231,13 +285,22 @@ object AvroConversionHelper {
         if (item == null) null else item.asInstanceOf[Byte].intValue
       case ShortType => (item: Any) =>
         if (item == null) null else item.asInstanceOf[Short].intValue
-      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+      case dec: DecimalType => (item: Any) =>
+        Option(item).map { i =>
+          val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
+          val decimalConversions = new DecimalConversion()
+          decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
+            LogicalTypes.decimal(dec.precision, dec.scale))
+        }.orNull
       case TimestampType => (item: Any) =>
-        if (item == null) null else item.asInstanceOf[Timestamp].getTime
+        // Convert time to microseconds since spark-avro by default converts TimestampType to
+        // Avro Logical TimestampMicros
+        Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull
       case DateType => (item: Any) =>
-        if (item == null) null else item.asInstanceOf[Date].getTime
+        Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
       case ArrayType(elementType, _) =>
         val elementConverter = createConverterToAvro(
+          avroSchema,
           elementType,
           structName,
           getNewRecordNamespace(elementType, recordNamespace, structName))
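The new DecimalType branch above relies on Avro's DecimalConversion to encode a BigDecimal into a fixed type. A self-contained sketch of that encoding, with an invented schema and value (a 5-byte fixed comfortably holds the unscaled value of a decimal(10, 2)):

import org.apache.avro.{Conversions, LogicalTypes, SchemaBuilder}

val fixedSchema = LogicalTypes.decimal(10, 2)
  .addToSchema(SchemaBuilder.fixed("dec").size(5))
val fixed = new Conversions.DecimalConversion().toFixed(
  new java.math.BigDecimal("12345.67"), fixedSchema, LogicalTypes.decimal(10, 2))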
@@ -258,6 +321,7 @@ object AvroConversionHelper {
         }
       case MapType(StringType, valueType, _) =>
         val valueConverter = createConverterToAvro(
+          avroSchema,
           valueType,
           structName,
           getNewRecordNamespace(valueType, recordNamespace, structName))
@@ -273,11 +337,10 @@ object AvroConversionHelper {
           }
         }
       case structType: StructType =>
-        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
-        val schema: Schema = SchemaConverters.convertStructToAvro(
-          structType, builder, recordNamespace)
+        val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
         val fieldConverters = structType.fields.map(field =>
           createConverterToAvro(
+            avroSchema,
             field.dataType,
             field.name,
             getNewRecordNamespace(field.dataType, recordNamespace, field.name)))

AvroConversionUtils.scala

@@ -17,11 +17,11 @@
 package org.apache.hudi
 
-import com.databricks.spark.avro.SchemaConverters
 import org.apache.avro.generic.GenericRecord
-import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.hudi.common.model.HoodieKey
+import org.apache.avro.Schema
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -30,13 +30,20 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 object AvroConversionUtils {
 
   def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+    val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
+    createRdd(df, avroSchema.toString, structName, recordNamespace)
+  }
+
+  def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+  : RDD[GenericRecord] = {
     val dataType = df.schema
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     df.queryExecution.toRdd.map(encoder.fromRow)
       .mapPartitions { records =>
         if (records.isEmpty) Iterator.empty
         else {
-          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
+          val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
+          val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace)
           records.map { x => convertor(x).asInstanceOf[GenericRecord] }
         }
       }
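In the reworked createRdd above, the Avro schema is derived once from the DataFrame schema on the driver and shipped to executors as a JSON string (Avro's Schema class is not serializable), then re-parsed once per partition. A hypothetical call, assuming an active SparkSession named spark (path, struct name, and namespace are placeholders):

val df = spark.read.parquet("/tmp/source")
val records = AvroConversionUtils.createRdd(df, "hoodie_source", "hoodie.source")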
@@ -75,11 +82,10 @@ object AvroConversionUtils {
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
-    SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
+    SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
   }
 
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
-    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType];
+    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
 }
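These two helpers now delegate to Spark's own org.apache.spark.sql.avro.SchemaConverters (toAvroType/toSqlType) rather than the Databricks copies. A round-trip sketch with an invented schema, exercising the newly supported Decimal and Date types:

import org.apache.spark.sql.types._

val struct = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("price", DecimalType(10, 2)),
  StructField("event_date", DateType)))
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "record", "hoodie.example")
val roundTripped = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)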