[HUDI-4276] Reconcile schema-inject null values for missing fields and add new fields (#6017)
* [HUDI-4276] Reconcile schema-inject null values for missing fields and add new fields. * fix comments Co-authored-by: public (bdcee5037027) <mengtao0326@qq.com>
This commit is contained in:
@@ -745,15 +745,18 @@ public class HoodieAvroUtils {
|
||||
* b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
|
||||
*
|
||||
* @param oldRecord oldRecord to be rewritten
|
||||
* @param oldAvroSchema old avro schema.
|
||||
* @param newSchema newSchema used to rewrite oldRecord
|
||||
* @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema)
|
||||
* @param fieldNames track the full name of visited field when we travel new schema.
|
||||
* @return newRecord for new Schema
|
||||
*/
|
||||
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
|
||||
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
|
||||
if (oldRecord == null) {
|
||||
return null;
|
||||
}
|
||||
// try to get real schema for union type
|
||||
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
|
||||
switch (newSchema.getType()) {
|
||||
case RECORD:
|
||||
if (!(oldRecord instanceof IndexedRecord)) {
|
||||
@@ -761,40 +764,33 @@ public class HoodieAvroUtils {
|
||||
}
|
||||
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
|
||||
List<Schema.Field> fields = newSchema.getFields();
|
||||
Map<Integer, Object> helper = new HashMap<>();
|
||||
|
||||
GenericData.Record newRecord = new GenericData.Record(newSchema);
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
Schema.Field field = fields.get(i);
|
||||
String fieldName = field.name();
|
||||
fieldNames.push(fieldName);
|
||||
if (oldSchema.getField(field.name()) != null) {
|
||||
Schema.Field oldField = oldSchema.getField(field.name());
|
||||
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
|
||||
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
|
||||
} else {
|
||||
String fieldFullName = createFullName(fieldNames);
|
||||
String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
|
||||
String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
|
||||
String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
|
||||
// deal with rename
|
||||
if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
|
||||
if (oldSchema.getField(field.name()) == null && oldSchema.getField(fieldNameFromOldSchema) != null) {
|
||||
// find rename
|
||||
Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
|
||||
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
|
||||
Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
|
||||
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
|
||||
} else {
|
||||
// deal with default value
|
||||
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
|
||||
newRecord.put(i, null);
|
||||
} else {
|
||||
newRecord.put(i, fields.get(i).defaultVal());
|
||||
}
|
||||
}
|
||||
}
|
||||
fieldNames.pop();
|
||||
}
|
||||
GenericData.Record newRecord = new GenericData.Record(newSchema);
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
if (helper.containsKey(i)) {
|
||||
newRecord.put(i, helper.get(i));
|
||||
} else {
|
||||
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
|
||||
newRecord.put(i, null);
|
||||
} else {
|
||||
newRecord.put(i, fields.get(i).defaultVal());
|
||||
}
|
||||
}
|
||||
}
|
||||
return newRecord;
|
||||
case ARRAY:
|
||||
if (!(oldRecord instanceof Collection)) {
|
||||
@@ -1028,4 +1024,8 @@ public class HoodieAvroUtils {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema) {
|
||||
return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.EMPTY_MAP);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,13 @@ public class HoodieCommonConfig extends HoodieConfig {
|
||||
.defaultValue(false)
|
||||
.withDocumentation("Enables support for Schema Evolution feature");
|
||||
|
||||
public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
|
||||
.key("hoodie.datasource.write.reconcile.schema")
|
||||
.defaultValue(false)
|
||||
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
|
||||
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
|
||||
+ "injected to missing fields). If not, the write batch would fail.");
|
||||
|
||||
public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
|
||||
.key("hoodie.common.spillable.diskmap.type")
|
||||
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
|
||||
|
||||
@@ -57,8 +57,8 @@ import org.apache.log4j.Logger;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@@ -380,7 +380,7 @@ public abstract class AbstractHoodieLogRecordReader {
|
||||
Option<Schema> schemaOption = getMergedSchema(dataBlock);
|
||||
while (recordIterator.hasNext()) {
|
||||
IndexedRecord currentRecord = recordIterator.next();
|
||||
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
|
||||
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), Collections.emptyMap()) : currentRecord;
|
||||
processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
|
||||
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
|
||||
totalLogRecords.incrementAndGet();
|
||||
|
||||
@@ -68,10 +68,7 @@ public class InternalSchemaMerger {
|
||||
}
|
||||
|
||||
public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
|
||||
this.fileSchema = fileSchema;
|
||||
this.querySchema = querySchema;
|
||||
this.ignoreRequiredAttribute = ignoreRequiredAttribute;
|
||||
this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
|
||||
this(fileSchema, querySchema, ignoreRequiredAttribute, useColumnTypeFromFileSchema, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -151,14 +148,15 @@ public class InternalSchemaMerger {
|
||||
Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
|
||||
String nameFromFileSchema = fieldFromFileSchema.name();
|
||||
String nameFromQuerySchema = querySchema.findField(fieldId).name();
|
||||
String finalFieldName = useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema;
|
||||
Type typeFromFileSchema = fieldFromFileSchema.type();
|
||||
// Current design mechanism guarantees nestedType change is not allowed, so no need to consider.
|
||||
if (newType.isNestedType()) {
|
||||
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
|
||||
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
|
||||
finalFieldName, newType, oldField.doc());
|
||||
} else {
|
||||
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
|
||||
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
|
||||
finalFieldName, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,37 +33,33 @@ import java.util.stream.Collectors;
|
||||
* Utility methods to support evolve old avro schema based on a given schema.
|
||||
*/
|
||||
public class AvroSchemaEvolutionUtils {
|
||||
/**
|
||||
* Support evolution from a new avroSchema.
|
||||
* Now hoodie support implicitly add columns when hoodie write operation,
|
||||
* This ability needs to be preserved, so implicitly evolution for internalSchema should supported.
|
||||
*
|
||||
* @param evolvedSchema implicitly evolution of avro when hoodie write operation
|
||||
* @param oldSchema old internalSchema
|
||||
* @param supportPositionReorder support position reorder
|
||||
* @return evolution Schema
|
||||
*/
|
||||
public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) {
|
||||
InternalSchema evolvedInternalSchema = AvroInternalSchemaConverter.convert(evolvedSchema);
|
||||
// do check, only support add column evolution
|
||||
List<String> colNamesFromEvolved = evolvedInternalSchema.getAllColsFullName();
|
||||
List<String> colNamesFromOldSchema = oldSchema.getAllColsFullName();
|
||||
List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromEvolved.contains(f)).collect(Collectors.toList());
|
||||
List<Types.Field> newFields = new ArrayList<>();
|
||||
if (colNamesFromEvolved.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) {
|
||||
// no changes happen
|
||||
if (supportPositionReorder) {
|
||||
evolvedInternalSchema.getRecord().fields().forEach(f -> newFields.add(oldSchema.getRecord().field(f.name())));
|
||||
return new InternalSchema(newFields);
|
||||
}
|
||||
return oldSchema;
|
||||
}
|
||||
// try to find all added columns
|
||||
if (diffFromOldSchema.size() != 0) {
|
||||
throw new UnsupportedOperationException("Cannot evolve schema implicitly, find delete/rename operation");
|
||||
}
|
||||
|
||||
List<String> diffFromEvolutionSchema = colNamesFromEvolved.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
|
||||
/**
|
||||
* Support reconcile from a new avroSchema.
|
||||
* 1) incoming data has missing columns that were already defined in the table –> null values will be injected into missing columns
|
||||
* 2) incoming data contains new columns not defined yet in the table -> columns will be added to the table schema (incoming dataframe?)
|
||||
* 3) incoming data has missing columns that are already defined in the table and new columns not yet defined in the table ->
|
||||
* new columns will be added to the table schema, missing columns will be injected with null values
|
||||
* 4) support nested schema change.
|
||||
* Notice:
|
||||
* the incoming schema should not have delete/rename semantics.
|
||||
* for example: incoming schema: int a, int b, int d; oldTableSchema int a, int b, int c, int d
|
||||
* we must guarantee the column c is missing semantic, instead of delete semantic.
|
||||
* @param incomingSchema implicitly evolution of avro when hoodie write operation
|
||||
* @param oldTableSchema old internalSchema
|
||||
* @return reconcile Schema
|
||||
*/
|
||||
public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema) {
|
||||
InternalSchema inComingInternalSchema = AvroInternalSchemaConverter.convert(incomingSchema);
|
||||
// do check, only support add column evolution
|
||||
List<String> colNamesFromIncoming = inComingInternalSchema.getAllColsFullName();
|
||||
List<String> colNamesFromOldSchema = oldTableSchema.getAllColsFullName();
|
||||
List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromIncoming.contains(f)).collect(Collectors.toList());
|
||||
List<Types.Field> newFields = new ArrayList<>();
|
||||
if (colNamesFromIncoming.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) {
|
||||
return oldTableSchema;
|
||||
}
|
||||
List<String> diffFromEvolutionSchema = colNamesFromIncoming.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
|
||||
// Remove redundancy from diffFromEvolutionSchema.
|
||||
// for example, now we add a struct col in evolvedSchema, the struct col is " user struct<name:string, age:int> "
|
||||
// when we do diff operation: user, user.name, user.age will appeared in the resultSet which is redundancy, user.name and user.age should be excluded.
|
||||
@@ -77,29 +73,27 @@ public class AvroSchemaEvolutionUtils {
|
||||
// find redundancy, skip it
|
||||
continue;
|
||||
}
|
||||
finalAddAction.put(evolvedInternalSchema.findIdByName(name), name);
|
||||
finalAddAction.put(inComingInternalSchema.findIdByName(name), name);
|
||||
}
|
||||
|
||||
TableChanges.ColumnAddChange addChange = TableChanges.ColumnAddChange.get(oldSchema);
|
||||
TableChanges.ColumnAddChange addChange = TableChanges.ColumnAddChange.get(oldTableSchema);
|
||||
finalAddAction.entrySet().stream().forEach(f -> {
|
||||
String name = f.getValue();
|
||||
int splitPoint = name.lastIndexOf(".");
|
||||
String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
|
||||
String rawName = splitPoint > 0 ? name.substring(splitPoint + 1) : name;
|
||||
addChange.addColumns(parentName, rawName, evolvedInternalSchema.findType(name), null);
|
||||
// try to infer add position.
|
||||
java.util.Optional<String> inferPosition =
|
||||
colNamesFromIncoming.stream().filter(c ->
|
||||
c.lastIndexOf(".") == splitPoint
|
||||
&& c.startsWith(parentName)
|
||||
&& inComingInternalSchema.findIdByName(c) > inComingInternalSchema.findIdByName(name)
|
||||
&& oldTableSchema.findIdByName(c) > 0).sorted((s1, s2) -> oldTableSchema.findIdByName(s1) - oldTableSchema.findIdByName(s2)).findFirst();
|
||||
addChange.addColumns(parentName, rawName, inComingInternalSchema.findType(name), null);
|
||||
inferPosition.map(i -> addChange.addPositionChange(name, i, "before"));
|
||||
});
|
||||
|
||||
InternalSchema res = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange);
|
||||
if (supportPositionReorder) {
|
||||
evolvedInternalSchema.getRecord().fields().forEach(f -> newFields.add(oldSchema.getRecord().field(f.name())));
|
||||
return new InternalSchema(newFields);
|
||||
} else {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema, InternalSchema oldSchema) {
|
||||
return evolveSchemaFromNewAvroSchema(evolvedSchema, oldSchema, false);
|
||||
return SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -273,7 +273,7 @@ public class InternalSchemaUtils {
|
||||
*
|
||||
* @param oldSchema oldSchema
|
||||
* @param newSchema newSchema which modified from oldSchema
|
||||
* @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
|
||||
* @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameLastPartFromOldSchema)
|
||||
*/
|
||||
public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
|
||||
List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
|
||||
@@ -281,6 +281,9 @@ public class InternalSchemaUtils {
|
||||
int filedIdFromWriteSchema = oldSchema.findIdByName(f);
|
||||
// try to find the cols which has the same id, but have different colName;
|
||||
return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);
|
||||
}).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e));
|
||||
}).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> {
|
||||
int lastDotIndex = e.lastIndexOf(".");
|
||||
return e.substring(lastDotIndex == -1 ? 0 : lastDotIndex + 1);
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user