From 5448cdde7e45c39b8bf8275d9f3ee425c9bfa90e Mon Sep 17 00:00:00 2001 From: swuferhong <337361684@qq.com> Date: Wed, 11 Aug 2021 10:20:55 +0800 Subject: [PATCH] [HUDI-2170] [HUDI-1763] Always choose the latest record for HoodieRecordPayload (#3401) --- .../bootstrap/BootstrapRecordPayload.java | 2 +- .../MultipleSparkJobExecutionStrategy.java | 1 + .../table/action/commit/SparkWriteHelper.java | 2 +- .../apache/hudi/common/HoodieJsonPayload.java | 2 +- .../model/EmptyHoodieRecordPayload.java | 4 +- .../hudi/common/model/HoodieAvroPayload.java | 6 ++- .../common/model/HoodieRecordPayload.java | 10 ++-- .../model/OverwriteWithLatestAvroPayload.java | 12 +++-- .../log/AbstractHoodieLogRecordScanner.java | 7 ++- .../table/log/HoodieFileSliceReader.java | 6 +-- .../hudi/common/util/SpillableMapUtils.java | 25 ++++++++-- .../metadata/HoodieBackedTableMetadata.java | 8 ++-- .../hudi/metadata/HoodieMetadataPayload.java | 4 ++ .../testutils/AvroBinaryTestPayload.java | 2 +- .../testutils/HoodieTestDataGenerator.java | 4 +- .../common/testutils/RawTripTestPayload.java | 15 ++++-- .../hudi/functional/TestMORDataSource.scala | 46 ++++++++++++++----- 17 files changed, 110 insertions(+), 46 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java index fa508e42f..a60a0d39f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/BootstrapRecordPayload.java @@ -34,7 +34,7 @@ public class BootstrapRecordPayload implements HoodieRecordPayload extends AbstractW return new Tuple2<>(key, record); }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") - T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + T reducedData = (T) 
rec2.getData().preCombine(rec1.getData()); HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); return new HoodieRecord(reducedKey, reducedData); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java index 1c15c6641..10869fc56 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieJsonPayload.java @@ -50,7 +50,7 @@ public class HoodieJsonPayload implements HoodieRecordPayload } @Override - public HoodieJsonPayload preCombine(HoodieJsonPayload another) { + public HoodieJsonPayload preCombine(HoodieJsonPayload oldValue) { return this; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java index 783422fc6..abcad8d92 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java @@ -36,8 +36,8 @@ public class EmptyHoodieRecordPayload implements HoodieRecordPayload // java serializable private final byte[] recordBytes; + public HoodieAvroPayload(GenericRecord record, Comparable orderingVal) { + this(Option.of(record)); + } + public HoodieAvroPayload(Option record) { if (record.isPresent()) { this.recordBytes = HoodieAvroUtils.avroToBytes(record.get()); @@ -46,7 +50,7 @@ public class HoodieAvroPayload implements HoodieRecordPayload } @Override - public HoodieAvroPayload preCombine(HoodieAvroPayload another) { + public HoodieAvroPayload preCombine(HoodieAvroPayload oldValue) { return this; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 53fcca16d..7ebf9887a 
100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -42,18 +42,20 @@ public interface HoodieRecordPayload extends Seri */ @Deprecated @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) - T preCombine(T another); + T preCombine(T oldValue); /** * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to insert/upsert by taking in a property map. * Implementation can leverage the property to decide their business logic to do preCombine. - * @param another instance of another {@link HoodieRecordPayload} to be combined with. + * + * @param oldValue instance of the old {@link HoodieRecordPayload} to be combined with. * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. + * * @return the combined value */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) - default T preCombine(T another, Properties properties) { - return preCombine(another); + default T preCombine(T oldValue, Properties properties) { + return preCombine(oldValue); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index fe767e164..6e8351cc0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -47,10 +47,14 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload } @Override - public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) { - // pick the payload with greatest ordering value - if (another.orderingVal.compareTo(orderingVal) > 0) { - return another; + public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) { + if (oldValue.recordBytes.length == 0) { + // use natural order for delete record + return this; + } + if (oldValue.orderingVal.compareTo(orderingVal) > 0) { + // pick the payload with greatest ordering value + return oldValue; } else { return this; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java index 79d582126..868c7cb89 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java @@ -83,6 +83,8 @@ public abstract class AbstractHoodieLogRecordScanner { private final HoodieTableMetaClient hoodieTableMetaClient; // Merge strategy to use when combining records from log private final String payloadClassFQN; + // preCombine field + private final String preCombineField; // simple key gen fields private Option> simpleKeyGenFields = Option.empty(); // Log File Paths @@ -123,6 +125,7 @@ public abstract class AbstractHoodieLogRecordScanner { this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); // load class from the payload fully qualified class name this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass(); + this.preCombineField = this.hoodieTableMetaClient.getTableConfig().getPreCombineField(); HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig(); if (!tableConfig.populateMetaFields()) { this.simpleKeyGenFields = Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp())); @@ -316,9 +319,9 @@ public abstract class AbstractHoodieLogRecordScanner { protected HoodieRecord createHoodieRecord(IndexedRecord rec) { if (!simpleKeyGenFields.isPresent()) { - return 
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.withOperationField); + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.withOperationField); } else { - return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.simpleKeyGenFields.get(), this.withOperationField); + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.simpleKeyGenFields.get(), this.withOperationField); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java index d840565c6..f7a7acfa9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -40,13 +40,13 @@ public class HoodieFileSliceReader implements Ite public static HoodieFileSliceReader getFileSliceReader( HoodieFileReader baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - Option> simpleKeyGenFieldsOpt) throws IOException { + String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { Iterator baseIterator = baseFileReader.getRecordIterator(schema); while (baseIterator.hasNext()) { GenericRecord record = (GenericRecord) baseIterator.next(); HoodieRecord hoodieRecord = simpleKeyGenFieldsOpt.isPresent() - ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField()) - : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, scanner.isWithOperationField()); + ? 
SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField()) + : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField()); scanner.processNextRecord(hoodieRecord); } return new HoodieFileSliceReader(scanner.iterator()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 0a716e0e6..5dd0c5a8b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.BitCaskDiskMap.FileEntry; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieCorruptedDataException; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import java.io.IOException; @@ -113,26 +114,42 @@ public class SpillableMapUtils { /** * Utility method to convert bytes to HoodieRecord using schema and payload class. */ - public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, boolean withOperationField) { - return convertToHoodieRecordPayload(rec, payloadClazz, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField); + public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, String preCombineField, boolean withOperationField) { + return convertToHoodieRecordPayload(rec, payloadClazz, preCombineField, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField); } /** * Utility method to convert bytes to HoodieRecord using schema and payload class. 
*/ public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, - Pair recordKeyPartitionPathPair, + String preCombineField, Pair recordKeyPartitionPathPair, boolean withOperationField) { String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString(); String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString(); + Object preCombineVal = getPreCombineVal(rec, preCombineField); HoodieOperation operation = withOperationField ? HoodieOperation.fromName(getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD)) : null; HoodieRecord hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.of(rec)}, Option.class), operation); + ReflectionUtils.loadPayload(payloadClazz, new Object[] {rec, preCombineVal}, GenericRecord.class, Comparable.class), operation); return (R) hoodieRecord; } + /** + * Returns the preCombine value with given field name. + * + * @param rec The avro record + * @param preCombineField The preCombine field name + * @return the preCombine field value or 0 if the field does not exist in the avro schema + */ + private static Object getPreCombineVal(GenericRecord rec, String preCombineField) { + if (preCombineField == null) { + return 0; + } + Schema.Field field = rec.getSchema().getField(preCombineField); + return field == null ? 0 : rec.get(field.pos()); + } + /** * Utility method to convert bytes to HoodieRecord using schema and payload class. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 962b8cac8..1f0338368 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -133,11 +133,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { Option baseRecord = baseFileReader.getRecordByKey(key); if (baseRecord.isPresent()) { hoodieRecord = tableConfig.populateMetaFields() - ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), false) - : SpillableMapUtils.convertToHoodieRecordPayload( - baseRecord.get(), - tableConfig.getPayloadClass(), - Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()), false); + ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), tableConfig.getPreCombineField(), false) + : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), tableConfig.getPreCombineField(), + Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()), false); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 9c6eb89b9..16eef8a5f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -70,6 +70,10 @@ public class HoodieMetadataPayload implements HoodieRecordPayload filesystemMetadata = null; + public HoodieMetadataPayload(GenericRecord record, Comparable orderingVal) { + this(Option.of(record)); + } + public 
HoodieMetadataPayload(Option record) { if (record.isPresent()) { // This can be simplified using SpecificData.deepcopy once this bug is fixed diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/AvroBinaryTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/AvroBinaryTestPayload.java index ff862ee7b..edd1a0536 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/AvroBinaryTestPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/AvroBinaryTestPayload.java @@ -44,7 +44,7 @@ public class AvroBinaryTestPayload implements HoodieRecordPayload { } @Override - public HoodieRecordPayload preCombine(HoodieRecordPayload another) { + public HoodieRecordPayload preCombine(HoodieRecordPayload oldValue) { return this; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 76f64b46b..e4ea18678 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -233,7 +233,7 @@ public class HoodieTestDataGenerator { public static RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException { GenericRecord rec = generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0, true, false); - return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true); + return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0L); } /** @@ -574,7 +574,7 @@ public class HoodieTestDataGenerator { public HoodieRecord generateDeleteRecord(HoodieKey key) throws IOException { RawTripTestPayload payload = - new 
RawTripTestPayload(Option.empty(), key.getRecordKey(), key.getPartitionPath(), null, true); + new RawTripTestPayload(Option.empty(), key.getRecordKey(), key.getPartitionPath(), null, true, 0L); return new HoodieRecord(key, payload); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java index df450a5ff..8bd10823d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java @@ -53,9 +53,10 @@ public class RawTripTestPayload implements HoodieRecordPayload jsonData, String rowKey, String partitionPath, String schemaStr, - Boolean isDeleted) throws IOException { + Boolean isDeleted, Comparable orderingVal) throws IOException { if (jsonData.isPresent()) { this.jsonDataCompressed = compressData(jsonData.get()); this.dataSize = jsonData.get().length(); @@ -63,10 +64,11 @@ public class RawTripTestPayload implements HoodieRecordPayload 0) { + // pick the payload with greatest ordering value + return oldValue; + } else { + return this; + } } @Override diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index c0909c3f0..8855fb01c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -534,22 +534,39 @@ class TestMORDataSource extends HoodieClientTestBase { @Test def testPreCombineFiledForReadMOR(): Unit = { - writeData((1, "a0",10, 100)) - checkAnswer((1, "a0",10, 100)) + writeData((1, "a0", 10, 100, false)) + checkAnswer((1, "a0", 10, 100, false)) - writeData((1, "a0", 12, 99)) + writeData((1, "a0", 12, 99, 
false)) // The value has not update, because the version 99 < 100 - checkAnswer((1, "a0",10, 100)) + checkAnswer((1, "a0", 10, 100, false)) - writeData((1, "a0", 12, 101)) + writeData((1, "a0", 12, 101, false)) // The value has update - checkAnswer((1, "a0", 12, 101)) + checkAnswer((1, "a0", 12, 101, false)) + + writeData((1, "a0", 14, 98, false)) + // Latest value should be ignored if preCombine honors ordering + checkAnswer((1, "a0", 12, 101, false)) + + writeData((1, "a0", 16, 97, true)) + // Ordering value will not be honored for a delete record as the payload is sent as empty payload + checkAnswer((1, "a0", 16, 97, true)) + + writeData((1, "a0", 18, 96, false)) + // Ideally, once a record is deleted, preCombine does not kick in. So, any new record will be considered valid ignoring + // ordering val. But what happens in hudi is, all records in log files are reconciled and then merged with base + // file. After reconciling all records from log files, it results in (1, "a0", 18, 96, false) and this is merged with + // (1, "a0", 10, 100, false) in base file and hence we see (1, "a0", 10, 100, false) as it has higher preCombine value. + // The result might differ depending on whether compaction was triggered or not (after record is deleted). In this + // test, no compaction is triggered and hence we see the record from base file.
+ checkAnswer((1, "a0", 10, 100, false)) } - private def writeData(data: (Int, String, Int, Int)): Unit = { + private def writeData(data: (Int, String, Int, Int, Boolean)): Unit = { val _spark = spark import _spark.implicits._ - val df = Seq(data).toDF("id", "name", "value", "version") + val df = Seq(data).toDF("id", "name", "value", "version", "_hoodie_is_deleted") df.write.format("org.apache.hudi") .options(commonOpts) // use DefaultHoodieRecordPayload here @@ -563,11 +580,18 @@ class TestMORDataSource extends HoodieClientTestBase { .save(basePath) } - private def checkAnswer(expect: (Int, String, Int, Int)): Unit = { + private def checkAnswer(expect: (Int, String, Int, Int, Boolean)): Unit = { val readDf = spark.read.format("org.apache.hudi") .load(basePath + "/*") - val row1 = readDf.select("id", "name", "value", "version").take(1)(0) - assertEquals(Row(expect.productIterator.toSeq: _*), row1) + if (expect._5) { + if (!readDf.isEmpty) { + println("Found df " + readDf.collectAsList().get(0).mkString(",")) + } + assertTrue(readDf.isEmpty) + } else { + val row1 = readDf.select("id", "name", "value", "version", "_hoodie_is_deleted").take(1)(0) + assertEquals(Row(expect.productIterator.toSeq: _*), row1) + } } def verifySchemaAndTypes(df: DataFrame): Unit = {