diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 60e059147..087587ab0 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; @@ -46,6 +47,7 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -84,15 +86,18 @@ public class HoodieTestDataGenerator { + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"; public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"}," + "{\"name\": \"currency\", \"type\": \"string\"},"; + public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": [" + + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},"; + public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},"; public static final String TRIP_EXAMPLE_SCHEMA = - TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; + TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; public static final String TRIP_FLATTENED_SCHEMA = TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX; public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double," - + "struct,boolean"; + + "map,struct,array>,boolean"; public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = @@ -194,10 +199,20 @@ public class HoodieTestDataGenerator { rec.put("fare", RAND.nextDouble() * 100); rec.put("currency", "USD"); } else { + rec.put("city_to_state", Collections.singletonMap("LA", "CA")); + GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); fareRecord.put("amount", RAND.nextDouble() * 100); fareRecord.put("currency", "USD"); rec.put("fare", fareRecord); + + GenericArray tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema()); + Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType(); + GenericRecord tipRecord = new GenericData.Record(tipSchema); + tipRecord.put("amount", RAND.nextDouble() * 100); + tipRecord.put("currency", "USD"); + tipHistoryArray.add(tipRecord); + rec.put("tip_history", tipHistoryArray); } if (isDeleteRecord) { diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index b61bef3d8..43225bcab 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -27,7 +27,6 @@ import org.apache.avro.{LogicalTypes, Schema} import org.apache.avro.Schema.Type._ import org.apache.avro.generic.GenericData.{Fixed, Record} import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord} -import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace import org.apache.spark.sql.Row import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.GenericRow @@ -303,7 +302,7 @@ object AvroConversionHelper { avroSchema, elementType, structName, - getNewRecordNamespace(elementType, recordNamespace, structName)) + recordNamespace) (item: Any) => { if (item == null) { null @@ -324,7 +323,7 @@ object AvroConversionHelper { avroSchema, valueType, structName, - getNewRecordNamespace(valueType, recordNamespace, structName)) + recordNamespace) (item: Any) => { if (item == null) { null @@ -338,12 +337,13 @@ object AvroConversionHelper { } case structType: StructType => val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace) + val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName val fieldConverters = structType.fields.map(field => createConverterToAvro( avroSchema, field.dataType, field.name, - getNewRecordNamespace(field.dataType, recordNamespace, structName))) + childNameSpace)) (item: Any) => { if (item == null) { null diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index decab9c64..04de1c7f8 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -71,16 +71,6 @@ object AvroConversionUtils { } } - def getNewRecordNamespace(elementDataType: DataType, - currentRecordNamespace: String, - elementName: String): String = { - - elementDataType match { - case StructType(_) => s"$currentRecordNamespace.$elementName" - case _ => currentRecordNamespace - } - } - def convertStructTypeToAvroSchema(structType: StructType, structName: String, recordNamespace: String): Schema = { diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc index a02d48d46..8d01820ad 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc @@ -44,6 +44,13 @@ "name" : "end_lon", "type" : "double" }, + { + "name" :"city_to_state", + "type" : { + "type" : "map", + "values": "string" + } + }, { "name" : "fare", "type" : { @@ -61,6 +68,26 @@ ] } }, + { + "name" : "tip_history", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "tip_history", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + } + }, { "name" : "_hoodie_is_deleted", "type" : "boolean", diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties index e8b285703..569b417f2 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties @@ -16,4 +16,4 @@ # limitations under the License. ### include=base.properties -hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a +hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc index b64fe4d59..4fbb5c56c 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc @@ -43,6 +43,12 @@ }, { "name" : "end_lon", "type" : "double" + }, { + "name" :"city_to_state", + "type" : { + "type" : "map", + "values": "string" + } }, { "name" : "fare", "type" : { @@ -60,6 +66,26 @@ ] } }, + { + "name" : "tip_history", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "tip_history", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + } + }, { "name" : "_hoodie_is_deleted", "type" : "boolean",