[HUDI-406]: added default partition path in TimestampBasedKeyGenerator
This commit is contained in:
committed by
Balaji Varadarajan
parent
2d5b79d96f
commit
8f935e779a
@@ -61,7 +61,7 @@ public class ComplexKeyGenerator extends KeyGenerator {
|
|||||||
boolean keyIsNullEmpty = true;
|
boolean keyIsNullEmpty = true;
|
||||||
StringBuilder recordKey = new StringBuilder();
|
StringBuilder recordKey = new StringBuilder();
|
||||||
for (String recordKeyField : recordKeyFields) {
|
for (String recordKeyField : recordKeyFields) {
|
||||||
String recordKeyValue = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField);
|
String recordKeyValue = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
|
||||||
if (recordKeyValue == null) {
|
if (recordKeyValue == null) {
|
||||||
recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ",");
|
recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ",");
|
||||||
} else if (recordKeyValue.isEmpty()) {
|
} else if (recordKeyValue.isEmpty()) {
|
||||||
@@ -79,7 +79,7 @@ public class ComplexKeyGenerator extends KeyGenerator {
|
|||||||
|
|
||||||
StringBuilder partitionPath = new StringBuilder();
|
StringBuilder partitionPath = new StringBuilder();
|
||||||
for (String partitionPathField : partitionPathFields) {
|
for (String partitionPathField : partitionPathFields) {
|
||||||
String fieldVal = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField);
|
String fieldVal = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField, true);
|
||||||
if (fieldVal == null || fieldVal.isEmpty()) {
|
if (fieldVal == null || fieldVal.isEmpty()) {
|
||||||
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH
|
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH
|
||||||
: DEFAULT_PARTITION_PATH);
|
: DEFAULT_PARTITION_PATH);
|
||||||
|
|||||||
@@ -52,29 +52,18 @@ import java.util.stream.Collectors;
|
|||||||
*/
|
*/
|
||||||
public class DataSourceUtils {
|
public class DataSourceUtils {
|
||||||
|
|
||||||
/**
|
|
||||||
* Obtain value of the provided nullable field as string, denoted by dot notation. e.g: a.b.c
|
|
||||||
*/
|
|
||||||
public static String getNullableNestedFieldValAsString(GenericRecord record, String fieldName) {
|
|
||||||
try {
|
|
||||||
return getNestedFieldValAsString(record, fieldName);
|
|
||||||
} catch (HoodieException e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
|
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
|
||||||
*/
|
*/
|
||||||
public static String getNestedFieldValAsString(GenericRecord record, String fieldName) {
|
public static String getNestedFieldValAsString(GenericRecord record, String fieldName, boolean returnNullIfNotFound) {
|
||||||
Object obj = getNestedFieldVal(record, fieldName);
|
Object obj = getNestedFieldVal(record, fieldName, returnNullIfNotFound);
|
||||||
return obj.toString();
|
return (obj == null) ? null : obj.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
|
* Obtain value of the provided field, denoted by dot notation. e.g: a.b.c
|
||||||
*/
|
*/
|
||||||
public static Object getNestedFieldVal(GenericRecord record, String fieldName) {
|
public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound) {
|
||||||
String[] parts = fieldName.split("\\.");
|
String[] parts = fieldName.split("\\.");
|
||||||
GenericRecord valueNode = record;
|
GenericRecord valueNode = record;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
@@ -96,10 +85,15 @@ public class DataSourceUtils {
|
|||||||
valueNode = (GenericRecord) val;
|
valueNode = (GenericRecord) val;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (returnNullIfNotFound) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
throw new HoodieException(
|
throw new HoodieException(
|
||||||
fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
|
fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
|
||||||
+ valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
|
+ valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a key generator class via reflection, passing in any configs needed.
|
* Create a key generator class via reflection, passing in any configs needed.
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HoodieKey getKey(GenericRecord record) {
|
public HoodieKey getKey(GenericRecord record) {
|
||||||
String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField);
|
String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
|
||||||
if (recordKey == null || recordKey.isEmpty()) {
|
if (recordKey == null || recordKey.isEmpty()) {
|
||||||
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -51,12 +51,12 @@ public class SimpleKeyGenerator extends KeyGenerator {
|
|||||||
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
|
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
|
||||||
}
|
}
|
||||||
|
|
||||||
String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField);
|
String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
|
||||||
if (recordKey == null || recordKey.isEmpty()) {
|
if (recordKey == null || recordKey.isEmpty()) {
|
||||||
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
||||||
}
|
}
|
||||||
|
|
||||||
String partitionPath = DataSourceUtils.getNullableNestedFieldValAsString(record, partitionPathField);
|
String partitionPath = DataSourceUtils.getNestedFieldValAsString(record, partitionPathField, true);
|
||||||
if (partitionPath == null || partitionPath.isEmpty()) {
|
if (partitionPath == null || partitionPath.isEmpty()) {
|
||||||
partitionPath = DEFAULT_PARTITION_PATH;
|
partitionPath = DEFAULT_PARTITION_PATH;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -98,7 +98,7 @@ private[hudi] object HoodieSparkSqlWriter {
|
|||||||
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
|
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
|
||||||
val hoodieAllIncomingRecords = genericRecords.map(gr => {
|
val hoodieAllIncomingRecords = genericRecords.map(gr => {
|
||||||
val orderingVal = DataSourceUtils.getNestedFieldValAsString(
|
val orderingVal = DataSourceUtils.getNestedFieldValAsString(
|
||||||
gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
|
gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false).asInstanceOf[Comparable[_]]
|
||||||
DataSourceUtils.createHoodieRecord(gr,
|
DataSourceUtils.createHoodieRecord(gr,
|
||||||
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
|
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
|
||||||
}).toJavaRDD()
|
}).toJavaRDD()
|
||||||
|
|||||||
@@ -313,7 +313,7 @@ public class DeltaSync implements Serializable {
|
|||||||
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
||||||
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
|
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
|
||||||
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
||||||
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
|
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
|
||||||
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -81,7 +81,10 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HoodieKey getKey(GenericRecord record) {
|
public HoodieKey getKey(GenericRecord record) {
|
||||||
Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField);
|
Object partitionVal = DataSourceUtils.getNestedFieldVal(record, partitionPathField, true);
|
||||||
|
if (partitionVal == null) {
|
||||||
|
partitionVal = 1L;
|
||||||
|
}
|
||||||
SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
|
SimpleDateFormat partitionPathFormat = new SimpleDateFormat(outputDateFormat);
|
||||||
partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
|
partitionPathFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
|
||||||
|
|
||||||
@@ -101,7 +104,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
|
|||||||
}
|
}
|
||||||
Date timestamp = this.timestampType == TimestampType.EPOCHMILLISECONDS ? new Date(unixTime) : new Date(unixTime * 1000);
|
Date timestamp = this.timestampType == TimestampType.EPOCHMILLISECONDS ? new Date(unixTime) : new Date(unixTime * 1000);
|
||||||
|
|
||||||
String recordKey = DataSourceUtils.getNullableNestedFieldValAsString(record, recordKeyField);
|
String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
|
||||||
if (recordKey == null || recordKey.isEmpty()) {
|
if (recordKey == null || recordKey.isEmpty()) {
|
||||||
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user