From d971974063411597bdb5633e71a7e26688b2b136 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Thu, 10 Feb 2022 22:12:16 +0800 Subject: [PATCH] [HUDI-3333] fix that getNestedFieldVal breaks with Spark 3.2 (#4783) --- .../org/apache/hudi/avro/HoodieAvroUtils.java | 45 ++++++++++++------- .../model/DefaultHoodieRecordPayload.java | 37 ++++++--------- pom.xml | 1 + 3 files changed, 42 insertions(+), 41 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 8bdc253d3..ae0f5957b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.avro; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.Conversions.DecimalConversion; import org.apache.avro.JsonProperties; import org.apache.avro.LogicalTypes; @@ -39,6 +40,7 @@ import org.apache.avro.io.EncoderFactory; import org.apache.avro.io.JsonDecoder; import org.apache.avro.io.JsonEncoder; import org.apache.avro.specific.SpecificRecordBase; + import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; @@ -277,7 +279,7 @@ public class HoodieAvroUtils { for (Schema.Field schemaField: fileSchema.getFields()) { if (fields.contains(schemaField.name())) { - toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultValue())); + toBeAddedFields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(), schemaField.defaultVal())); } } recordSchema.setFields(toBeAddedFields); @@ -434,23 +436,32 @@ public class HoodieAvroUtils { String[] parts = fieldName.split("\\."); GenericRecord valueNode = record; int i = 0; - for (; i < parts.length; i++) { - String part = parts[i]; - Object val = 
valueNode.get(part); - if (val == null) { - break; - } - - // return, if last part of name - if (i == parts.length - 1) { - Schema fieldSchema = valueNode.getSchema().getField(part).schema(); - return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled); - } else { - // VC: Need a test here - if (!(val instanceof GenericRecord)) { - throw new HoodieException("Cannot find a record at part value :" + part); + try { + for (; i < parts.length; i++) { + String part = parts[i]; + Object val = valueNode.get(part); + if (val == null) { + break; } - valueNode = (GenericRecord) val; + + // return, if last part of name + if (i == parts.length - 1) { + Schema fieldSchema = valueNode.getSchema().getField(part).schema(); + return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled); + } else { + // VC: Need a test here + if (!(val instanceof GenericRecord)) { + throw new HoodieException("Cannot find a record at part value :" + part); + } + valueNode = (GenericRecord) val; + } + } + } catch (AvroRuntimeException e) { + // Since avro 1.10, avro will throw AvroRuntimeException("Not a valid schema field: " + key) + // rather than return null like the previous version if the record doesn't contain this key. + // So when returnNullIfNotFound is true, catch this exception. 
+ if (!returnNullIfNotFound) { + throw e; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 942a0aa09..5e4b445df 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -89,7 +89,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); - return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled)); + return Option.ofNullable( + HoodieAvroUtils.getNestedFieldVal( + record, + properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), + true, + consistentLogicalTimestampEnabled) + ); } @Override @@ -115,30 +121,13 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty( KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); - Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue, - properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled); - Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord, - properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled); + Object 
persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue, + properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), + true, consistentLogicalTimestampEnabled); + Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord, + properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), + true, consistentLogicalTimestampEnabled); return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0; } - /** - * a wrapper of HoodieAvroUtils.getNestedFieldVal. - * Within it, catch exceptions and return null when "returnNullIfNotFound" is true and can't take effect. - */ - private static Object getNestedFieldVal( - GenericRecord record, - String fieldName, - boolean returnNullIfNotFound, - boolean consistentLogicalTimestampEnabled) { - try { - return HoodieAvroUtils.getNestedFieldVal(record, fieldName, returnNullIfNotFound, consistentLogicalTimestampEnabled); - } catch (Exception e) { - if (returnNullIfNotFound) { - return null; - } else { - throw e; - } - } - } } diff --git a/pom.xml b/pom.xml index 420da1c5d..8f2152dfa 100644 --- a/pom.xml +++ b/pom.xml @@ -1587,6 +1587,7 @@ 3.1.0 2.4.1 1.12.1 + 1.10.2 ${fasterxml.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version}