Fix conversion of Spark struct type to Avro schema
cr https://code.amazon.com/reviews/CR-17184364
This commit is contained in:
committed by
Bhavani Sudha Saktheeswaran
parent
fd8f1c70c0
commit
2bb0c21a3d
@@ -79,10 +79,12 @@ public class HoodieTestDataGenerator {
|
|||||||
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
|
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
|
||||||
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
|
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
|
||||||
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
|
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
|
||||||
+ "{\"name\":\"fare\",\"type\": \"double\"},"
|
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
|
||||||
|
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
|
||||||
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
|
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
|
||||||
public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
|
public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
|
||||||
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double,boolean";
|
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
|
||||||
|
+ "struct<amount:double,currency:string>,boolean";
|
||||||
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
|
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
|
||||||
public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
|
public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
|
||||||
|
|
||||||
@@ -152,7 +154,12 @@ public class HoodieTestDataGenerator {
|
|||||||
rec.put("begin_lon", rand.nextDouble());
|
rec.put("begin_lon", rand.nextDouble());
|
||||||
rec.put("end_lat", rand.nextDouble());
|
rec.put("end_lat", rand.nextDouble());
|
||||||
rec.put("end_lon", rand.nextDouble());
|
rec.put("end_lon", rand.nextDouble());
|
||||||
rec.put("fare", rand.nextDouble() * 100);
|
|
||||||
|
GenericRecord fareRecord = new GenericData.Record(avroSchema.getField("fare").schema());
|
||||||
|
fareRecord.put("amount", rand.nextDouble() * 100);
|
||||||
|
fareRecord.put("currency", "USD");
|
||||||
|
rec.put("fare", fareRecord);
|
||||||
|
|
||||||
if (isDeleteRecord) {
|
if (isDeleteRecord) {
|
||||||
rec.put("_hoodie_is_deleted", true);
|
rec.put("_hoodie_is_deleted", true);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -343,7 +343,7 @@ object AvroConversionHelper {
|
|||||||
avroSchema,
|
avroSchema,
|
||||||
field.dataType,
|
field.dataType,
|
||||||
field.name,
|
field.name,
|
||||||
getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
|
getNewRecordNamespace(field.dataType, recordNamespace, structName)))
|
||||||
(item: Any) => {
|
(item: Any) => {
|
||||||
if (item == null) {
|
if (item == null) {
|
||||||
null
|
null
|
||||||
|
|||||||
@@ -34,7 +34,9 @@ public class DataSourceTestUtils {
|
|||||||
try {
|
try {
|
||||||
String str = ((TestRawTripPayload) record.getData()).getJsonData();
|
String str = ((TestRawTripPayload) record.getData()).getJsonData();
|
||||||
str = "{" + str.substring(str.indexOf("\"timestamp\":"));
|
str = "{" + str.substring(str.indexOf("\"timestamp\":"));
|
||||||
return Option.of(str.replaceAll("}", ", \"partition\": \"" + record.getPartitionPath() + "\"}"));
|
// Remove the last } bracket
|
||||||
|
str = str.substring(0, str.length() - 1);
|
||||||
|
return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return Option.empty();
|
return Option.empty();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -212,8 +212,8 @@ public class HoodieJavaApp {
|
|||||||
.load(tablePath + (nonPartitionedTable ? "/*" : "/*/*/*/*"));
|
.load(tablePath + (nonPartitionedTable ? "/*" : "/*/*/*/*"));
|
||||||
hoodieROViewDF.registerTempTable("hoodie_ro");
|
hoodieROViewDF.registerTempTable("hoodie_ro");
|
||||||
spark.sql("describe hoodie_ro").show();
|
spark.sql("describe hoodie_ro").show();
|
||||||
// all trips whose fare was greater than 2.
|
// all trips whose fare amount was greater than 2.
|
||||||
spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_ro where fare > 2.0").show();
|
spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show();
|
||||||
|
|
||||||
if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
|
if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -195,8 +195,8 @@ public class HoodieJavaStreamingApp {
|
|||||||
.load(tablePath + "/*/*/*/*");
|
.load(tablePath + "/*/*/*/*");
|
||||||
hoodieROViewDF.registerTempTable("hoodie_ro");
|
hoodieROViewDF.registerTempTable("hoodie_ro");
|
||||||
spark.sql("describe hoodie_ro").show();
|
spark.sql("describe hoodie_ro").show();
|
||||||
// all trips whose fare was greater than 2.
|
// all trips whose fare amount was greater than 2.
|
||||||
spark.sql("select fare, begin_lon, begin_lat, timestamp from hoodie_ro where fare > 2.0").show();
|
spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show();
|
||||||
|
|
||||||
if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
|
if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -46,7 +46,20 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name" : "fare",
|
"name" : "fare",
|
||||||
"type" : "double"
|
"type" : {
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "fare",
|
||||||
|
"fields" : [
|
||||||
|
{
|
||||||
|
"name" : "amount",
|
||||||
|
"type" : "double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name" : "currency",
|
||||||
|
"type" : "string"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name" : "_hoodie_is_deleted",
|
"name" : "_hoodie_is_deleted",
|
||||||
|
|||||||
@@ -45,7 +45,20 @@
|
|||||||
"type" : "double"
|
"type" : "double"
|
||||||
}, {
|
}, {
|
||||||
"name" : "fare",
|
"name" : "fare",
|
||||||
"type" : "double"
|
"type" : {
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "fare",
|
||||||
|
"fields" : [
|
||||||
|
{
|
||||||
|
"name" : "amount",
|
||||||
|
"type" : "double"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name" : "currency",
|
||||||
|
"type" : "string"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"name" : "_hoodie_is_deleted",
|
"name" : "_hoodie_is_deleted",
|
||||||
|
|||||||
Reference in New Issue
Block a user