diff --git a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java index a2a70a12b..6460b5e9c 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/ComplexKeyGenerator.java @@ -61,7 +61,7 @@ public class ComplexKeyGenerator extends KeyGenerator { boolean keyIsNullEmpty = true; StringBuilder recordKey = new StringBuilder(); for (String recordKeyField : recordKeyFields) { - String recordKeyValue = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + String recordKeyValue = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); if (recordKeyValue == null) { recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ","); } else if (recordKeyValue.isEmpty()) { @@ -79,7 +79,7 @@ public class ComplexKeyGenerator extends KeyGenerator { StringBuilder partitionPath = new StringBuilder(); for (String partitionPathField : partitionPathFields) { - String fieldVal = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField); + String fieldVal = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField, true); if (fieldVal == null || fieldVal.isEmpty()) { partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH : DEFAULT_PARTITION_PATH); diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index b06a9ae42..2cee24c17 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -52,29 +52,18 @@ import java.util.stream.Collectors; */ public class DataSourceUtils { - /** - * Obtain value of the provided nullable field as string, denoted by dot notation. e.g: a.b.c - */ - public static String getNullableNestedFieldValAsString(GenericRecord record, String fieldName) { - try { - return getNestedFieldValAsString(record, fieldName); - } catch (HoodieException e) { - return null; - } - } - /** * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c */ - public static String getNestedFieldValAsString(GenericRecord record, String fieldName) { - Object obj = getNestedFieldVal(record, fieldName); - return obj.toString(); + public static String getNestedFieldValAsString(GenericRecord record, String fieldName, boolean returnNullIfNotFound) { + Object obj = getNestedFieldVal(record, fieldName, returnNullIfNotFound); + return (obj == null) ? null : obj.toString(); } /** * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c */ - public static Object getNestedFieldVal(GenericRecord record, String fieldName) { + public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound) { String[] parts = fieldName.split("\\."); GenericRecord valueNode = record; int i = 0; @@ -96,9 +85,14 @@ public class DataSourceUtils { valueNode = (GenericRecord) val; } } - throw new HoodieException( + + if (returnNullIfNotFound) { + return null; + } else { + throw new HoodieException( fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :" - + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList())); + + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList())); + } } /** diff --git a/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java index 35829a1f4..e022d926c 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/NonpartitionedKeyGenerator.java @@ -37,7 +37,7 @@ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { - String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); if (recordKey == null || recordKey.isEmpty()) { throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java index 02f41e35b..f93562015 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java +++ b/hudi-spark/src/main/java/org/apache/hudi/SimpleKeyGenerator.java @@ -51,12 +51,12 @@ public class SimpleKeyGenerator extends KeyGenerator { throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); } - String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); if (recordKey == null || recordKey.isEmpty()) { throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); } - String partitionPath = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField); + String partitionPath = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField, true); if (partitionPath == null || partitionPath.isEmpty()) { partitionPath = DEFAULT_PARTITION_PATH; } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 62bdd198b..cc01c1399 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -98,7 +98,7 @@ private[hudi] object HoodieSparkSqlWriter { val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) val hoodieAllIncomingRecords = genericRecords.map(gr => { val orderingVal = DataSourceUtils.getNestedFieldValAsString( - gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]] + gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false).asInstanceOf[Comparable[_]] DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) }).toJavaRDD() diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index a001323d2..9435844e3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -313,7 +313,7 @@ public class DeltaSync implements Serializable { JavaRDD avroRDD = avroRDDOptional.get(); JavaRDD records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField)); + (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false)); return new HoodieRecord<>(keyGenerator.getKey(gr), payload); }); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java index 7964a45da..6d3a6e3c7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java @@ -81,7 +81,10 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { - Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField); + Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField, true); + if (partitionVal == null) { + partitionVal = 1L; + } SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat); partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT")); @@ -97,11 +100,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { unixTime = inputDateFormat.parse(partitionVal.toString()).getTime() / 1000; } else { throw new HoodieNotSupportedException( - "Unexpected type for partition field: " + partitionVal.getClass().getName()); + "Unexpected type for partition field: " + partitionVal.getClass().getName()); } Date timestamp = this.timestampType == TimestampType.EPOCHMILLISECONDS ? new Date(unixTime) : new Date(unixTime * 1000); - String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField); + String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true); if (recordKey == null || recordKey.isEmpty()) { throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); }