
[HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (#2424)

n3nash
2021-01-14 12:47:50 -08:00
committed by GitHub
parent e926c1a45c
commit 749f657856
4 changed files with 36 additions and 36 deletions


@@ -481,7 +481,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
       GenericRecord payload;
       try {
         payload = (GenericRecord)r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
-        GenericRecord newPayload = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(payload, newSchema);
+        GenericRecord newPayload = HoodieAvroUtils.rewriteRecord(payload, newSchema);
         return new HoodieRecord(key, new RawTripTestPayload(newPayload.toString(), key.getRecordKey(), key.getPartitionPath(), schemaStr));
       } catch (IOException e) {
         throw new RuntimeException("Conversion to new schema failed");


@@ -18,6 +18,7 @@
 package org.apache.hudi.avro;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -58,7 +59,6 @@ import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -292,53 +292,53 @@ public class HoodieAvroUtils {
     return result;
   }
-  /**
-   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
-   * schema.
-   */
-  public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
-    return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
-  }
   /**
    * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
    * schema.
+   * NOTE: Here, the assumption is that you cannot go from an evolved schema (schema with (N) fields)
+   * to an older schema (schema with (N-1) fields). All fields present in the older record schema MUST be present in the
+   * new schema and the default/existing values are carried over.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this
+   * transformed schema
+   * c) For SpecificRecord, hoodie_metadata_fields have a special treatment. This is done because for code generated
+   * avro classes (HoodieMetadataRecord), the avro record is a SpecificBaseRecord type instead of a GenericRecord.
+   * SpecificBaseRecord throws null pointer exception for record.get(name) if name is not present in the schema of the
+   * record (which happens when converting a SpecificBaseRecord without hoodie_metadata_fields to a new record with it).
+   * In this case, we do NOT set the defaults for the hoodie_metadata_fields explicitly, instead, the new record assumes
+   * the default defined in the avro schema itself.
+   * TODO: See if we can always pass GenericRecord instead of SpecificBaseRecord in some cases.
    */
-  public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
-    return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
-  }
-  private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
+  public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    for (Schema.Field f : fieldsToWrite) {
-      if (record.get(f.name()) == null) {
-        if (f.defaultVal() instanceof JsonProperties.Null) {
-          newRecord.put(f.name(), null);
-        } else {
-          newRecord.put(f.name(), f.defaultVal());
-        }
-      } else {
-        newRecord.put(f.name(), record.get(f.name()));
+    boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!isSpecificRecord) {
+        copyOldValueOrSetDefault(oldRecord, newRecord, f);
+      } else if (!isMetadataField(f.name())) {
+        copyOldValueOrSetDefault(oldRecord, newRecord, f);
       }
     }
     if (!GenericData.get().validate(newSchema, newRecord)) {
       throw new SchemaCompatibilityException(
-          "Unable to validate the rewritten record " + record + " against schema " + newSchema);
+          "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
     }
     return newRecord;
   }
-  /**
-   * Generates a super set of fields from both old and new schema.
-   */
-  private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) {
-    LinkedHashSet<Field> allFields = new LinkedHashSet<>(oldSchema.getFields());
-    for (Schema.Field f : newSchema.getFields()) {
-      if (!allFields.contains(f) && !isMetadataField(f.name())) {
-        allFields.add(f);
-      }
-    }
-    return allFields;
-  }
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) {
+    // cache the result of oldRecord.get() to save CPU expensive hash lookup
+    Object fieldValue = oldRecord.get(f.name());
+    if (fieldValue == null) {
+      if (f.defaultVal() instanceof JsonProperties.Null) {
+        newRecord.put(f.name(), null);
+      } else {
+        newRecord.put(f.name(), f.defaultVal());
+      }
+    } else {
+      newRecord.put(f.name(), fieldValue);
+    }
+  }
   public static byte[] compress(String text) {
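To make the restored contract concrete: rewriteRecord walks only the new schema's fields, copies values that exist on the old record, and falls back to each field's schema default otherwise. The following standalone sketch of that behavior is not part of this commit; the "trip" schemas and field names are invented for illustration, and it assumes Avro and hudi-common are on the classpath.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RewriteRecordSketch {
  public static void main(String[] args) {
    // Hypothetical writer schema with two fields.
    Schema oldSchema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredDouble("fare")
        .endRecord();
    // Evolved schema: same fields plus an optional "tip" field whose default is null.
    Schema newSchema = SchemaBuilder.record("trip").fields()
        .requiredString("uuid")
        .requiredDouble("fare")
        .optionalDouble("tip")
        .endRecord();

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("uuid", "id-1");
    oldRecord.put("fare", 42.0);

    // Only the new schema's fields are written: existing values are copied over,
    // and "tip" falls back to its schema default (null).
    GenericRecord rewritten = HoodieAvroUtils.rewriteRecord(oldRecord, newSchema);
    System.out.println(rewritten); // {"uuid": "id-1", "fare": 42.0, "tip": null}
  }
}

The SpecificRecordBase branch in the method only matters for generated classes such as HoodieMetadataRecord, where the hoodie metadata fields are skipped and left to their schema defaults, as the restored javadoc explains.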


@@ -109,7 +109,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
       if (usesCustomPayload) {
         // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
         // the writerSchema with only the projection fields
-        recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
+        recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
       }
       // we assume, a later safe record in the log, is newer than what we have in the map &
       // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
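The reader relies on the same property from the projection side: getReaderSchema() holds only the projection fields, and since rewriteRecord writes nothing but the target schema's fields, every other column is simply not copied. A standalone sketch of that projection effect, not taken from this commit (schemas and field names are invented):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class ProjectionSketch {
  public static void main(String[] args) {
    // Hypothetical writer schema, and a reader schema keeping only the projected columns key and fare.
    Schema writerSchema = SchemaBuilder.record("row").fields()
        .requiredString("key")
        .requiredString("rider")
        .requiredDouble("fare")
        .endRecord();
    Schema readerSchema = SchemaBuilder.record("row").fields()
        .requiredString("key")
        .requiredDouble("fare")
        .endRecord();

    GenericRecord full = new GenericData.Record(writerSchema);
    full.put("key", "k1");
    full.put("rider", "rider-7");
    full.put("fare", 12.5);

    // Only the reader schema's fields are written, so "rider" is dropped.
    GenericRecord projected = HoodieAvroUtils.rewriteRecord(full, readerSchema);
    System.out.println(projected); // {"key": "k1", "fare": 12.5}
  }
}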


@@ -197,7 +197,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
   private JavaRDD<GenericRecord> projectSchema(JavaRDD<GenericRecord> updates) {
     // The records read from the hoodie dataset have the hoodie record fields, rewrite the record to eliminate them
     return updates
-        .map(r -> HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(r, new Schema.Parser().parse(schemaStr)));
+        .map(r -> HoodieAvroUtils.rewriteRecord(r, new Schema.Parser().parse(schemaStr)));
   }
   private JavaRDD<GenericRecord> generateUpdates(Map<String, Integer> adjustedPartitionToFileIdCountMap,
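projectSchema uses the identical call to strip the hoodie record (metadata) fields: the schema parsed from schemaStr carries only the data fields, so those columns never make it into the rewritten records. A hedged sketch of an equivalent Spark helper follows; the class and method names are ours, not the project's, and it assumes schemaStr holds the data-only Avro schema JSON.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.spark.api.java.JavaRDD;

public class StripHoodieRecordFields {

  // Mirrors projectSchema above: rewriting against the data-only schema means the
  // hoodie record fields (absent from that schema) are simply not copied over.
  public static JavaRDD<GenericRecord> dropHoodieRecordFields(JavaRDD<GenericRecord> updates, String schemaStr) {
    return updates.map(r -> HoodieAvroUtils.rewriteRecord(r, new Schema.Parser().parse(schemaStr)));
  }
}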