[HUDI-3592] Fix NPE of DefaultHoodieRecordPayload if Property is empty (#4999)
Co-authored-by: Rex An <bonean131@gmail.com>
This commit is contained in:
@@ -89,10 +89,15 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
|
||||
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
|
||||
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
|
||||
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
|
||||
String eventTimeField = properties
|
||||
.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY);
|
||||
if (eventTimeField == null) {
|
||||
return Option.empty();
|
||||
}
|
||||
return Option.ofNullable(
|
||||
HoodieAvroUtils.getNestedFieldVal(
|
||||
record,
|
||||
properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY),
|
||||
eventTimeField,
|
||||
true,
|
||||
consistentLogicalTimestampEnabled)
|
||||
);
|
||||
@@ -118,14 +123,18 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
|
||||
* NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
|
||||
* and need to be dealt with separately.
|
||||
*/
|
||||
String orderField = properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
|
||||
if (orderField == null) {
|
||||
return true;
|
||||
}
|
||||
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
|
||||
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
|
||||
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
|
||||
Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
|
||||
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY),
|
||||
orderField,
|
||||
true, consistentLogicalTimestampEnabled);
|
||||
Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
|
||||
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY),
|
||||
orderField,
|
||||
true, consistentLogicalTimestampEnabled);
|
||||
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
|
||||
}
|
||||
|
||||
@@ -143,6 +143,26 @@ public class TestDefaultHoodieRecordPayload {
|
||||
Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyProperty() throws IOException {
|
||||
GenericRecord record1 = new GenericData.Record(schema);
|
||||
record1.put("id", "1");
|
||||
record1.put("partition", "partition0");
|
||||
record1.put("ts", 0L);
|
||||
record1.put("_hoodie_is_deleted", false);
|
||||
|
||||
GenericRecord record2 = new GenericData.Record(schema);
|
||||
record2.put("id", "1");
|
||||
record2.put("partition", "partition0");
|
||||
record2.put("ts", 1L);
|
||||
record2.put("_hoodie_is_deleted", false);
|
||||
|
||||
DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(Option.of(record1));
|
||||
Properties properties = new Properties();
|
||||
payload.getInsertValue(schema, properties);
|
||||
payload.combineAndGetUpdateValue(record2, schema, properties);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(longs = {1L, 1612542030000L})
|
||||
public void testGetEventTimeInMetadataForInserts(long eventTime) throws IOException {
|
||||
|
||||
Reference in New Issue
Block a user