[HUDI-908] Add some data types to HoodieTestDataGenerator and fix some bugs. (#1690)
This commit is contained in:
@@ -46,6 +46,7 @@ import java.util.List;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
|
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
|
||||||
|
import static org.apache.hudi.testutils.HoodieTestDataGenerator.EXTRA_TYPE_SCHEMA;
|
||||||
import static org.apache.hudi.testutils.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
|
import static org.apache.hudi.testutils.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
|
||||||
import static org.apache.hudi.testutils.HoodieTestDataGenerator.MAP_TYPE_SCHEMA;
|
import static org.apache.hudi.testutils.HoodieTestDataGenerator.MAP_TYPE_SCHEMA;
|
||||||
import static org.apache.hudi.testutils.HoodieTestDataGenerator.TIP_NESTED_SCHEMA;
|
import static org.apache.hudi.testutils.HoodieTestDataGenerator.TIP_NESTED_SCHEMA;
|
||||||
@@ -68,19 +69,19 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
|||||||
"{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
|
"{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
|
||||||
|
|
||||||
// TRIP_EXAMPLE_SCHEMA with a new_field added
|
// TRIP_EXAMPLE_SCHEMA with a new_field added
|
||||||
public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
|
public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
|
||||||
+ TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
+ FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
|
|
||||||
// TRIP_EXAMPLE_SCHEMA with tip field removed
|
// TRIP_EXAMPLE_SCHEMA with tip field removed
|
||||||
public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
|
public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
|
||||||
+ TRIP_SCHEMA_SUFFIX;
|
+ FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSchemaCompatibilityBasic() throws Exception {
|
public void testSchemaCompatibilityBasic() throws Exception {
|
||||||
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),
|
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),
|
||||||
"Same schema is compatible");
|
"Same schema is compatible");
|
||||||
|
|
||||||
String reorderedSchema = TRIP_SCHEMA_PREFIX + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA
|
String reorderedSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA
|
||||||
+ MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
+ MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema),
|
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema),
|
||||||
"Reordered fields are compatible");
|
"Reordered fields are compatible");
|
||||||
@@ -114,7 +115,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
|||||||
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED),
|
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED),
|
||||||
"Added field with default is compatible (Evolved Schema)");
|
"Added field with default is compatible (Evolved Schema)");
|
||||||
|
|
||||||
String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
|
String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
|
||||||
+ TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
|
+ TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
|
||||||
+ TRIP_SCHEMA_SUFFIX;
|
+ TRIP_SCHEMA_SUFFIX;
|
||||||
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
|
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
|
||||||
|
|||||||
@@ -18,6 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hudi.testutils;
|
package org.apache.hudi.testutils;
|
||||||
|
|
||||||
|
import org.apache.avro.Conversions;
|
||||||
|
import org.apache.avro.LogicalTypes;
|
||||||
|
import org.apache.avro.generic.GenericFixed;
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
@@ -46,7 +49,10 @@ import org.apache.log4j.Logger;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.sql.Date;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -94,8 +100,16 @@ public class HoodieTestDataGenerator {
|
|||||||
public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": ["
|
public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": ["
|
||||||
+ "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},";
|
+ "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},";
|
||||||
public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},";
|
public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},";
|
||||||
|
public static final String EXTRA_TYPE_SCHEMA = "{\"name\": \"distance_in_meters\", \"type\": \"int\"},"
|
||||||
|
+ "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
|
||||||
|
+ "{\"name\": \"weight\", \"type\": \"float\"},"
|
||||||
|
+ "{\"name\": \"nation\", \"type\": \"bytes\"},"
|
||||||
|
+ "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
|
||||||
|
+ "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}},"
|
||||||
|
+ "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
|
||||||
|
|
||||||
public static final String TRIP_EXAMPLE_SCHEMA =
|
public static final String TRIP_EXAMPLE_SCHEMA =
|
||||||
TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
public static final String TRIP_FLATTENED_SCHEMA =
|
public static final String TRIP_FLATTENED_SCHEMA =
|
||||||
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
|
|
||||||
@@ -107,7 +121,7 @@ public class HoodieTestDataGenerator {
|
|||||||
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
|
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
|
||||||
|
|
||||||
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
|
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
|
||||||
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
|
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
|
||||||
+ "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
|
+ "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
|
||||||
|
|
||||||
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
|
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
|
||||||
@@ -245,6 +259,22 @@ public class HoodieTestDataGenerator {
|
|||||||
rec.put("fare", RAND.nextDouble() * 100);
|
rec.put("fare", RAND.nextDouble() * 100);
|
||||||
rec.put("currency", "USD");
|
rec.put("currency", "USD");
|
||||||
} else {
|
} else {
|
||||||
|
rec.put("distance_in_meters", RAND.nextInt());
|
||||||
|
rec.put("seconds_since_epoch", RAND.nextLong());
|
||||||
|
rec.put("weight", RAND.nextFloat());
|
||||||
|
byte[] bytes = "Canada".getBytes();
|
||||||
|
rec.put("nation", ByteBuffer.wrap(bytes));
|
||||||
|
long currentTimeMillis = System.currentTimeMillis();
|
||||||
|
Date date = new Date(currentTimeMillis);
|
||||||
|
rec.put("current_date", (int) date.toLocalDate().toEpochDay());
|
||||||
|
rec.put("current_ts", currentTimeMillis);
|
||||||
|
|
||||||
|
BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat()));
|
||||||
|
Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
|
||||||
|
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
|
||||||
|
GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6));
|
||||||
|
rec.put("height", genericFixed);
|
||||||
|
|
||||||
rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
|
rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
|
||||||
|
|
||||||
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
|
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericRecord;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@@ -236,7 +237,8 @@ public class MercifulJsonConverter {
|
|||||||
return new JsonToAvroFieldProcessor() {
|
return new JsonToAvroFieldProcessor() {
|
||||||
@Override
|
@Override
|
||||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
|
public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
|
||||||
return Pair.of(true, value.toString().getBytes());
|
// Should return ByteBuffer (see GenericData.isBytes())
|
||||||
|
return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -245,10 +247,16 @@ public class MercifulJsonConverter {
|
|||||||
return new JsonToAvroFieldProcessor() {
|
return new JsonToAvroFieldProcessor() {
|
||||||
@Override
|
@Override
|
||||||
public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
|
public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
|
||||||
byte[] src = value.toString().getBytes();
|
// The ObjectMapper uses a List to represent FixedType
|
||||||
|
// e.g. "decimal_val": [0, 0, 14, -63, -52] is converted to an ArrayList<Integer>
|
||||||
|
List<Integer> converval = (List<Integer>) value;
|
||||||
|
byte[] src = new byte[converval.size()];
|
||||||
|
for (int i = 0; i < converval.size(); i++) {
|
||||||
|
src[i] = converval.get(i).byteValue();
|
||||||
|
}
|
||||||
byte[] dst = new byte[schema.getFixedSize()];
|
byte[] dst = new byte[schema.getFixedSize()];
|
||||||
System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), src.length));
|
System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), src.length));
|
||||||
return Pair.of(true, dst);
|
return Pair.of(true, new GenericData.Fixed(schema, dst));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
|
|||||||
import org.apache.parquet.schema.MessageType;
|
import org.apache.parquet.schema.MessageType;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
@@ -146,7 +147,7 @@ public class HoodieRealtimeRecordReaderUtils {
|
|||||||
case STRING:
|
case STRING:
|
||||||
return new Text(value.toString());
|
return new Text(value.toString());
|
||||||
case BYTES:
|
case BYTES:
|
||||||
return new BytesWritable((byte[]) value);
|
return new BytesWritable(((ByteBuffer)value).array());
|
||||||
case INT:
|
case INT:
|
||||||
return new IntWritable((Integer) value);
|
return new IntWritable((Integer) value);
|
||||||
case LONG:
|
case LONG:
|
||||||
|
|||||||
@@ -109,9 +109,13 @@ object AvroConversionHelper {
|
|||||||
(item: AnyRef) =>
|
(item: AnyRef) =>
|
||||||
if (item == null) {
|
if (item == null) {
|
||||||
null
|
null
|
||||||
|
} else {
|
||||||
|
if (item.isInstanceOf[Integer]) {
|
||||||
|
new Date(item.asInstanceOf[Integer].longValue())
|
||||||
} else {
|
} else {
|
||||||
new Date(item.asInstanceOf[Long])
|
new Date(item.asInstanceOf[Long])
|
||||||
}
|
}
|
||||||
|
}
|
||||||
case (TimestampType, LONG) =>
|
case (TimestampType, LONG) =>
|
||||||
(item: AnyRef) =>
|
(item: AnyRef) =>
|
||||||
if (item == null) {
|
if (item == null) {
|
||||||
|
|||||||
@@ -43,8 +43,41 @@
|
|||||||
}, {
|
}, {
|
||||||
"name" : "end_lon",
|
"name" : "end_lon",
|
||||||
"type" : "double"
|
"type" : "double"
|
||||||
},
|
}, {
|
||||||
{
|
"name" : "distance_in_meters",
|
||||||
|
"type" : "int"
|
||||||
|
}, {
|
||||||
|
"name" : "seconds_since_epoch",
|
||||||
|
"type" : "long"
|
||||||
|
}, {
|
||||||
|
"name" : "weight",
|
||||||
|
"type" : "float"
|
||||||
|
},{
|
||||||
|
"name" : "nation",
|
||||||
|
"type" : "bytes"
|
||||||
|
},{
|
||||||
|
"name" : "current_date",
|
||||||
|
"type" : {
|
||||||
|
"type" : "int",
|
||||||
|
"logicalType" : "date"
|
||||||
|
}
|
||||||
|
},{
|
||||||
|
"name" : "current_ts",
|
||||||
|
"type" : {
|
||||||
|
"type" : "long",
|
||||||
|
"logicalType" : "timestamp-micros"
|
||||||
|
}
|
||||||
|
},{
|
||||||
|
"name" : "height",
|
||||||
|
"type" : {
|
||||||
|
"type" : "fixed",
|
||||||
|
"name" : "abc",
|
||||||
|
"size" : 5,
|
||||||
|
"logicalType" : "decimal",
|
||||||
|
"precision" : 10,
|
||||||
|
"scale": 6
|
||||||
|
}
|
||||||
|
}, {
|
||||||
"name" :"city_to_state",
|
"name" :"city_to_state",
|
||||||
"type" : {
|
"type" : {
|
||||||
"type" : "map",
|
"type" : "map",
|
||||||
|
|||||||
@@ -16,4 +16,4 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
###
|
###
|
||||||
include=base.properties
|
include=base.properties
|
||||||
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
|
hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
|
||||||
|
|||||||
@@ -43,6 +43,40 @@
|
|||||||
}, {
|
}, {
|
||||||
"name" : "end_lon",
|
"name" : "end_lon",
|
||||||
"type" : "double"
|
"type" : "double"
|
||||||
|
}, {
|
||||||
|
"name" : "distance_in_meters",
|
||||||
|
"type" : "int"
|
||||||
|
}, {
|
||||||
|
"name" : "seconds_since_epoch",
|
||||||
|
"type" : "long"
|
||||||
|
}, {
|
||||||
|
"name" : "weight",
|
||||||
|
"type" : "float"
|
||||||
|
}, {
|
||||||
|
"name" : "nation",
|
||||||
|
"type" : "bytes"
|
||||||
|
},{
|
||||||
|
"name" : "current_date",
|
||||||
|
"type" : {
|
||||||
|
"type" : "int",
|
||||||
|
"logicalType" : "date"
|
||||||
|
}
|
||||||
|
},{
|
||||||
|
"name" : "current_ts",
|
||||||
|
"type" : {
|
||||||
|
"type" : "long",
|
||||||
|
"logicalType" : "timestamp-micros"
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
"name" : "height",
|
||||||
|
"type" : {
|
||||||
|
"type" : "fixed",
|
||||||
|
"name" : "abc",
|
||||||
|
"size" : 5,
|
||||||
|
"logicalType" : "decimal",
|
||||||
|
"precision" : 10,
|
||||||
|
"scale": 6
|
||||||
|
}
|
||||||
}, {
|
}, {
|
||||||
"name" :"city_to_state",
|
"name" :"city_to_state",
|
||||||
"type" : {
|
"type" : {
|
||||||
|
|||||||
Reference in New Issue
Block a user