diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index ddeec6a36..d484b60b9 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -79,10 +79,12 @@ public class HoodieTestDataGenerator { + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"}," + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"}," - + "{\"name\":\"fare\",\"type\": \"double\"}," + + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [" + + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}}," + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}"; public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); - public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double,boolean"; + public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double," + + "struct,boolean"; public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema); @@ -152,7 +154,12 @@ public class HoodieTestDataGenerator { rec.put("begin_lon", rand.nextDouble()); rec.put("end_lat", rand.nextDouble()); rec.put("end_lon", rand.nextDouble()); - rec.put("fare", rand.nextDouble() * 100); + + GenericRecord fareRecord = new GenericData.Record(avroSchema.getField("fare").schema()); + fareRecord.put("amount", rand.nextDouble() * 100); + fareRecord.put("currency", "USD"); + rec.put("fare", fareRecord); + if (isDeleteRecord) { rec.put("_hoodie_is_deleted", true); } else { diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 8244fc327..e1a7ae18f 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -343,7 +343,7 @@ object AvroConversionHelper { avroSchema, field.dataType, field.name, - getNewRecordNamespace(field.dataType, recordNamespace, field.name))) + getNewRecordNamespace(field.dataType, recordNamespace, structName))) (item: Any) => { if (item == null) { null diff --git a/hudi-spark/src/test/java/DataSourceTestUtils.java b/hudi-spark/src/test/java/DataSourceTestUtils.java index 6becaed7c..036e6c221 100644 --- a/hudi-spark/src/test/java/DataSourceTestUtils.java +++ b/hudi-spark/src/test/java/DataSourceTestUtils.java @@ -34,7 +34,9 @@ public class DataSourceTestUtils { try { String str = ((TestRawTripPayload) record.getData()).getJsonData(); str = "{" + str.substring(str.indexOf("\"timestamp\":")); - return Option.of(str.replaceAll("}", ", \"partition\": \"" + record.getPartitionPath() + "\"}")); + // Remove the last } bracket + str = str.substring(0, str.length() - 1); + return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}"); } catch (IOException e) { return Option.empty(); } diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index 70324f9f1..f9499867a 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -212,8 +212,8 @@ public class HoodieJavaApp { .load(tablePath + (nonPartitionedTable ? "/*" : "/*/*/*/*")); hoodieROViewDF.registerTempTable("hoodie_ro"); spark.sql("describe hoodie_ro").show(); - // all trips whose fare was greater than 2. - spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_ro where fare > 2.0").show(); + // all trips whose fare amount was greater than 2. + spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show(); if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) { /** diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index af9c9f440..f7921dab3 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -195,8 +195,8 @@ public class HoodieJavaStreamingApp { .load(tablePath + "/*/*/*/*"); hoodieROViewDF.registerTempTable("hoodie_ro"); spark.sql("describe hoodie_ro").show(); - // all trips whose fare was greater than 2. - spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_ro where fare > 2.0").show(); + // all trips whose fare amount was greater than 2. + spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show(); if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) { /** diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc index 95757a3e1..a02d48d46 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc @@ -46,7 +46,20 @@ }, { "name" : "fare", - "type" : "double" + "type" : { + "type" : "record", + "name" : "fare", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } }, { "name" : "_hoodie_is_deleted", diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc index 38e72556f..b64fe4d59 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc @@ -45,7 +45,20 @@ "type" : "double" }, { "name" : "fare", - "type" : "double" + "type" : { + "type" : "record", + "name" : "fare", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } }, { "name" : "_hoodie_is_deleted",