From 89e37d5273ea1c6bf2fe3a8f7053e7a3cc44011d Mon Sep 17 00:00:00 2001 From: Shen Hong Date: Mon, 22 Jun 2020 23:13:28 +0800 Subject: [PATCH] [HUDI-908] Add some data types to HoodieTestDataGenerator and fix some bugs. (#1690) --- .../hudi/client/TestTableSchemaEvolution.java | 13 ++++--- .../testutils/HoodieTestDataGenerator.java | 34 ++++++++++++++++- .../hudi/avro/MercifulJsonConverter.java | 14 +++++-- .../HoodieRealtimeRecordReaderUtils.java | 3 +- .../apache/hudi/AvroConversionHelper.scala | 6 ++- .../delta-streamer-config/source.avsc | 37 ++++++++++++++++++- .../sql-transformer.properties | 2 +- .../delta-streamer-config/target.avsc | 34 +++++++++++++++++ 8 files changed, 127 insertions(+), 16 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index 8e56b3106..6a93e3142 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1; +import static org.apache.hudi.testutils.HoodieTestDataGenerator.EXTRA_TYPE_SCHEMA; import static org.apache.hudi.testutils.HoodieTestDataGenerator.FARE_NESTED_SCHEMA; import static org.apache.hudi.testutils.HoodieTestDataGenerator.MAP_TYPE_SCHEMA; import static org.apache.hudi.testutils.HoodieTestDataGenerator.TIP_NESTED_SCHEMA; @@ -68,19 +69,19 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { "{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},"; // TRIP_EXAMPLE_SCHEMA with a new_field added - public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA - + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX; 
+ public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX; // TRIP_EXAMPLE_SCHEMA with tip field removed - public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA - + TRIP_SCHEMA_SUFFIX; + public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; @Test public void testSchemaCompatibilityBasic() throws Exception { assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA), "Same schema is compatible"); - String reorderedSchema = TRIP_SCHEMA_PREFIX + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA + String reorderedSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX; assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema), "Reordered fields are compatible"); @@ -114,7 +115,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED), "Added field with default is compatible (Evolved Schema)"); - String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field") + TRIP_SCHEMA_SUFFIX; assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema), diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java index a6de0f581..0eb88e24d 100644 --- 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java @@ -18,6 +18,9 @@ package org.apache.hudi.testutils; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalTypes; +import org.apache.avro.generic.GenericFixed; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.fs.FSUtils; @@ -46,7 +49,10 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.sql.Date; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -94,8 +100,16 @@ public class HoodieTestDataGenerator { public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": [" + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},"; public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},"; + public static final String EXTRA_TYPE_SCHEMA = "{\"name\": \"distance_in_meters\", \"type\": \"int\"}," + + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"}," + + "{\"name\": \"weight\", \"type\": \"float\"}," + + "{\"name\": \"nation\", \"type\": \"bytes\"}," + + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}}," + + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}}," + + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"; + public static final String TRIP_EXAMPLE_SCHEMA = - TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; + TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; public static final String TRIP_FLATTENED_SCHEMA = TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX; @@ -107,7 +121,7 @@ public class HoodieTestDataGenerator { + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}"; public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); - public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double," + public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6)," + "map,struct,array>,boolean"; public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); @@ -245,6 +259,22 @@ public class HoodieTestDataGenerator { rec.put("fare", RAND.nextDouble() * 100); rec.put("currency", "USD"); } else { + rec.put("distance_in_meters", RAND.nextInt()); + rec.put("seconds_since_epoch", RAND.nextLong()); + rec.put("weight", RAND.nextFloat()); + byte[] bytes = "Canada".getBytes(); + rec.put("nation", ByteBuffer.wrap(bytes)); + long currentTimeMillis = System.currentTimeMillis(); + Date date = new Date(currentTimeMillis); + rec.put("current_date", (int) date.toLocalDate().toEpochDay()); + rec.put("current_ts", currentTimeMillis); + + BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat())); + Schema decimalSchema = AVRO_SCHEMA.getField("height").schema(); + Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); + GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6)); + rec.put("height", genericFixed); + rec.put("city_to_state", 
Collections.singletonMap("LA", "CA")); GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java index 734c631dd..d759a8deb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java @@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericRecord; import java.io.IOException; import java.io.Serializable; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -236,7 +237,8 @@ public class MercifulJsonConverter { return new JsonToAvroFieldProcessor() { @Override public Pair convert(Object value, String name, Schema schema) { - return Pair.of(true, value.toString().getBytes()); + // Should return ByteBuffer (see GenericData.isBytes()) + return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes())); } }; } @@ -245,10 +247,16 @@ public class MercifulJsonConverter { return new JsonToAvroFieldProcessor() { @Override public Pair convert(Object value, String name, Schema schema) { - byte[] src = value.toString().getBytes(); + // The ObjectMapper use List to represent FixedType + // eg: "decimal_val": [0, 0, 14, -63, -52] will convert to ArrayList + List converval = (List) value; + byte[] src = new byte[converval.size()]; + for (int i = 0; i < converval.size(); i++) { + src[i] = converval.get(i).byteValue(); + } byte[] dst = new byte[schema.getFixedSize()]; System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), src.length)); - return Pair.of(true, dst); + return Pair.of(true, new GenericData.Fixed(schema, dst)); } }; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index 6af37709c..cd876b4e4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -44,6 +44,7 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.schema.MessageType; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashSet; @@ -146,7 +147,7 @@ public class HoodieRealtimeRecordReaderUtils { case STRING: return new Text(value.toString()); case BYTES: - return new BytesWritable((byte[]) value); + return new BytesWritable(((ByteBuffer)value).array()); case INT: return new IntWritable((Integer) value); case LONG: diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 69e6376c2..259f51f32 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -110,7 +110,11 @@ object AvroConversionHelper { if (item == null) { null } else { - new Date(item.asInstanceOf[Long]) + if (item.isInstanceOf[Integer]) { + new Date(item.asInstanceOf[Integer].longValue()) + } else { + new Date(item.asInstanceOf[Long]) + } } case (TimestampType, LONG) => (item: AnyRef) => diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc index 8d01820ad..f5cc97f56 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc @@ -43,8 +43,41 @@ }, { "name" : "end_lon", "type" : "double" - }, - { + }, { + "name" : "distance_in_meters", + "type" : "int" + }, { + 
"name" : "seconds_since_epoch", + "type" : "long" + }, { + "name" : "weight", + "type" : "float" + },{ + "name" : "nation", + "type" : "bytes" + },{ + "name" : "current_date", + "type" : { + "type" : "int", + "logicalType" : "date" + } + },{ + "name" : "current_ts", + "type" : { + "type" : "long", + "logicalType" : "timestamp-micros" + } + },{ + "name" : "height", + "type" : { + "type" : "fixed", + "name" : "abc", + "size" : 5, + "logicalType" : "decimal", + "precision" : 10, + "scale": 6 + } + }, { "name" :"city_to_state", "type" : { "type" : "map", diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties index 569b417f2..dc735e803 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties @@ -16,4 +16,4 @@ # limitations under the License. ### include=base.properties -hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a +hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc index 4fbb5c56c..a02610793 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc @@ -43,6 +43,40 @@ }, { "name" : 
"end_lon", "type" : "double" + }, { + "name" : "distance_in_meters", + "type" : "int" + }, { + "name" : "seconds_since_epoch", + "type" : "long" + }, { + "name" : "weight", + "type" : "float" + }, { + "name" : "nation", + "type" : "bytes" + },{ + "name" : "current_date", + "type" : { + "type" : "int", + "logicalType" : "date" + } + },{ + "name" : "current_ts", + "type" : { + "type" : "long", + "logicalType" : "timestamp-micros" + } + }, { + "name" : "height", + "type" : { + "type" : "fixed", + "name" : "abc", + "size" : 5, + "logicalType" : "decimal", + "precision" : 10, + "scale": 6 + } }, { "name" :"city_to_state", "type" : {