diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java index 19352edd4..9eb8bcd6c 100644 --- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java +++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java @@ -53,6 +53,14 @@ public class DataSourceUtils { * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c */ public static String getNestedFieldValAsString(GenericRecord record, String fieldName) { + Object obj = getNestedFieldVal(record, fieldName); + return (obj == null) ? null : obj.toString(); + } + + /** + * Obtain value of the provided field, denoted by dot notation. e.g: a.b.c + */ + public static Object getNestedFieldVal(GenericRecord record, String fieldName) { String[] parts = fieldName.split("\\."); GenericRecord valueNode = record; int i = 0; @@ -65,7 +73,7 @@ public class DataSourceUtils { // return, if last part of name if (i == parts.length - 1) { - return val.toString(); + return val; } else { // VC: Need a test here if (!(val instanceof GenericRecord)) { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index acff58f9b..e00c92315 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -243,7 +243,7 @@ public class HoodieDeltaStreamer implements Serializable { JavaRDD avroRDD = avroRDDOptional.get(); JavaRDD records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) gr.get(cfg.sourceOrderingField)); + (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField)); return new HoodieRecord<>(keyGenerator.getKey(gr), payload); }); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java index f6a0c90e3..b4c26b9be 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/keygen/TimestampBasedKeyGenerator.java @@ -83,7 +83,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { @Override public HoodieKey getKey(GenericRecord record) { - Object partitionVal = record.get(partitionPathField); + Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField); SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat); partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT")); @@ -102,7 +102,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { "Unexpected type for partition field: " + partitionVal.getClass().getName()); } - return new HoodieKey(record.get(recordKeyField).toString(), + return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record, recordKeyField), partitionPathFormat.format(new Date(unixTime * 1000))); } catch (ParseException pe) { throw new HoodieDeltaStreamerException(