diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 8bdc253d3..ae0f5957b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.avro;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
@@ -39,6 +40,7 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
+
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
@@ -277,7 +279,7 @@ public class HoodieAvroUtils {
for (Schema.Field schemaField: fileSchema.getFields()) {
if (fields.contains(schemaField.name())) {
- toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultValue()));
+ toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultVal()));
}
}
recordSchema.setFields(toBeAddedFields);
@@ -434,23 +436,32 @@ public class HoodieAvroUtils {
String[] parts = fieldName.split("\\.");
GenericRecord valueNode = record;
int i = 0;
- for (; i < parts.length; i++) {
- String part = parts[i];
- Object val = valueNode.get(part);
- if (val == null) {
- break;
- }
-
- // return, if last part of name
- if (i == parts.length - 1) {
- Schema fieldSchema = valueNode.getSchema().getField(part).schema();
- return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
- } else {
- // VC: Need a test here
- if (!(val instanceof GenericRecord)) {
- throw new HoodieException("Cannot find a record at part value :" + part);
+ try {
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Object val = valueNode.get(part);
+ if (val == null) {
+ break;
}
- valueNode = (GenericRecord) val;
+
+ // return, if last part of name
+ if (i == parts.length - 1) {
+ Schema fieldSchema = valueNode.getSchema().getField(part).schema();
+ return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
+ } else {
+ // VC: Need a test here
+ if (!(val instanceof GenericRecord)) {
+ throw new HoodieException("Cannot find a record at part value :" + part);
+ }
+ valueNode = (GenericRecord) val;
+ }
+ }
+ } catch (AvroRuntimeException e) {
+ // Since avro 1.10, arvo will throw AvroRuntimeException("Not a valid schema field: " + key)
+ // rather than return null like the previous version if if record doesn't contain this key.
+ // So when returnNullIfNotFound is true, catch this exception.
+ if (!returnNullIfNotFound) {
+ throw e;
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 942a0aa09..5e4b445df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -89,7 +89,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
- return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled));
+ return Option.ofNullable(
+ HoodieAvroUtils.getNestedFieldVal(
+ record,
+ properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY),
+ true,
+ consistentLogicalTimestampEnabled)
+ );
}
@Override
@@ -115,30 +121,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
- Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
- properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled);
- Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
- properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled);
+ Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+ properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY),
+ true, consistentLogicalTimestampEnabled);
+ Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
+ properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY),
+ true, consistentLogicalTimestampEnabled);
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
- /**
- * a wrapper of HoodieAvroUtils.getNestedFieldVal.
- * Within it, catch exceptions and return null when "returnNullIfNotFound" is true and can't take effect.
- */
- private static Object getNestedFieldVal(
- GenericRecord record,
- String fieldName,
- boolean returnNullIfNotFound,
- boolean consistentLogicalTimestampEnabled) {
- try {
- return HoodieAvroUtils.getNestedFieldVal(record, fieldName, returnNullIfNotFound, consistentLogicalTimestampEnabled);
- } catch (Exception e) {
- if (returnNullIfNotFound) {
- return null;
- } else {
- throw e;
- }
- }
- }
}
diff --git a/pom.xml b/pom.xml
index 420da1c5d..8f2152dfa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1587,6 +1587,7 @@
3.1.0
2.4.1
1.12.1
+ 1.10.2
${fasterxml.spark3.version}
${fasterxml.spark3.version}
${fasterxml.spark3.version}