[HUDI-2170] [HUDI-1763] Always choose the latest record for HoodieRecordPayload (#3401)
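In short: HoodieRecordPayload#preCombine's parameter is renamed from another to oldValue to make the call direction explicit, SparkWriteHelper now invokes preCombine on the incoming record with the existing one as the old value, and the table's preCombine field is threaded through the log scanner, file slice reader, and metadata paths so payloads are constructed with their ordering value and merges keep the record with the greatest ordering value.
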
@@ -34,7 +34,7 @@ public class BootstrapRecordPayload implements HoodieRecordPayload<BootstrapReco
   }
 
   @Override
-  public BootstrapRecordPayload preCombine(BootstrapRecordPayload another) {
+  public BootstrapRecordPayload preCombine(BootstrapRecordPayload oldValue) {
     return this;
   }
 
@@ -176,6 +176,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
       HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
       recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
           tableConfig.getPayloadClass(),
+          tableConfig.getPreCombineField(),
           tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
               tableConfig.getPartitionFieldProp()))));
     } catch (IOException e) {

@@ -58,7 +58,7 @@ public class SparkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractW
       return new Tuple2<>(key, record);
     }).reduceByKey((rec1, rec2) -> {
       @SuppressWarnings("unchecked")
-      T reducedData = (T) rec1.getData().preCombine(rec2.getData());
+      T reducedData = (T) rec2.getData().preCombine(rec1.getData());
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
 
       return new HoodieRecord<T>(reducedKey, reducedData);

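Note: the operand order here is the substance of the fix. preCombine is now invoked on rec2's payload with rec1's payload as the old value, so the side the method is called on decides the outcome. A self-contained toy sketch (not Hudi code) of why the direction matters for a latest-wins payload:

// Toy sketch: with a latest-wins payload that returns `this`, whichever
// payload preCombine is invoked on survives, so flipping the call from
// rec1.preCombine(rec2) to rec2.preCombine(rec1) flips the winner.
public class CallDirectionSketch {

  static final class LatestWins {
    final String value;

    LatestWins(String value) {
      this.value = value;
    }

    LatestWins preCombine(LatestWins oldValue) {
      return this; // keep the record preCombine was called on
    }
  }

  public static void main(String[] args) {
    LatestWins rec1 = new LatestWins("first");
    LatestWins rec2 = new LatestWins("second");

    System.out.println(rec1.preCombine(rec2).value); // first  (old behavior)
    System.out.println(rec2.preCombine(rec1).value); // second (new behavior)
  }
}
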
@@ -50,7 +50,7 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload>
   }
 
   @Override
-  public HoodieJsonPayload preCombine(HoodieJsonPayload another) {
+  public HoodieJsonPayload preCombine(HoodieJsonPayload oldValue) {
     return this;
   }
 
@@ -36,8 +36,8 @@ public class EmptyHoodieRecordPayload implements HoodieRecordPayload<EmptyHoodie
   }
 
   @Override
-  public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another) {
-    return another;
+  public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload oldValue) {
+    return oldValue;
   }
 
   @Override

@@ -37,6 +37,10 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
   // java serializable
   private final byte[] recordBytes;
 
+  public HoodieAvroPayload(GenericRecord record, Comparable<?> orderingVal) {
+    this(Option.of(record));
+  }
+
   public HoodieAvroPayload(Option<GenericRecord> record) {
     if (record.isPresent()) {
       this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());

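Note: the new two-argument constructor exists because payloads are now instantiated reflectively with the record and its preCombine value (see the SpillableMapUtils hunk further down). A minimal plain-reflection sketch of that call pattern, assuming only that the payload class exposes a (GenericRecord, Comparable) constructor:

import java.lang.reflect.Constructor;

import org.apache.avro.generic.GenericRecord;

public class ReflectiveLoadSketch {
  // e.g. loadPayload("org.apache.hudi.common.model.HoodieAvroPayload", rec, 0L)
  static Object loadPayload(String clazz, GenericRecord rec, Comparable<?> orderingVal) throws Exception {
    Constructor<?> ctor = Class.forName(clazz).getConstructor(GenericRecord.class, Comparable.class);
    return ctor.newInstance(rec, orderingVal);
  }
}
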
@@ -46,7 +50,7 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
   }
 
   @Override
-  public HoodieAvroPayload preCombine(HoodieAvroPayload another) {
+  public HoodieAvroPayload preCombine(HoodieAvroPayload oldValue) {
     return this;
   }
 
@@ -42,18 +42,20 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
    */
   @Deprecated
   @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)
-  T preCombine(T another);
+  T preCombine(T oldValue);
 
   /**
    * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to insert/upsert by taking in a property map.
    * Implementation can leverage the property to decide their business logic to do preCombine.
-   * @param another instance of another {@link HoodieRecordPayload} to be combined with.
+   *
+   * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with.
    * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.
    *
    * @return the combined value
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
-  default T preCombine(T another, Properties properties) {
-    return preCombine(another);
+  default T preCombine(T oldValue, Properties properties) {
+    return preCombine(oldValue);
   }
 
   /**

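Note: since this is the central contract change, a hypothetical custom payload may make the renamed semantics concrete. This is a minimal sketch against the interface as shown above (the class name and fields are invented; the two-argument constructor matches the reflective instantiation added in SpillableMapUtils later in this diff):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

public class VersionedPayload implements HoodieRecordPayload<VersionedPayload> {
  private final GenericRecord record;
  private final Comparable orderingVal;

  // Matches the (GenericRecord, Comparable) shape payloads are now constructed with.
  public VersionedPayload(GenericRecord record, Comparable orderingVal) {
    this.record = record;
    this.orderingVal = orderingVal;
  }

  @Override
  public VersionedPayload preCombine(VersionedPayload oldValue) {
    // `this` is the incoming record: keep the side with the greater ordering
    // value and let the incoming record win ties. The default
    // preCombine(oldValue, properties) overload delegates here.
    return oldValue.orderingVal.compareTo(orderingVal) > 0 ? oldValue : this;
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
    return Option.of(record);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) {
    return Option.of(record);
  }
}
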
@@ -47,10 +47,14 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
   }
 
   @Override
-  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) {
-    if (another.orderingVal.compareTo(orderingVal) > 0) {
+  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if (oldValue.recordBytes.length == 0) {
+      // use natural order for delete record
+      return this;
+    }
+    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
       // pick the payload with greatest ordering value
-      return another;
+      return oldValue;
     } else {
       return this;
     }

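Note: a self-contained trace of the decision logic above (toy types and values, not Hudi code). A delete payload carries no bytes, so when the old side is a delete the incoming record wins by natural order; otherwise the greater ordering value wins, with ties going to the incoming record:

public class OverwriteSemanticsSketch {

  static final class Payload {
    final byte[] recordBytes;
    final long orderingVal;

    Payload(byte[] recordBytes, long orderingVal) {
      this.recordBytes = recordBytes;
      this.orderingVal = orderingVal;
    }

    // Same decision logic as the hunk above, with `this` as the incoming record.
    Payload preCombine(Payload oldValue) {
      if (oldValue.recordBytes.length == 0) {
        return this; // old side is a delete record: fall back to natural order
      }
      return oldValue.orderingVal > orderingVal ? oldValue : this;
    }
  }

  public static void main(String[] args) {
    Payload incoming = new Payload(new byte[] {1}, 99);
    Payload existing = new Payload(new byte[] {1}, 100);
    Payload deleted = new Payload(new byte[0], 97);

    System.out.println(incoming.preCombine(existing).orderingVal); // 100: old record has greater ordering value
    System.out.println(existing.preCombine(incoming).orderingVal); // 100: incoming wins ties and greater values
    System.out.println(incoming.preCombine(deleted).orderingVal);  // 99: old side is a delete, natural order applies
  }
}
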
@@ -83,6 +83,8 @@ public abstract class AbstractHoodieLogRecordScanner {
   private final HoodieTableMetaClient hoodieTableMetaClient;
   // Merge strategy to use when combining records from log
   private final String payloadClassFQN;
+  // preCombine field
+  private final String preCombineField;
   // simple key gen fields
   private Option<Pair<String, String>> simpleKeyGenFields = Option.empty();
   // Log File Paths

@@ -123,6 +125,7 @@ public abstract class AbstractHoodieLogRecordScanner {
     this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
     // load class from the payload fully qualified class name
     this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
+    this.preCombineField = this.hoodieTableMetaClient.getTableConfig().getPreCombineField();
     HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig();
     if (!tableConfig.populateMetaFields()) {
       this.simpleKeyGenFields = Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));

@@ -316,9 +319,9 @@ public abstract class AbstractHoodieLogRecordScanner {
 
   protected HoodieRecord<?> createHoodieRecord(IndexedRecord rec) {
     if (!simpleKeyGenFields.isPresent()) {
-      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.withOperationField);
+      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.withOperationField);
     } else {
-      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get(), this.withOperationField);
+      return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.simpleKeyGenFields.get(), this.withOperationField);
     }
   }
 
@@ -40,13 +40,13 @@ public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Ite
 
   public static <R extends IndexedRecord, T> HoodieFileSliceReader getFileSliceReader(
       HoodieFileReader<R> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
-      Option<Pair<String,String>> simpleKeyGenFieldsOpt) throws IOException {
+      String preCombineField, Option<Pair<String,String>> simpleKeyGenFieldsOpt) throws IOException {
     Iterator<R> baseIterator = baseFileReader.getRecordIterator(schema);
     while (baseIterator.hasNext()) {
       GenericRecord record = (GenericRecord) baseIterator.next();
       HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = simpleKeyGenFieldsOpt.isPresent()
-          ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
-          : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, scanner.isWithOperationField());
+          ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
+          : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField());
       scanner.processNextRecord(hoodieRecord);
     }
     return new HoodieFileSliceReader(scanner.iterator());

@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
 import java.io.IOException;

@@ -113,26 +114,42 @@ public class SpillableMapUtils {
   /**
    * Utility method to convert bytes to HoodieRecord using schema and payload class.
    */
-  public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, boolean withOperationField) {
-    return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField);
+  public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, String preCombineField, boolean withOperationField) {
+    return convertToHoodieRecordPayload(rec, payloadClazz, preCombineField, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField);
   }
 
   /**
    * Utility method to convert bytes to HoodieRecord using schema and payload class.
    */
   public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz,
-                                                   Pair<String, String> recordKeyPartitionPathPair,
+                                                   String preCombineField, Pair<String, String> recordKeyPartitionPathPair,
                                                    boolean withOperationField) {
     String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString();
     String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString();
+    Object preCombineVal = getPreCombineVal(rec, preCombineField);
     HoodieOperation operation = withOperationField
         ? HoodieOperation.fromName(getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
     HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath),
-        ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class), operation);
+        ReflectionUtils.loadPayload(payloadClazz, new Object[] {rec, preCombineVal}, GenericRecord.class, Comparable.class), operation);
 
     return (R) hoodieRecord;
   }
 
+  /**
+   * Returns the preCombine value with given field name.
+   *
+   * @param rec The avro record
+   * @param preCombineField The preCombine field name
+   * @return the preCombine field value or 0 if the field does not exist in the avro schema
+   */
+  private static Object getPreCombineVal(GenericRecord rec, String preCombineField) {
+    if (preCombineField == null) {
+      return 0;
+    }
+    Schema.Field field = rec.getSchema().getField(preCombineField);
+    return field == null ? 0 : rec.get(field.pos());
+  }
+
   /**
    * Utility method to convert bytes to HoodieRecord using schema and payload class.
    */

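Note: the fallback behavior of the new getPreCombineVal helper can be exercised with plain Avro; the schema and field names below are illustrative:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class PreCombineValSketch {
  public static void main(String[] args) {
    Schema schema = SchemaBuilder.record("Rec").fields()
        .requiredString("id")
        .requiredLong("version")
        .endRecord();
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("id", "a0");
    rec.put("version", 100L);

    System.out.println(getPreCombineVal(rec, "version")); // 100
    System.out.println(getPreCombineVal(rec, "missing")); // 0 (field absent from schema)
    System.out.println(getPreCombineVal(rec, null));      // 0 (no preCombine field configured)
  }

  // Mirrors the helper added in SpillableMapUtils above.
  static Object getPreCombineVal(GenericRecord rec, String preCombineField) {
    if (preCombineField == null) {
      return 0;
    }
    Schema.Field field = rec.getSchema().getField(preCombineField);
    return field == null ? 0 : rec.get(field.pos());
  }
}
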
@@ -133,10 +133,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
       Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = tableConfig.populateMetaFields()
-            ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), false)
-            : SpillableMapUtils.convertToHoodieRecordPayload(
-                baseRecord.get(),
-                tableConfig.getPayloadClass(),
+            ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), tableConfig.getPreCombineField(), false)
+            : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), tableConfig.getPreCombineField(),
                 Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()), false);
         metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
       }

@@ -70,6 +70,10 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
   private int type = 0;
   private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
 
+  public HoodieMetadataPayload(GenericRecord record, Comparable<?> orderingVal) {
+    this(Option.of(record));
+  }
+
   public HoodieMetadataPayload(Option<GenericRecord> record) {
     if (record.isPresent()) {
       // This can be simplified using SpecificData.deepcopy once this bug is fixed

@@ -44,7 +44,7 @@ public class AvroBinaryTestPayload implements HoodieRecordPayload {
   }
 
   @Override
-  public HoodieRecordPayload preCombine(HoodieRecordPayload another) {
+  public HoodieRecordPayload preCombine(HoodieRecordPayload oldValue) {
     return this;
   }
 
@@ -233,7 +233,7 @@ public class HoodieTestDataGenerator {
   public static RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException {
     GenericRecord rec = generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
         true, false);
-    return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true);
+    return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0L);
   }
 
   /**

@@ -574,7 +574,7 @@ public class HoodieTestDataGenerator {
 
   public HoodieRecord generateDeleteRecord(HoodieKey key) throws IOException {
     RawTripTestPayload payload =
-        new RawTripTestPayload(Option.empty(), key.getRecordKey(), key.getPartitionPath(), null, true);
+        new RawTripTestPayload(Option.empty(), key.getRecordKey(), key.getPartitionPath(), null, true, 0L);
     return new HoodieRecord(key, payload);
   }
 
@@ -53,9 +53,10 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
   private byte[] jsonDataCompressed;
   private int dataSize;
   private boolean isDeleted;
+  private Comparable orderingVal;
 
   public RawTripTestPayload(Option<String> jsonData, String rowKey, String partitionPath, String schemaStr,
-                            Boolean isDeleted) throws IOException {
+                            Boolean isDeleted, Comparable orderingVal) throws IOException {
     if (jsonData.isPresent()) {
       this.jsonDataCompressed = compressData(jsonData.get());
       this.dataSize = jsonData.get().length();

@@ -63,10 +64,11 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
     this.rowKey = rowKey;
     this.partitionPath = partitionPath;
     this.isDeleted = isDeleted;
+    this.orderingVal = orderingVal;
   }
 
   public RawTripTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException {
-    this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false);
+    this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
   }
 
   public RawTripTestPayload(String jsonData) throws IOException {

@@ -105,8 +107,13 @@ public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayloa
   }
 
   @Override
-  public RawTripTestPayload preCombine(RawTripTestPayload another) {
-    return another;
+  public RawTripTestPayload preCombine(RawTripTestPayload oldValue) {
+    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+      // pick the payload with greatest ordering value
+      return oldValue;
+    } else {
+      return this;
+    }
   }
 
   @Override

@@ -534,22 +534,39 @@ class TestMORDataSource extends HoodieClientTestBase {
 
   @Test
   def testPreCombineFiledForReadMOR(): Unit = {
-    writeData((1, "a0",10, 100))
-    checkAnswer((1, "a0",10, 100))
+    writeData((1, "a0", 10, 100, false))
+    checkAnswer((1, "a0", 10, 100, false))
 
-    writeData((1, "a0", 12, 99))
+    writeData((1, "a0", 12, 99, false))
     // The value is not updated, because the version 99 < 100
-    checkAnswer((1, "a0",10, 100))
+    checkAnswer((1, "a0", 10, 100, false))
 
-    writeData((1, "a0", 12, 101))
+    writeData((1, "a0", 12, 101, false))
     // The value is updated
-    checkAnswer((1, "a0", 12, 101))
+    checkAnswer((1, "a0", 12, 101, false))
+
+    writeData((1, "a0", 14, 98, false))
+    // Latest value should be ignored if preCombine honors ordering
+    checkAnswer((1, "a0", 12, 101, false))
+
+    writeData((1, "a0", 16, 97, true))
+    // Ordering value will not be honored for a delete record as the payload is sent as an empty payload
+    checkAnswer((1, "a0", 16, 97, true))
+
+    writeData((1, "a0", 18, 96, false))
+    // Ideally, once a record is deleted, preCombine does not kick in. So, any new record will be considered valid,
+    // ignoring the ordering val. But what happens in hudi is, all records in log files are reconciled and then merged
+    // with the base file. After reconciling all records from log files, it results in (1, "a0", 18, 96, false) and this
+    // is merged with (1, "a0", 10, 100, false) in the base file, and hence we see (1, "a0", 10, 100, false) as it has
+    // the higher preCombine value. The result might differ depending on whether compaction was triggered or not (after
+    // the record is deleted). In this test, no compaction is triggered and hence we see the record from the base file.
+    checkAnswer((1, "a0", 10, 100, false))
   }
 
-  private def writeData(data: (Int, String, Int, Int)): Unit = {
+  private def writeData(data: (Int, String, Int, Int, Boolean)): Unit = {
     val _spark = spark
     import _spark.implicits._
-    val df = Seq(data).toDF("id", "name", "value", "version")
+    val df = Seq(data).toDF("id", "name", "value", "version", "_hoodie_is_deleted")
     df.write.format("org.apache.hudi")
       .options(commonOpts)
       // use DefaultHoodieRecordPayload here

@@ -563,12 +580,19 @@ class TestMORDataSource extends HoodieClientTestBase {
       .save(basePath)
   }
 
-  private def checkAnswer(expect: (Int, String, Int, Int)): Unit = {
+  private def checkAnswer(expect: (Int, String, Int, Int, Boolean)): Unit = {
     val readDf = spark.read.format("org.apache.hudi")
       .load(basePath + "/*")
-    val row1 = readDf.select("id", "name", "value", "version").take(1)(0)
+    if (expect._5) {
+      if (!readDf.isEmpty) {
+        println("Found df " + readDf.collectAsList().get(0).mkString(","))
+      }
+      assertTrue(readDf.isEmpty)
+    } else {
+      val row1 = readDf.select("id", "name", "value", "version", "_hoodie_is_deleted").take(1)(0)
       assertEquals(Row(expect.productIterator.toSeq: _*), row1)
+    }
   }
 
   def verifySchemaAndTypes(df: DataFrame): Unit = {
     assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",