1
0

[HUDI-3333] fix that getNestedFieldVal breaks with Spark 3.2 (#4783)

This commit is contained in:
Yann Byron
2022-02-10 22:12:16 +08:00
committed by GitHub
parent e7ec3a82dc
commit d971974063
3 changed files with 42 additions and 41 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.avro;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
@@ -39,6 +40,7 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
@@ -277,7 +279,7 @@ public class HoodieAvroUtils {
for (Schema.Field schemaField: fileSchema.getFields()) {
if (fields.contains(schemaField.name())) {
toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultValue()));
toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultVal()));
}
}
recordSchema.setFields(toBeAddedFields);
@@ -434,23 +436,32 @@ public class HoodieAvroUtils {
String[] parts = fieldName.split("\\.");
GenericRecord valueNode = record;
int i = 0;
for (; i < parts.length; i++) {
String part = parts[i];
Object val = valueNode.get(part);
if (val == null) {
break;
}
// return, if last part of name
if (i == parts.length - 1) {
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
} else {
// VC: Need a test here
if (!(val instanceof GenericRecord)) {
throw new HoodieException("Cannot find a record at part value :" + part);
try {
for (; i < parts.length; i++) {
String part = parts[i];
Object val = valueNode.get(part);
if (val == null) {
break;
}
valueNode = (GenericRecord) val;
// return, if last part of name
if (i == parts.length - 1) {
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
} else {
// VC: Need a test here
if (!(val instanceof GenericRecord)) {
throw new HoodieException("Cannot find a record at part value :" + part);
}
valueNode = (GenericRecord) val;
}
}
} catch (AvroRuntimeException e) {
// Since avro 1.10, arvo will throw AvroRuntimeException("Not a valid schema field: " + key)
// rather than return null like the previous version if if record doesn't contain this key.
// So when returnNullIfNotFound is true, catch this exception.
if (!returnNullIfNotFound) {
throw e;
}
}

View File

@@ -89,7 +89,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled));
return Option.ofNullable(
HoodieAvroUtils.getNestedFieldVal(
record,
properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY),
true,
consistentLogicalTimestampEnabled)
);
}
@Override
@@ -115,30 +121,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled);
Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled);
Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY),
true, consistentLogicalTimestampEnabled);
Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY),
true, consistentLogicalTimestampEnabled);
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
/**
* a wrapper of HoodieAvroUtils.getNestedFieldVal.
* Within it, catch exceptions and return null when "returnNullIfNotFound" is true and can't take effect.
*/
private static Object getNestedFieldVal(
GenericRecord record,
String fieldName,
boolean returnNullIfNotFound,
boolean consistentLogicalTimestampEnabled) {
try {
return HoodieAvroUtils.getNestedFieldVal(record, fieldName, returnNullIfNotFound, consistentLogicalTimestampEnabled);
} catch (Exception e) {
if (returnNullIfNotFound) {
return null;
} else {
throw e;
}
}
}
}

View File

@@ -1587,6 +1587,7 @@
<scalatest.version>3.1.0</scalatest.version>
<kafka.version>2.4.1</kafka.version>
<parquet.version>1.12.1</parquet.version>
<avro.version>1.10.2</avro.version>
<fasterxml.version>${fasterxml.spark3.version}</fasterxml.version>
<fasterxml.jackson.databind.version>${fasterxml.spark3.version}</fasterxml.jackson.databind.version>
<fasterxml.jackson.module.scala.version>${fasterxml.spark3.version}</fasterxml.jackson.module.scala.version>