[HUDI-2398] Collect event time for inserts in DefaultHoodieRecordPayload (#3602)
This commit is contained in:
@@ -90,9 +90,9 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (useWriterSchema) {
|
if (useWriterSchema) {
|
||||||
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
|
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
|
||||||
} else {
|
} else {
|
||||||
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
|
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
|
||||||
}
|
}
|
||||||
insertRecordsWritten++;
|
insertRecordsWritten++;
|
||||||
writtenRecordKeys.add(keyToPreWrite);
|
writtenRecordKeys.add(keyToPreWrite);
|
||||||
@@ -112,9 +112,9 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
|
|||||||
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
||||||
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
|
if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
|
||||||
if (useWriterSchema) {
|
if (useWriterSchema) {
|
||||||
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
|
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
|
||||||
} else {
|
} else {
|
||||||
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
|
writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
|
||||||
}
|
}
|
||||||
insertRecordsWritten++;
|
insertRecordsWritten++;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.model;
|
package org.apache.hudi.common.model;
|
||||||
|
|
||||||
import org.apache.hudi.common.config.HoodieConfig;
|
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
@@ -56,7 +55,7 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
|
|||||||
if (recordBytes.length == 0) {
|
if (recordBytes.length == 0) {
|
||||||
return Option.empty();
|
return Option.empty();
|
||||||
}
|
}
|
||||||
HoodieConfig hoodieConfig = new HoodieConfig(properties);
|
|
||||||
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
|
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
|
||||||
|
|
||||||
// Null check is needed here to support schema evolution. The record in storage may be from old schema where
|
// Null check is needed here to support schema evolution. The record in storage may be from old schema where
|
||||||
@@ -68,17 +67,27 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
|
|||||||
/*
|
/*
|
||||||
* We reached a point where the value on disk is older than the incoming record.
|
* We reached a point where the value on disk is older than the incoming record.
|
||||||
*/
|
*/
|
||||||
eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, hoodieConfig
|
eventTime = updateEventTime(incomingRecord, properties);
|
||||||
.getString(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true));
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Now check if the incoming record is a delete record.
|
* Now check if the incoming record is a delete record.
|
||||||
*/
|
*/
|
||||||
if (isDeleteRecord(incomingRecord)) {
|
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
|
||||||
return Option.empty();
|
|
||||||
} else {
|
|
||||||
return Option.of(incomingRecord);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Decodes this payload's bytes with the given schema for the insert path and, as a side effect,
 * stores the decoded record's event time in the {@code eventTime} field.
 *
 * @param schema Avro schema handed to {@code bytesToAvro} to decode {@code recordBytes}
 * @param properties payload properties forwarded to {@code updateEventTime}, which reads the
 *                   {@code HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY} entry from them
 * @return {@code Option.empty()} when {@code recordBytes} is empty or the decoded record is a
 *         delete record; otherwise the decoded record
 * @throws IOException declared by this method; presumably raised while decoding the bytes
 *                     (TODO confirm against {@code bytesToAvro})
 */
@Override
|
||||||
|
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
|
||||||
|
// An empty byte array carries no record, so there is nothing to insert.
if (recordBytes.length == 0) {
|
||||||
|
return Option.empty();
|
||||||
|
}
|
||||||
|
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
|
||||||
|
// Captured before the delete check below, so eventTime is assigned even for delete records.
eventTime = updateEventTime(incomingRecord, properties);
|
||||||
|
|
||||||
|
// Delete records yield an empty Option; every other record is returned as decoded.
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Looks up the event-time value of {@code record}. Despite the name, this method assigns
 * nothing itself: it returns the value and the caller decides where to store it.
 *
 * @param record record passed to {@code getNestedFieldVal} for the lookup
 * @param properties source of the field name, read from the
 *                   {@code HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY} entry
 * @return the looked-up value wrapped with {@code Option.ofNullable}, so a {@code null}
 *         lookup result becomes an empty {@code Option}
 */
private static Option<Object> updateEventTime(GenericRecord record, Properties properties) {
|
||||||
|
// NOTE(review): if the event-time key is not configured, getProperty presumably returns null
// (java.util.Properties behaviour, assuming that is the type here) and that null is passed on
// as the field name; confirm getNestedFieldVal tolerates a null name.
// NOTE(review): the trailing `true` presumably makes the helper return null instead of
// throwing when the field is absent; confirm against getNestedFieldVal's signature.
return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -77,8 +77,8 @@ public class TestDefaultHoodieRecordPayload {
|
|||||||
assertEquals(payload1.preCombine(payload2, props), payload2);
|
assertEquals(payload1.preCombine(payload2, props), payload2);
|
||||||
assertEquals(payload2.preCombine(payload1, props), payload2);
|
assertEquals(payload2.preCombine(payload1, props), payload2);
|
||||||
|
|
||||||
assertEquals(record1, payload1.getInsertValue(schema).get());
|
assertEquals(record1, payload1.getInsertValue(schema, props).get());
|
||||||
assertEquals(record2, payload2.getInsertValue(schema).get());
|
assertEquals(record2, payload2.getInsertValue(schema, props).get());
|
||||||
|
|
||||||
assertEquals(payload1.combineAndGetUpdateValue(record2, schema, props).get(), record2);
|
assertEquals(payload1.combineAndGetUpdateValue(record2, schema, props).get(), record2);
|
||||||
assertEquals(payload2.combineAndGetUpdateValue(record1, schema, props).get(), record2);
|
assertEquals(payload2.combineAndGetUpdateValue(record1, schema, props).get(), record2);
|
||||||
@@ -103,8 +103,8 @@ public class TestDefaultHoodieRecordPayload {
|
|||||||
assertEquals(payload1.preCombine(payload2, props), payload2);
|
assertEquals(payload1.preCombine(payload2, props), payload2);
|
||||||
assertEquals(payload2.preCombine(payload1, props), payload2);
|
assertEquals(payload2.preCombine(payload1, props), payload2);
|
||||||
|
|
||||||
assertEquals(record1, payload1.getInsertValue(schema).get());
|
assertEquals(record1, payload1.getInsertValue(schema, props).get());
|
||||||
assertFalse(payload2.getInsertValue(schema).isPresent());
|
assertFalse(payload2.getInsertValue(schema, props).isPresent());
|
||||||
|
|
||||||
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, props).get(), delRecord1);
|
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, props).get(), delRecord1);
|
||||||
assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent());
|
assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent());
|
||||||
@@ -142,4 +142,20 @@ public class TestDefaultHoodieRecordPayload {
|
|||||||
assertEquals(eventTime,
|
assertEquals(eventTime,
|
||||||
Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
|
Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Checks that calling {@code getInsertValue(schema, props)} on a non-deleted record makes the
 * payload expose metadata whose {@code METADATA_EVENT_TIME_KEY} entry parses back to the
 * record's event time. Runs once per value listed in {@code @ValueSource}.
 *
 * @param eventTime value written to the record's {@code ts} field and also passed as the second
 *                  constructor argument of the payload
 * @throws IOException declared because {@code getInsertValue} declares it
 */
@ParameterizedTest
|
||||||
|
@ValueSource(longs = {1L, 1612542030000L})
|
||||||
|
public void testGetEventTimeInMetadataForInserts(long eventTime) throws IOException {
|
||||||
|
GenericRecord record = new GenericData.Record(schema);
|
||||||
|
|
||||||
|
record.put("id", "1");
|
||||||
|
record.put("partition", "partition0");
|
||||||
|
record.put("ts", eventTime);
|
||||||
|
// Not flagged as deleted, so this record goes through the plain insert path.
record.put("_hoodie_is_deleted", false);
|
||||||
|
DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, eventTime);
|
||||||
|
// Return value is intentionally ignored; the call is made for its effect on the payload's metadata.
// NOTE(review): `props` is defined outside this hunk; presumably it names "ts" as the event-time field (confirm).
payload.getInsertValue(schema, props);
|
||||||
|
assertTrue(payload.getMetadata().isPresent());
|
||||||
|
assertEquals(eventTime,
|
||||||
|
Long.parseLong(payload.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user