1
0

[HUDI-1716]: Resolving default values for schema from dataframe (#2765)

- Adding default values and setting null as first entry in UNION data types in avro schema. 

Co-authored-by: Aditya Tiwari <aditya.tiwari@flipkart.com>
This commit is contained in:
Aditya Tiwari
2021-04-19 19:35:20 +05:30
committed by GitHub
parent dab5114f16
commit ec2334ceac
9 changed files with 405 additions and 15 deletions

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.client.bootstrap;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
@@ -29,7 +30,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
@@ -63,6 +63,6 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro
String structName = tableName + "_record";
String recordNamespace = "hoodie." + tableName;
return SchemaConverters.toAvroType(sparkSchema, false, structName, recordNamespace);
return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
}
}

View File

@@ -33,6 +33,7 @@ import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.hudi.AvroConversionUtils._
import scala.collection.JavaConverters._
@@ -340,7 +341,7 @@ object AvroConversionHelper {
}
}
case structType: StructType =>
val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(

View File

@@ -19,6 +19,7 @@
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.JsonProperties
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
@@ -27,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
object AvroConversionUtils {
@@ -46,10 +48,67 @@ object AvroConversionUtils {
}
}
/**
*
* Returns avro schema from spark StructType.
*
* @param structType Dataframe Struct Type.
* @param structName Avro record name.
* @param recordNamespace Avro record namespace.
* @return Avro schema corresponding to given struct type.
*/
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
}
/**
*
* Method to add default value of null to nullable fields in given avro schema
*
* @param schema input avro schema
* @return Avro schema with null default set to nullable fields
*/
def getAvroSchemaWithDefaults(schema: Schema): Schema = {
schema.getType match {
case Schema.Type.RECORD => {
val modifiedFields = schema.getFields.map(field => {
val newSchema = getAvroSchemaWithDefaults(field.schema())
field.schema().getType match {
case Schema.Type.UNION => {
val innerFields = newSchema.getTypes
val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
if(containsNullSchema) {
// Need to re shuffle the fields in list because to set null as default, null schema must be head in union schema
val restructuredNewSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
new Schema.Field(field.name(), restructuredNewSchema, field.doc(), JsonProperties.NULL_VALUE)
} else {
new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
}
}
case _ => new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
}
}).toList
Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, modifiedFields)
}
case Schema.Type.UNION => {
Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema)))
}
case Schema.Type.MAP => {
Schema.createMap(getAvroSchemaWithDefaults(schema.getValueType))
}
case Schema.Type.ARRAY => {
Schema.createArray(getAvroSchemaWithDefaults(schema.getElementType))
}
case _ => schema
}
}
def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {

View File

@@ -45,6 +45,10 @@ public class DataSourceTestUtils {
return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
}
public static Schema getStructTypeExampleEvolvedSchema() throws IOException {
return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleEvolvedSchema.txt")));
}
public static List<Row> generateRandomRows(int count) {
Random random = new Random();
List<Row> toReturn = new ArrayList<>();
@@ -58,4 +62,31 @@ public class DataSourceTestUtils {
}
return toReturn;
}
public static List<Row> generateUpdates(List<Row> records, int count) {
List<Row> toReturn = new ArrayList<>();
for (int i = 0; i < count; i++) {
Object[] values = new Object[3];
values[0] = records.get(i).getString(0);
values[1] = records.get(i).getAs(1);
values[2] = new Date().getTime();
toReturn.add(RowFactory.create(values));
}
return toReturn;
}
public static List<Row> generateRandomRowsEvolvedSchema(int count) {
Random random = new Random();
List<Row> toReturn = new ArrayList<>();
List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
for (int i = 0; i < count; i++) {
Object[] values = new Object[4];
values[0] = UUID.randomUUID().toString();
values[1] = partitions.get(random.nextInt(3));
values[2] = new Date().getTime();
values[3] = UUID.randomUUID().toString();
toReturn.add(RowFactory.create(values));
}
return toReturn;
}
}

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "example.schema",
"type": "record",
"name": "trip",
"fields": [
{
"name": "_row_key",
"type": "string"
},
{
"name": "partition",
"type": "string"
},
{
"name": "ts",
"type": ["long", "null"]
},
{
"name": "new_field",
"type": ["string","null"]
}
]
}

View File

@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.spark.sql.types.{DataTypes, StructType, StringType, ArrayType}
import org.scalatest.{FunSuite, Matchers}
class TestAvroConversionUtils extends FunSuite with Matchers {
test("test convertStructTypeToAvroSchema") {
val mapType = DataTypes.createMapType(StringType, new StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
val arrayType = ArrayType(new StructType().add("arrayKey", "string", false).add("arrayVal", "integer", true))
val innerStruct = new StructType().add("innerKey","string",false).add("value", "long", true)
val struct = new StructType().add("key", "string", false).add("version", "string", true)
.add("data1",innerStruct,false).add("data2",innerStruct,true)
.add("nullableMap", mapType, true).add("map",mapType,false)
.add("nullableArray", arrayType, true).add("array",arrayType,false)
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
val expectedSchemaStr = s"""
{
"type" : "record",
"name" : "SchemaName",
"namespace" : "SchemaNS",
"fields" : [ {
"name" : "key",
"type" : "string"
}, {
"name" : "version",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "data1",
"type" : {
"type" : "record",
"name" : "data1",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "innerKey",
"type" : "string"
}, {
"name" : "value",
"type" : [ "null", "long" ],
"default" : null
} ]
}
}, {
"name" : "data2",
"type" : [ "null", {
"type" : "record",
"name" : "data2",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "innerKey",
"type" : "string"
}, {
"name" : "value",
"type" : [ "null", "long" ],
"default" : null
} ]
} ],
"default" : null
}, {
"name" : "nullableMap",
"type" : [ "null", {
"type" : "map",
"values" : [ {
"type" : "record",
"name" : "nullableMap",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "mapKey",
"type" : "string"
}, {
"name" : "mapVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
} ],
"default" : null
}, {
"name" : "map",
"type" : {
"type" : "map",
"values" : [ {
"type" : "record",
"name" : "map",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "mapKey",
"type" : "string"
}, {
"name" : "mapVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
}
}, {
"name" : "nullableArray",
"type" : [ "null", {
"type" : "array",
"items" : [ {
"type" : "record",
"name" : "nullableArray",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "arrayKey",
"type" : "string"
}, {
"name" : "arrayVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
} ],
"default" : null
}, {
"name" : "array",
"type" : {
"type" : "array",
"items" : [ {
"type" : "record",
"name" : "array",
"namespace" : "SchemaNS.SchemaName",
"fields" : [ {
"name" : "arrayKey",
"type" : "string"
}, {
"name" : "arrayVal",
"type" : [ "null", "int" ],
"default" : null
} ]
}, "null" ]
}
} ]
}
"""
val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)
assert(avroSchema.equals(expectedAvroSchema))
}
}

View File

@@ -39,6 +39,7 @@ import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.{FunSuite, Matchers}
import scala.collection.JavaConversions._
import org.junit.jupiter.api.Assertions.assertEquals
class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
@@ -300,13 +301,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val modifiedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", "example.schema")
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc),
schema.toString,
modifiedSchema.toString,
path.toAbsolutePath.toString,
hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -399,6 +401,91 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
})
List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.foreach(tableType => {
test("test schema evolution for " + tableType) {
initSparkContext("test_schema_evolution")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "1",
"hoodie.upsert.shuffle.parallelism" -> "1",
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
var schema = DataSourceTestUtils.getStructTypeExampleSchema
var structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
var records = DataSourceTestUtils.generateRandomRows(10)
var recordsSeq = convertRowListToSeq(records)
var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
val snapshotDF1 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*")
assertEquals(10, snapshotDF1.count())
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf1 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
assert(df1.except(trimmedDf1).count() == 0)
// issue updates so that log files are created for MOR table
var updates = DataSourceTestUtils.generateUpdates(records, 5);
var updatesSeq = convertRowListToSeq(updates)
var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
// write updates to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
val snapshotDF2 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*")
assertEquals(10, snapshotDF2.count())
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf2 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
// ensure 2nd batch of updates matches.
assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
// getting new schema with new column
schema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
recordsSeq = convertRowListToSeq(records)
val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi with new column
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
val snapshotDF3 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*")
assertEquals(15, snapshotDF3.count())
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf3 = snapshotDF3.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
// ensure 2nd batch of updates matches.
assert(df3.intersect(trimmedDf3).except(df3).count() == 0)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
})
case class Test(uuid: String, ts: Long)
import scala.collection.JavaConverters

View File

@@ -47,8 +47,8 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
private static String ORIGINAL_SCHEMA = "{\"name\":\"t3_biz_operation_t_driver\",\"type\":\"record\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ "{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
private static String RESULT_SCHEMA = "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"ums_ts_\",\"type\":[\"string\",\"null\"]}]}";
private static String RESULT_SCHEMA = "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],"
+ "\"default\":null},{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
@Test
public void testPostProcessor() throws IOException {

View File

@@ -26,34 +26,42 @@
},
{
"name": "TIMESTAMP",
"type": ["double", "null"]
"type": ["null", "double"],
"default": null
},
{
"name": "RIDER",
"type": ["string", "null"]
"type": ["null", "string"],
"default": null
},
{
"name": "DRIVER",
"type": ["string", "null"]
"type": ["null" ,"string"],
"default": null
},
{
"name": "BEGIN_LAT",
"type": ["double", "null"]
"type": ["null", "double"],
"default": null
},
{
"name": "BEGIN_LON",
"type": ["double", "null"]
"type": ["null", "double"],
"default": null
},
{
"name": "END_LAT",
"type": ["double", "null"]
"type": ["null", "double"],
"default": null
},
{
"name": "END_LON",
"type": ["double", "null"]
"type": ["null", "double"],
"default": null
},
{
"name": "FARE",
"type": ["double", "null"]
"type": ["null", "double"],
"default": null
} ]
}