diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 708a6471b..ecf38d8a9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -481,7 +481,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
       GenericRecord payload;
       try {
         payload = (GenericRecord)r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
-        GenericRecord newPayload = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(payload, newSchema);
+        GenericRecord newPayload = HoodieAvroUtils.rewriteRecord(payload, newSchema);
         return new HoodieRecord(key, new RawTripTestPayload(newPayload.toString(), key.getRecordKey(), key.getPartitionPath(), schemaStr));
       } catch (IOException e) {
         throw new RuntimeException("Conversion to new schema failed");
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 37a49eacc..ec19fe354 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -58,7 +59,6 @@ import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -292,53 +292,53 @@ public class HoodieAvroUtils {
     return result;
   }
 
-  /**
-   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
-   * schema.
-   */
-  public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
-    return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
-  }
-
   /**
    * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
    * schema.
+   * NOTE: Here, the assumption is that you cannot go from an evolved schema (schema with (N) fields)
+   * to an older schema (schema with (N-1) fields). All fields present in the older record schema MUST be present in the
+   * new schema, and the default/existing values are carried over.
+   * This particular method does the following things:
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For a GenericRecord, copy over the data from the old schema to the new schema, or set default values for all fields of
+   * the transformed schema.
+   * c) For a SpecificRecord, the hoodie_metadata_fields get special treatment. This is done because for code-generated
+   * Avro classes (HoodieMetadataRecord), the Avro record is a SpecificRecordBase type instead of a GenericRecord.
+   * SpecificRecordBase throws a NullPointerException for record.get(name) if name is not present in the schema of the
+   * record (which happens when converting a SpecificRecordBase without hoodie_metadata_fields to a new record with them).
+   * In this case, we do NOT set the defaults for the hoodie_metadata_fields explicitly; instead, the new record assumes
+   * the default defined in the Avro schema itself.
+   * TODO: See if we can always pass GenericRecord instead of SpecificRecordBase in some cases.
    */
-  public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
-    return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
-  }
-
-  private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Schema.Field> fieldsToWrite, Schema newSchema) {
+  public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    for (Schema.Field f : fieldsToWrite) {
-      if (record.get(f.name()) == null) {
-        if (f.defaultVal() instanceof JsonProperties.Null) {
-          newRecord.put(f.name(), null);
-        } else {
-          newRecord.put(f.name(), f.defaultVal());
-        }
-      } else {
-        newRecord.put(f.name(), record.get(f.name()));
+    boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!isSpecificRecord) {
+        copyOldValueOrSetDefault(oldRecord, newRecord, f);
+      } else if (!isMetadataField(f.name())) {
+        copyOldValueOrSetDefault(oldRecord, newRecord, f);
       }
     }
     if (!GenericData.get().validate(newSchema, newRecord)) {
       throw new SchemaCompatibilityException(
-          "Unable to validate the rewritten record " + record + " against schema " + newSchema);
+          "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
     }
     return newRecord;
   }
 
-  /**
-   * Generates a super set of fields from both old and new schema.
-   */
-  private static LinkedHashSet<Schema.Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) {
-    LinkedHashSet<Schema.Field> allFields = new LinkedHashSet<>(oldSchema.getFields());
-    for (Schema.Field f : newSchema.getFields()) {
-      if (!allFields.contains(f) && !isMetadataField(f.name())) {
-        allFields.add(f);
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) {
+    // cache the result of oldRecord.get() to avoid a second CPU-expensive hash lookup
+    Object fieldValue = oldRecord.get(f.name());
+    if (fieldValue == null) {
+      if (f.defaultVal() instanceof JsonProperties.Null) {
+        newRecord.put(f.name(), null);
+      } else {
+        newRecord.put(f.name(), f.defaultVal());
       }
+    } else {
+      newRecord.put(f.name(), fieldValue);
     }
-    return allFields;
   }
 
   public static byte[] compress(String text) {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index a139997ca..a98a23010 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -109,7 +109,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         if (usesCustomPayload) {
           // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
          // the writerSchema with only the projection fields
-          recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
+          recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
        }
        // we assume, a later safe record in the log, is newer than what we have in the map &
        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 4592ae811..5aa979725 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -197,7 +197,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
   private JavaRDD<GenericRecord> projectSchema(JavaRDD<GenericRecord> updates) {
     // The records read from the hoodie dataset have the hoodie record fields, rewrite the record to eliminate them
     return updates
-        .map(r -> HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(r, new Schema.Parser().parse(schemaStr)));
+        .map(r -> HoodieAvroUtils.rewriteRecord(r, new Schema.Parser().parse(schemaStr)));
   }
 
   private JavaRDD<GenericRecord> generateUpdates(Map<String, Integer> adjustedPartitionToFileIdCountMap,
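
Note: as a quick illustration of what the consolidated HoodieAvroUtils.rewriteRecord does for a plain GenericRecord, here is a minimal, self-contained sketch. The Row schema, field names, and values below are made-up assumptions for illustration only and are not part of this patch.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RewriteRecordExample {
  public static void main(String[] args) {
    // Old schema: a single required "name" field (illustrative, not from the patch).
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
    // Evolved schema: adds a nullable "age" field with a null default.
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}]}");

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("name", "alice");

    // "name" is copied from the old record; "age" falls back to its schema default (null).
    GenericRecord rewritten = HoodieAvroUtils.rewriteRecord(oldRecord, newSchema);
    System.out.println(rewritten); // {"name": "alice", "age": null}
  }
}

For a SpecificRecordBase input (for example HoodieMetadataRecord), the copy loop skips the hoodie metadata fields, so they fall back to the defaults defined in the Avro schema itself, as described in the new javadoc above.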