From 6df14f15a3761580085d38bffceca9d19c7ad24e Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 1 Apr 2022 20:46:51 +0800 Subject: [PATCH] [HUDI-2752] The MOR DELETE block breaks the event time sequence of CDC (#4880) --- .../apache/hudi/io/HoodieAppendHandle.java | 18 ++- .../HoodieBackedTableMetadataWriter.java | 4 +- .../table/action/commit/FlinkWriteHelper.java | 2 +- .../hudi/common/model/DeleteRecord.java | 107 +++++++++++++ .../hudi/common/model/HoodieAvroPayload.java | 16 +- .../common/model/HoodieRecordPayload.java | 12 ++ .../model/OverwriteWithLatestAvroPayload.java | 5 + .../log/AbstractHoodieLogRecordReader.java | 10 +- .../log/HoodieMergedLogRecordScanner.java | 30 +++- .../log/HoodieUnMergedLogRecordScanner.java | 8 +- .../table/log/block/HoodieDeleteBlock.java | 32 ++-- .../table/log/block/HoodieLogBlock.java | 2 +- .../hudi/common/util/ReflectionUtils.java | 7 + .../hudi/common/util/SpillableMapUtils.java | 4 +- .../HoodieMetadataMergedLogRecordReader.java | 8 +- .../functional/TestHoodieLogFormat.java | 145 ++++++++++++++++-- ...a => TestGlobalDeleteRecordGenerator.java} | 2 +- .../hudi/functional/TestMORDataSource.scala | 15 +- 18 files changed, 356 insertions(+), 71 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java rename hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/{TestGlobalDeleteKeyGenerator.java => TestGlobalDeleteRecordGenerator.java} (97%) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index f4aa73498..efae4584a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -23,10 +23,10 @@ import org.apache.hudi.client.WriteStatus; import 
org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieDeltaWriteStat; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodiePartitionMetadata; @@ -92,7 +92,7 @@ public class HoodieAppendHandle extends // Buffer for holding records in memory before they are flushed to disk private final List recordList = new ArrayList<>(); // Buffer for holding records (to be deleted) in memory before they are flushed to disk - private final List keysToDelete = new ArrayList<>(); + private final List recordsToDelete = new ArrayList<>(); // Incoming records to be written to logs. protected Iterator> recordItr; // Writer to log into the file group's latest slice. 
@@ -402,15 +402,15 @@ public class HoodieAppendHandle extends blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, header, keyField)); } - if (keysToDelete.size() > 0) { - blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header)); + if (recordsToDelete.size() > 0) { + blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new DeleteRecord[0]), header)); } if (blocks.size() > 0) { AppendResult appendResult = writer.appendBlocks(blocks); processAppendResult(appendResult, recordList); recordList.clear(); - keysToDelete.clear(); + recordsToDelete.clear(); } } catch (Exception e) { throw new HoodieAppendException("Failed while appending records to " + writer.getLogFile().getPath(), e); @@ -472,7 +472,7 @@ public class HoodieAppendHandle extends } private Writer createLogWriter(Option fileSlice, String baseCommitTime) - throws IOException, InterruptedException { + throws IOException { Option latestLogFile = fileSlice.get().getLatestLogFile(); return HoodieLogFormat.newWriterBuilder() @@ -507,14 +507,16 @@ public class HoodieAppendHandle extends record.setNewLocation(new HoodieRecordLocation(instantTime, fileId)); record.seal(); } + // fetch the ordering val first in case the record was deflated. + final Comparable orderingVal = record.getData().getOrderingValue(); Option indexedRecord = getIndexedRecord(record); if (indexedRecord.isPresent()) { - // Skip the Ignore Record. + // Skip the ignored record. 
if (!indexedRecord.get().equals(IGNORE_RECORD)) { recordList.add(indexedRecord.get()); } } else { - keysToDelete.add(record.getKey()); + recordsToDelete.add(DeleteRecord.create(record.getKey(), orderingVal)); } numberOfRecords++; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index abed2f78b..1a069c465 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -31,12 +31,12 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -656,7 +656,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta // Archival of data table has a dependency on compaction(base files) in metadata table. // It is assumed that as of time Tx of base instant (/compaction time) in metadata table, // all commits in data table is in sync with metadata table. So, we always start with log file for any fileGroup. 
- final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0], blockHeader); + final HoodieDeleteBlock block = new HoodieDeleteBlock(new DeleteRecord[0], blockHeader); LOG.info(String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", fileGroupCount, metadataPartition.getPartitionPath(), metadataPartition.getFileIdPrefix(), instantTime)); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 66723a3fc..9c17e77b9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -102,7 +102,7 @@ public class FlinkWriteHelper extends BaseWrit // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. - boolean choosePrev = data1.equals(reducedData); + boolean choosePrev = data1 == reducedData; HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey(); HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation(); HoodieRecord hoodieRecord = new HoodieAvroRecord<>(reducedKey, reducedData, operation); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java new file mode 100644 index 000000000..003b591c2 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Delete record is a combination of HoodieKey and ordering value. + * The record is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock} + * to support per-record deletions. The deletion block is always appended after the data block, + * we need to keep the ordering val to combine with the data records when merging, or the data loss + * may occur if there are intermediate deletions for the inputs + * (a new INSERT comes after a DELETE in one input batch). + */ +public class DeleteRecord implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * The record key and partition path. + */ + private final HoodieKey hoodieKey; + + /** + * For purposes of preCombining. 
+ */ + private final Comparable orderingVal; + + private DeleteRecord(HoodieKey hoodieKey, Comparable orderingVal) { + this.hoodieKey = hoodieKey; + this.orderingVal = orderingVal; + } + + public static DeleteRecord create(HoodieKey hoodieKey) { + return create(hoodieKey, 0); + } + + public static DeleteRecord create(String recordKey, String partitionPath) { + return create(recordKey, partitionPath, 0); + } + + public static DeleteRecord create(String recordKey, String partitionPath, Comparable orderingVal) { + return create(new HoodieKey(recordKey, partitionPath), orderingVal); + } + + public static DeleteRecord create(HoodieKey hoodieKey, Comparable orderingVal) { + return new DeleteRecord(hoodieKey, orderingVal); + } + + public String getRecordKey() { + return hoodieKey.getRecordKey(); + } + + public String getPartitionPath() { + return hoodieKey.getPartitionPath(); + } + + public HoodieKey getHoodieKey() { + return hoodieKey; + } + + public Comparable getOrderingValue() { + return orderingVal; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof DeleteRecord)) { + return false; + } + DeleteRecord that = (DeleteRecord) o; + return this.hoodieKey.equals(that.hoodieKey) && this.orderingVal.equals(that.orderingVal); + } + + @Override + public int hashCode() { + return Objects.hash(this.hoodieKey, this.orderingVal); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("DeleteRecord {"); + sb.append(" key=").append(hoodieKey); + sb.append(" orderingVal=").append(this.orderingVal); + sb.append('}'); + return sb.toString(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java index 04a873c98..3fbcb8a62 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java @@ -36,17 +36,16 @@ public class HoodieAvroPayload implements HoodieRecordPayload // Store the GenericRecord converted to bytes - 1) Doesn't store schema hence memory efficient 2) Makes the payload // java serializable private final byte[] recordBytes; + private final Comparable orderingVal; public HoodieAvroPayload(GenericRecord record, Comparable orderingVal) { - this(Option.of(record)); + this.recordBytes = record == null ? new byte[0] : HoodieAvroUtils.avroToBytes(record); + this.orderingVal = orderingVal; } public HoodieAvroPayload(Option record) { - if (record.isPresent()) { - this.recordBytes = HoodieAvroUtils.avroToBytes(record.get()); - } else { - this.recordBytes = new byte[0]; - } + this.recordBytes = record.isPresent() ? HoodieAvroUtils.avroToBytes(record.get()) : new byte[0]; + this.orderingVal = 0; } @Override @@ -71,4 +70,9 @@ public class HoodieAvroPayload implements HoodieRecordPayload public byte[] getRecordBytes() { return recordBytes; } + + @Override + public Comparable getOrderingValue() { + return orderingVal; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java index 36dd30b65..6752607d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java @@ -114,4 +114,16 @@ public interface HoodieRecordPayload extends Seri default Option> getMetadata() { return Option.empty(); } + + /** + * This method can be used to extract the ordering value of the payload for combining/merging, + * or 0 if no value is specified which means natural order(arrival time is used). 
+ * + * @return the ordering value + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + default Comparable getOrderingValue() { + // default natural order + return 0; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 7b7bd6c6b..d8469ed5a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -105,4 +105,9 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload } return Objects.equals(value, defaultValue); } + + @Override + public Comparable getOrderingValue() { + return this.orderingVal; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 6a0b10fe0..93677e76b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -18,8 +18,8 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -404,11 +404,11 @@ public abstract class AbstractHoodieLogRecordReader { protected abstract void processNextRecord(HoodieRecord hoodieRecord) throws Exception; /** - * Process next deleted key. + * Process next deleted record. 
* - * @param key Deleted record key + * @param deleteRecord Deleted record(hoodie key and ordering value) */ - protected abstract void processNextDeletedKey(HoodieKey key); + protected abstract void processNextDeletedRecord(DeleteRecord deleteRecord); /** * Process the set of log blocks belonging to the last instant which is read fully. @@ -433,7 +433,7 @@ public abstract class AbstractHoodieLogRecordReader { processDataBlock((HoodieParquetDataBlock) lastBlock, keys); break; case DELETE_BLOCK: - Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey); + Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord); break; case CORRUPT_BLOCK: LOG.warn("Found a corrupt block which was not rolled back"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 882e1057c..54453946d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; @@ -28,6 +29,7 @@ import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import 
org.apache.hudi.exception.HoodieIOException; @@ -135,7 +137,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader String key = hoodieRecord.getRecordKey(); if (records.containsKey(key)) { // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be - // done when a delete (empty payload) is encountered before or after an insert/update. + // done when a DELETE (empty payload) is encountered before or after an insert/update. HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); @@ -152,9 +154,29 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader } @Override - protected void processNextDeletedKey(HoodieKey hoodieKey) { - records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(), - hoodieKey.getPartitionPath(), getPayloadClassFQN())); + protected void processNextDeletedRecord(DeleteRecord deleteRecord) { + String key = deleteRecord.getRecordKey(); + HoodieRecord oldRecord = records.get(key); + if (oldRecord != null) { + // Merge and store the merged record. The ordering val is taken to decide whether the same key record + // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val. + // For same ordering values, uses the natural order(arrival time semantics). + + Comparable curOrderingVal = oldRecord.getData().getOrderingValue(); + Comparable deleteOrderingVal = deleteRecord.getOrderingValue(); + // Checks the ordering value does not equal to 0 + // because we use 0 as the default value which means natural order + boolean choosePrev = !deleteOrderingVal.equals(0) + && ReflectionUtils.isSameClass(curOrderingVal, deleteOrderingVal) + && curOrderingVal.compareTo(deleteOrderingVal) > 0; + if (choosePrev) { + // The DELETE message is obsolete if the old message has greater orderingVal. 
+ return; + } + } + // Put the DELETE record + records.put(key, SpillableMapUtils.generateEmptyPayload(key, + deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN())); } public long getTotalTimeTakenToReadAndMergeBlocks() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index f781a148a..8ea34d6f2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -18,7 +18,7 @@ package org.apache.hudi.common.table.log; -import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; @@ -56,7 +56,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade } @Override - protected void processNextDeletedKey(HoodieKey key) { + protected void processNextDeletedRecord(DeleteRecord deleteRecord) { throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config"); } @@ -64,9 +64,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade * A callback for log record scanner. 
*/ @FunctionalInterface - public static interface LogRecordScannerCallback { + public interface LogRecordScannerCallback { - public void apply(HoodieRecord record) throws Exception; + void apply(HoodieRecord record) throws Exception; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java index 01159ab72..a5168072d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log.block; import org.apache.hudi.common.fs.SizeAwareDataInputStream; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SerializationUtils; @@ -31,6 +32,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -39,11 +41,11 @@ import java.util.Map; */ public class HoodieDeleteBlock extends HoodieLogBlock { - private HoodieKey[] keysToDelete; + private DeleteRecord[] recordsToDelete; - public HoodieDeleteBlock(HoodieKey[] keysToDelete, Map header) { + public HoodieDeleteBlock(DeleteRecord[] recordsToDelete, Map header) { this(Option.empty(), null, false, Option.empty(), header, new HashMap<>()); - this.keysToDelete = keysToDelete; + this.recordsToDelete = recordsToDelete; } public HoodieDeleteBlock(Option content, FSDataInputStream inputStream, boolean readBlockLazily, @@ -59,23 +61,23 @@ public class HoodieDeleteBlock extends HoodieLogBlock { // In case this method is called before realizing keys from content if (content.isPresent()) { return content.get(); - } else if (readBlockLazily && 
keysToDelete == null) { + } else if (readBlockLazily && recordsToDelete == null) { // read block lazily - getKeysToDelete(); + getRecordsToDelete(); } ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(baos); - byte[] bytesToWrite = SerializationUtils.serialize(getKeysToDelete()); + byte[] bytesToWrite = SerializationUtils.serialize(getRecordsToDelete()); output.writeInt(version); output.writeInt(bytesToWrite.length); output.write(bytesToWrite); return baos.toByteArray(); } - public HoodieKey[] getKeysToDelete() { + public DeleteRecord[] getRecordsToDelete() { try { - if (keysToDelete == null) { + if (recordsToDelete == null) { if (!getContent().isPresent() && readBlockLazily) { // read content from disk inflate(); @@ -86,15 +88,25 @@ public class HoodieDeleteBlock extends HoodieLogBlock { int dataLength = dis.readInt(); byte[] data = new byte[dataLength]; dis.readFully(data); - this.keysToDelete = SerializationUtils.deserialize(data); + this.recordsToDelete = deserialize(version, data); deflate(); } - return keysToDelete; + return recordsToDelete; } catch (IOException io) { throw new HoodieIOException("Unable to generate keys to delete from block content", io); } } + private static DeleteRecord[] deserialize(int version, byte[] data) { + if (version == 1) { + // legacy version + HoodieKey[] keys = SerializationUtils.deserialize(data); + return Arrays.stream(keys).map(DeleteRecord::create).toArray(DeleteRecord[]::new); + } else { + return SerializationUtils.deserialize(data); + } + } + @Override public HoodieLogBlockType getBlockType() { return HoodieLogBlockType.DELETE_BLOCK; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java index d514f28ce..71336be88 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java @@ -51,7 +51,7 @@ public abstract class HoodieLogBlock { * corresponding changes need to be made to {@link HoodieLogBlockVersion} TODO : Change this to a class, something * like HoodieLogBlockVersionV1/V2 and implement/override operations there */ - public static int version = 1; + public static int version = 2; // Header for each log block private final Map logBlockHeader; // Footer for each log block diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index bc48661c8..a4ef09641 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -173,4 +173,11 @@ public class ReflectionUtils { } return classes; } + + /** + * Returns whether the given two comparable values come from the same runtime class. + */ + public static boolean isSameClass(Comparable v, Comparable o) { + return v.getClass() == o.getClass(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 9ded41543..d4bafd9c9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -161,9 +161,9 @@ public class SpillableMapUtils { /** * Utility method to convert bytes to HoodieRecord using schema and payload class. 
*/ - public static R generateEmptyPayload(String recKey, String partitionPath, String payloadClazz) { + public static R generateEmptyPayload(String recKey, String partitionPath, Comparable orderingVal, String payloadClazz) { HoodieRecord hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[] {Option.empty()}, Option.class)); + ReflectionUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class)); return (R) hoodieRecord; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java index 4f616c362..28b66d837 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java @@ -19,8 +19,8 @@ package org.apache.hudi.metadata; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableConfig; @@ -80,9 +80,9 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc } @Override - protected void processNextDeletedKey(HoodieKey hoodieKey) { - if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(hoodieKey.getRecordKey())) { - super.processNextDeletedKey(hoodieKey); + protected void processNextDeletedRecord(DeleteRecord deleteRecord) { + if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(deleteRecord.getRecordKey())) { + super.processNextDeletedRecord(deleteRecord); } } diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 0772dc63b..5b3dabdbf 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -20,9 +20,9 @@ package org.apache.hudi.common.functional; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.HoodieArchivedLogFile; import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -1016,13 +1016,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .collect(Collectors.toList()); // Delete 50 keys - List deletedKeys = copyOfRecords1.stream() - .map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + List deletedRecords = copyOfRecords1.stream() + .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) .collect(Collectors.toList()).subList(0, 50); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102"); - HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedRecords.toArray(new DeleteRecord[50]), header); writer.appendBlock(deleteBlock); List allLogFiles = @@ -1063,7 +1063,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { }); assertEquals(200, readKeys.size(), "Stream collect should return all 200 records"); 
assertEquals(50, emptyPayloads.size(), "Stream collect should return all 50 records with empty payloads"); - originalKeys.removeAll(deletedKeys); + originalKeys.removeAll(deletedRecords); Collections.sort(originalKeys); Collections.sort(readKeys); assertEquals(originalKeys, readKeys, "CompositeAvroLogReader should return 150 records from 2 versions"); @@ -1097,6 +1097,123 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete"); } + @ParameterizedTest + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) + throws IOException, URISyntaxException, InterruptedException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = + HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream() + .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); + Map header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); + HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header); + writer.appendBlock(dataBlock); + + // Write 2 + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101"); + List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords2 = records2.stream() + .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, 
schema)).collect(Collectors.toList()); + dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header); + writer.appendBlock(dataBlock); + + copyOfRecords1.addAll(copyOfRecords2); + List originalKeys = + copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect(Collectors.toList()); + + // Delete 10 keys + // Default orderingVal is 0, which means natural order, the DELETE records + // should overwrite the data records. + List deleteRecords1 = copyOfRecords1.subList(0, 10).stream() + .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) + .collect(Collectors.toList()); + + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102"); + HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header); + writer.appendBlock(deleteBlock1); + + // Delete another 10 keys with -1 as orderingVal. + // The deletion should not work + + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103"); + HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream() + .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteRecord[]::new), header); + writer.appendBlock(deleteBlock2); + + // Delete another 10 keys with +1 as orderingVal. + // The deletion should work because the keys have greater ordering value.
+ List deletedRecords3 = copyOfRecords1.subList(20, 30).stream() + .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1))) + .collect(Collectors.toList()); + + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104"); + HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header); + writer.appendBlock(deleteBlock3); + + List allLogFiles = + FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()).collect(Collectors.toList()); + + FileCreateUtils.createDeltaCommit(basePath, "100", fs); + FileCreateUtils.createDeltaCommit(basePath, "101", fs); + FileCreateUtils.createDeltaCommit(basePath, "102", fs); + FileCreateUtils.createDeltaCommit(basePath, "103", fs); + FileCreateUtils.createDeltaCommit(basePath, "104", fs); + + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(fs) + .withBasePath(basePath) + .withLogFilePaths(allLogFiles) + .withReaderSchema(schema) + .withLatestInstantTime("104") + .withMaxMemorySizeInBytes(10240L) + .withReadBlocksLazily(readBlocksLazily) + .withReverseReader(false) + .withBufferSize(bufferSize) + .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) + .build(); + + assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records"); + final List readKeys = new ArrayList<>(200); + final List emptyPayloadKeys = new ArrayList<>(); + scanner.forEach(s -> readKeys.add(s.getRecordKey())); + scanner.forEach(s -> { + try { + if (!s.getData().getInsertValue(schema).isPresent()) { + emptyPayloadKeys.add(s.getRecordKey()); + } + } catch (IOException io) { + throw new UncheckedIOException(io); + } + }); + assertEquals(200, readKeys.size(), 
"Stream collect should return all 200 records"); + assertEquals(20, emptyPayloadKeys.size(), "Stream collect should return all 20 records with empty payloads"); + + originalKeys.removeAll(deleteRecords1.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toSet())); + originalKeys.removeAll(deletedRecords3.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toSet())); + readKeys.removeAll(emptyPayloadKeys); + + Collections.sort(originalKeys); + Collections.sort(readKeys); + assertEquals(originalKeys, readKeys, "HoodieMergedLogRecordScanner should return 180 records from 4 versions"); + } + @ParameterizedTest @MethodSource("testArguments") public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType, @@ -1131,12 +1248,12 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { // Delete 50 keys // Delete 50 keys - List deletedKeys = copyOfRecords1.stream() - .map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + List deleteRecords = copyOfRecords1.stream() + .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) .collect(Collectors.toList()).subList(0, 50); - HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header); writer.appendBlock(deleteBlock); FileCreateUtils.createDeltaCommit(basePath, "100", fs); @@ -1208,11 +1325,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.appendBlock(dataBlock); // Delete 50 keys - List deletedKeys = copyOfRecords1.stream() - .map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + List deleteRecords = copyOfRecords1.stream() + .map(s -> 
(DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) .collect(Collectors.toList()).subList(0, 50); - HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header); writer.appendBlock(deleteBlock); FileCreateUtils.createDeltaCommit(basePath, "100", fs); @@ -1328,11 +1445,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { // Delete 50 keys // Delete 50 keys - List deletedKeys = copyOfRecords1.stream() - .map(s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + List deleteRecords = copyOfRecords1.stream() + .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) .collect(Collectors.toList()).subList(0, 50); - HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header); writer.appendBlock(deleteBlock); FileCreateUtils.createDeltaCommit(basePath, "100", fs); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java similarity index 97% rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java index a0d90e028..3bd6a60c4 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteRecordGenerator.java @@ -28,7 +28,7 @@ import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class TestGlobalDeleteKeyGenerator extends KeyGeneratorTestUtilities { +public class TestGlobalDeleteRecordGenerator extends KeyGeneratorTestUtilities { private TypedProperties getCommonProps(boolean getComplexRecordKey) { TypedProperties properties = new TypedProperties(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 5c20939cf..d8ebe5cbc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -517,17 +517,14 @@ class TestMORDataSource extends HoodieClientTestBase { checkAnswer((1, "a0", 12, 101, false)) writeData((1, "a0", 16, 97, true)) - // Ordering value will not be honored for a delete record as the payload is sent as empty payload - checkAnswer((1, "a0", 16, 97, true)) + // Ordering value will be honored, the delete record is considered as obsolete + // because it has smaller version number (97 < 101) + checkAnswer((1, "a0", 12, 101, false)) writeData((1, "a0", 18, 96, false)) - // Ideally, once a record is deleted, preCombine does not kick. So, any new record will be considered valid ignoring - // ordering val. But what happens ini hudi is, all records in log files are reconciled and then merged with base - // file. 
After reconciling all records from log files, it results in (1, "a0", 18, 96, false) and ths is merged with - (1, "a0", 10, 100, false) in base file and hence we see (1, "a0", 10, 100, false) as it has higher preComine value. - the result might differ depending on whether compaction was triggered or not(after record is deleted). In this - test, no compaction is triggered and hence we see the record from base file. - checkAnswer((1, "a0", 10, 100, false)) + // Ordering value will be honored, the data record is considered obsolete + // because it has smaller version number (96 < 101) + checkAnswer((1, "a0", 12, 101, false)) } private def writeData(data: (Int, String, Int, Int, Boolean)): Unit = {