Support nested types for recordKey, partitionPath and combineKey
This commit is contained in:
committed by
vinoth chandar
parent
e43efa042f
commit
a7e6cf5197
@@ -53,6 +53,14 @@ public class DataSourceUtils {
|
|||||||
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
|
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
|
||||||
*/
|
*/
|
||||||
public static String getNestedFieldValAsString(GenericRecord record, String fieldName) {
|
public static String getNestedFieldValAsString(GenericRecord record, String fieldName) {
|
||||||
|
Object obj = getNestedFieldVal(record, fieldName);
|
||||||
|
return (obj == null) ? null : obj.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain the value of the provided field; nested fields are addressed using dot notation, e.g. a.b.c
|
||||||
|
*/
|
||||||
|
public static Object getNestedFieldVal(GenericRecord record, String fieldName) {
|
||||||
String[] parts = fieldName.split("\\.");
|
String[] parts = fieldName.split("\\.");
|
||||||
GenericRecord valueNode = record;
|
GenericRecord valueNode = record;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
@@ -65,7 +73,7 @@ public class DataSourceUtils {
|
|||||||
|
|
||||||
// return, if last part of name
|
// return, if last part of name
|
||||||
if (i == parts.length - 1) {
|
if (i == parts.length - 1) {
|
||||||
return val.toString();
|
return val;
|
||||||
} else {
|
} else {
|
||||||
// VC: Need a test here
|
// VC: Need a test here
|
||||||
if (!(val instanceof GenericRecord)) {
|
if (!(val instanceof GenericRecord)) {
|
||||||
|
|||||||
@@ -243,7 +243,7 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
||||||
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
|
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
|
||||||
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
||||||
(Comparable) gr.get(cfg.sourceOrderingField));
|
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
|
||||||
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HoodieKey getKey(GenericRecord record) {
|
public HoodieKey getKey(GenericRecord record) {
|
||||||
Object partitionVal = record.get(partitionPathField);
|
Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField);
|
||||||
SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
|
SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
|
||||||
partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
|
partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||||
|
|
||||||
@@ -102,7 +102,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
"Unexpected type for partition field: " + partitionVal.getClass().getName());
|
"Unexpected type for partition field: " + partitionVal.getClass().getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
return new HoodieKey(record.get(recordKeyField).toString(),
|
return new HoodieKey(DataSourceUtils.getNestedFieldValAsString(record, recordKeyField),
|
||||||
partitionPathFormat.format(new Date(unixTime * 1000)));
|
partitionPathFormat.format(new Date(unixTime * 1000)));
|
||||||
} catch (ParseException pe) {
|
} catch (ParseException pe) {
|
||||||
throw new HoodieDeltaStreamerException(
|
throw new HoodieDeltaStreamerException(
|
||||||
|
|||||||
Reference in New Issue
Block a user