From 4f991ee3525c6225c7bf3b46e272f7d5b919196e Mon Sep 17 00:00:00 2001 From: Ankush Kanungo <40214578+akanungoz@users.noreply.github.com> Date: Sat, 11 Sep 2021 20:27:40 -0700 Subject: [PATCH] [HUDI-2398] Collect event time for inserts in DefaultHoodieRecordPayload (#3602) --- .../hudi/io/HoodieSortedMergeHandle.java | 8 +++---- .../model/DefaultHoodieRecordPayload.java | 23 ++++++++++++------ .../model/TestDefaultHoodieRecordPayload.java | 24 +++++++++++++++---- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java index 763178dbf..606e63a34 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java @@ -90,9 +90,9 @@ public class HoodieSortedMergeHandle ext } try { if (useWriterSchema) { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields)); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); } else { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema)); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); } insertRecordsWritten++; writtenRecordKeys.add(keyToPreWrite); @@ -112,9 +112,9 @@ public class HoodieSortedMergeHandle ext HoodieRecord hoodieRecord = keyToNewRecords.get(key); if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { if (useWriterSchema) { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields)); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps())); } else { - writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema)); + writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps())); } insertRecordsWritten++; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java index 86ccf673e..76474fde6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.model; -import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.util.Option; import org.apache.avro.Schema; @@ -56,7 +55,7 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { if (recordBytes.length == 0) { return Option.empty(); } - HoodieConfig hoodieConfig = new HoodieConfig(properties); + GenericRecord incomingRecord = bytesToAvro(recordBytes, schema); // Null check is needed here to support schema evolution. The record in storage may be from old schema where @@ -68,17 +67,27 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload { /* * We reached a point where the value is disk is older than the incoming record. */ - eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, hoodieConfig - .getString(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true)); + eventTime = updateEventTime(incomingRecord, properties); /* * Now check if the incoming record is a delete record. */ - if (isDeleteRecord(incomingRecord)) { + return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + } + + @Override + public Option getInsertValue(Schema schema, Properties properties) throws IOException { + if (recordBytes.length == 0) { return Option.empty(); - } else { - return Option.of(incomingRecord); } + GenericRecord incomingRecord = bytesToAvro(recordBytes, schema); + eventTime = updateEventTime(incomingRecord, properties); + + return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord); + } + + private static Option updateEventTime(GenericRecord record, Properties properties) { + return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true)); } @Override diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java index 5be0961ca..87d4e746d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java @@ -77,8 +77,8 @@ public class TestDefaultHoodieRecordPayload { assertEquals(payload1.preCombine(payload2, props), payload2); assertEquals(payload2.preCombine(payload1, props), payload2); - assertEquals(record1, payload1.getInsertValue(schema).get()); - assertEquals(record2, payload2.getInsertValue(schema).get()); + assertEquals(record1, payload1.getInsertValue(schema, props).get()); + assertEquals(record2, payload2.getInsertValue(schema, props).get()); assertEquals(payload1.combineAndGetUpdateValue(record2, schema, props).get(), record2); assertEquals(payload2.combineAndGetUpdateValue(record1, schema, props).get(), record2); @@ -103,8 +103,8 @@ public class TestDefaultHoodieRecordPayload { assertEquals(payload1.preCombine(payload2, props), payload2); assertEquals(payload2.preCombine(payload1, props), payload2); - assertEquals(record1, payload1.getInsertValue(schema).get()); - assertFalse(payload2.getInsertValue(schema).isPresent()); + assertEquals(record1, payload1.getInsertValue(schema, props).get()); + assertFalse(payload2.getInsertValue(schema, props).isPresent()); assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, props).get(), delRecord1); assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent()); @@ -142,4 +142,20 @@ public class TestDefaultHoodieRecordPayload { assertEquals(eventTime, Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY))); } + + @ParameterizedTest + @ValueSource(longs = {1L, 1612542030000L}) + public void testGetEventTimeInMetadataForInserts(long eventTime) throws IOException { + GenericRecord record = new GenericData.Record(schema); + + record.put("id", "1"); + record.put("partition", "partition0"); + record.put("ts", eventTime); + record.put("_hoodie_is_deleted", false); + DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, eventTime); + payload.getInsertValue(schema, props); + assertTrue(payload.getMetadata().isPresent()); + assertEquals(eventTime, + Long.parseLong(payload.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY))); + } }