[HUDI-1493] Fixed schema compatibility check for fields. (#2350)
Some field type changes are allowed (e.g. int -> long) while maintaining schema backward compatibility within HUDI. The check was reversed, with the reader schema being passed as the writer schema.
This commit is contained in:
@@ -107,10 +107,33 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
|||||||
assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema),
|
assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema),
|
||||||
"Swapped fields are not compatible");
|
"Swapped fields are not compatible");
|
||||||
|
|
||||||
String typeChangeSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
|
String typeChangeSchemaDisallowed = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
|
||||||
+ TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
|
+ TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
|
||||||
assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchema),
|
assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed),
|
||||||
"Field type change is not compatible");
|
"Incompatible field type change is not allowed");
|
||||||
|
|
||||||
|
// Array of allowed schema field type transitions
|
||||||
|
String[][] allowedFieldChanges = {
|
||||||
|
{"string", "bytes"}, {"bytes", "string"},
|
||||||
|
{"int", "long"}, {"int", "float"}, {"long", "float"},
|
||||||
|
{"int", "double"}, {"float", "double"}, {"long", "double"}};
|
||||||
|
for (String[] fieldChange : allowedFieldChanges) {
|
||||||
|
String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[0]) + TRIP_SCHEMA_SUFFIX;
|
||||||
|
String toSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[1]) + TRIP_SCHEMA_SUFFIX;
|
||||||
|
assertTrue(TableSchemaResolver.isSchemaCompatible(fromSchema, toSchema),
|
||||||
|
"Compatible field type change is not allowed");
|
||||||
|
if (!fieldChange[0].equals("byte") && fieldChange[1].equals("byte")) {
|
||||||
|
assertFalse(TableSchemaResolver.isSchemaCompatible(toSchema, fromSchema),
|
||||||
|
"Incompatible field type change is allowed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Names and aliases should match
|
||||||
|
String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
|
String toSchema = TRIP_SCHEMA_PREFIX.replace("triprec", "new_triprec") + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
|
||||||
|
assertFalse(TableSchemaResolver.isSchemaCompatible(fromSchema, toSchema), "Field names should match");
|
||||||
|
assertFalse(TableSchemaResolver.isSchemaCompatible(toSchema, fromSchema), "Field names should match");
|
||||||
|
|
||||||
|
|
||||||
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED),
|
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED),
|
||||||
"Added field with default is compatible (Evolved Schema)");
|
"Added field with default is compatible (Evolved Schema)");
|
||||||
@@ -474,6 +497,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected HoodieTableType getTableType() {
|
protected HoodieTableType getTableType() {
|
||||||
return tableType;
|
return tableType;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -296,7 +296,7 @@ public class TableSchemaResolver {
|
|||||||
public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
|
public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
|
||||||
if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
|
if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
|
||||||
// record names must match:
|
// record names must match:
|
||||||
if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) {
|
if (!SchemaCompatibility.schemaNameEquals(newSchema, oldSchema)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -329,9 +329,11 @@ public class TableSchemaResolver {
|
|||||||
// All fields in the newSchema record can be populated from the oldSchema record
|
// All fields in the newSchema record can be populated from the oldSchema record
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
// Use the checks implemented by
|
// Use the checks implemented by Avro
|
||||||
|
// newSchema is the schema which will be used to read the records written earlier using oldSchema. Hence, in the
|
||||||
|
// check below, use newSchema as the reader schema and oldSchema as the writer schema.
|
||||||
org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult =
|
org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult =
|
||||||
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema);
|
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
|
||||||
return compatResult.getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
|
return compatResult.getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user