[HUDI-713] Fix conversion of Spark array of struct type to Avro schema (#1406)
Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
|
|||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.GenericArray;
|
||||||
import org.apache.avro.generic.GenericData;
|
import org.apache.avro.generic.GenericData;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@@ -46,6 +47,7 @@ import java.io.Serializable;
|
|||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@@ -84,15 +86,18 @@ public class HoodieTestDataGenerator {
|
|||||||
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
|
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
|
||||||
public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
|
public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
|
||||||
+ "{\"name\": \"currency\", \"type\": \"string\"},";
|
+ "{\"name\": \"currency\", \"type\": \"string\"},";
|
||||||
|
public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": ["
|
||||||
|
+ "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},";
|
||||||
|
public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},";
|
||||||
|
|
||||||
public static final String TRIP_EXAMPLE_SCHEMA =
|
public static final String TRIP_EXAMPLE_SCHEMA =
|
||||||
TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
public static final String TRIP_FLATTENED_SCHEMA =
|
public static final String TRIP_FLATTENED_SCHEMA =
|
||||||
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
|
|
||||||
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
|
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
|
||||||
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
|
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
|
||||||
+ "struct<amount:double,currency:string>,boolean";
|
+ "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
|
||||||
|
|
||||||
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
|
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
|
||||||
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
|
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
|
||||||
@@ -194,10 +199,20 @@ public class HoodieTestDataGenerator {
|
|||||||
rec.put("fare", RAND.nextDouble() * 100);
|
rec.put("fare", RAND.nextDouble() * 100);
|
||||||
rec.put("currency", "USD");
|
rec.put("currency", "USD");
|
||||||
} else {
|
} else {
|
||||||
|
rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
|
||||||
|
|
||||||
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
|
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
|
||||||
fareRecord.put("amount", RAND.nextDouble() * 100);
|
fareRecord.put("amount", RAND.nextDouble() * 100);
|
||||||
fareRecord.put("currency", "USD");
|
fareRecord.put("currency", "USD");
|
||||||
rec.put("fare", fareRecord);
|
rec.put("fare", fareRecord);
|
||||||
|
|
||||||
|
GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema());
|
||||||
|
Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
|
||||||
|
GenericRecord tipRecord = new GenericData.Record(tipSchema);
|
||||||
|
tipRecord.put("amount", RAND.nextDouble() * 100);
|
||||||
|
tipRecord.put("currency", "USD");
|
||||||
|
tipHistoryArray.add(tipRecord);
|
||||||
|
rec.put("tip_history", tipHistoryArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isDeleteRecord) {
|
if (isDeleteRecord) {
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ import org.apache.avro.{LogicalTypes, Schema}
|
|||||||
import org.apache.avro.Schema.Type._
|
import org.apache.avro.Schema.Type._
|
||||||
import org.apache.avro.generic.GenericData.{Fixed, Record}
|
import org.apache.avro.generic.GenericData.{Fixed, Record}
|
||||||
import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
|
import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
|
||||||
import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace
|
|
||||||
import org.apache.spark.sql.Row
|
import org.apache.spark.sql.Row
|
||||||
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
|
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
|
||||||
import org.apache.spark.sql.catalyst.expressions.GenericRow
|
import org.apache.spark.sql.catalyst.expressions.GenericRow
|
||||||
@@ -303,7 +302,7 @@ object AvroConversionHelper {
|
|||||||
avroSchema,
|
avroSchema,
|
||||||
elementType,
|
elementType,
|
||||||
structName,
|
structName,
|
||||||
getNewRecordNamespace(elementType, recordNamespace, structName))
|
recordNamespace)
|
||||||
(item: Any) => {
|
(item: Any) => {
|
||||||
if (item == null) {
|
if (item == null) {
|
||||||
null
|
null
|
||||||
@@ -324,7 +323,7 @@ object AvroConversionHelper {
|
|||||||
avroSchema,
|
avroSchema,
|
||||||
valueType,
|
valueType,
|
||||||
structName,
|
structName,
|
||||||
getNewRecordNamespace(valueType, recordNamespace, structName))
|
recordNamespace)
|
||||||
(item: Any) => {
|
(item: Any) => {
|
||||||
if (item == null) {
|
if (item == null) {
|
||||||
null
|
null
|
||||||
@@ -338,12 +337,13 @@ object AvroConversionHelper {
|
|||||||
}
|
}
|
||||||
case structType: StructType =>
|
case structType: StructType =>
|
||||||
val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
|
val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
|
||||||
|
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
|
||||||
val fieldConverters = structType.fields.map(field =>
|
val fieldConverters = structType.fields.map(field =>
|
||||||
createConverterToAvro(
|
createConverterToAvro(
|
||||||
avroSchema,
|
avroSchema,
|
||||||
field.dataType,
|
field.dataType,
|
||||||
field.name,
|
field.name,
|
||||||
getNewRecordNamespace(field.dataType, recordNamespace, structName)))
|
childNameSpace))
|
||||||
(item: Any) => {
|
(item: Any) => {
|
||||||
if (item == null) {
|
if (item == null) {
|
||||||
null
|
null
|
||||||
|
|||||||
@@ -71,16 +71,6 @@ object AvroConversionUtils {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def getNewRecordNamespace(elementDataType: DataType,
|
|
||||||
currentRecordNamespace: String,
|
|
||||||
elementName: String): String = {
|
|
||||||
|
|
||||||
elementDataType match {
|
|
||||||
case StructType(_) => s"$currentRecordNamespace.$elementName"
|
|
||||||
case _ => currentRecordNamespace
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def convertStructTypeToAvroSchema(structType: StructType,
|
def convertStructTypeToAvroSchema(structType: StructType,
|
||||||
structName: String,
|
structName: String,
|
||||||
recordNamespace: String): Schema = {
|
recordNamespace: String): Schema = {
|
||||||
|
|||||||
@@ -44,6 +44,13 @@
|
|||||||
"name" : "end_lon",
|
"name" : "end_lon",
|
||||||
"type" : "double"
|
"type" : "double"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name" :"city_to_state",
|
||||||
|
"type" : {
|
||||||
|
"type" : "map",
|
||||||
|
"values": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name" : "fare",
|
"name" : "fare",
|
||||||
"type" : {
|
"type" : {
|
||||||
@@ -61,6 +68,26 @@
|
|||||||
]
|
]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name" : "tip_history",
|
||||||
|
"type" : {
|
||||||
|
"type" : "array",
|
||||||
|
"items" : {
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "tip_history",
|
||||||
|
"fields" : [
|
||||||
|
{
|
||||||
|
"name" : "amount",
|
||||||
|
"type" : "double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name" : "currency",
|
||||||
|
"type" : "string"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name" : "_hoodie_is_deleted",
|
"name" : "_hoodie_is_deleted",
|
||||||
"type" : "boolean",
|
"type" : "boolean",
|
||||||
|
|||||||
@@ -16,4 +16,4 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
###
|
###
|
||||||
include=base.properties
|
include=base.properties
|
||||||
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
|
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
|
||||||
|
|||||||
@@ -43,6 +43,12 @@
|
|||||||
}, {
|
}, {
|
||||||
"name" : "end_lon",
|
"name" : "end_lon",
|
||||||
"type" : "double"
|
"type" : "double"
|
||||||
|
}, {
|
||||||
|
"name" :"city_to_state",
|
||||||
|
"type" : {
|
||||||
|
"type" : "map",
|
||||||
|
"values": "string"
|
||||||
|
}
|
||||||
}, {
|
}, {
|
||||||
"name" : "fare",
|
"name" : "fare",
|
||||||
"type" : {
|
"type" : {
|
||||||
@@ -60,6 +66,26 @@
|
|||||||
]
|
]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name" : "tip_history",
|
||||||
|
"type" : {
|
||||||
|
"type" : "array",
|
||||||
|
"items" : {
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "tip_history",
|
||||||
|
"fields" : [
|
||||||
|
{
|
||||||
|
"name" : "amount",
|
||||||
|
"type" : "double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name" : "currency",
|
||||||
|
"type" : "string"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name" : "_hoodie_is_deleted",
|
"name" : "_hoodie_is_deleted",
|
||||||
"type" : "boolean",
|
"type" : "boolean",
|
||||||
|
|||||||
Reference in New Issue
Block a user