From 6f57bbfac4881388d73a9e0768498fe47fe6bb5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E6=89=BF=E7=A5=A5?= Date: Mon, 7 Mar 2022 14:35:55 +0800 Subject: [PATCH] [HUDI-3069] Improve HoodieMergedLogRecordScanner avoid putting unnecessary hoodie records (#4932) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * log scanner optimization * payload equals switches to `=` Co-authored-by: 苏承祥 --- .../table/log/HoodieMergedLogRecordScanner.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index d0ab73ab0..882e1057c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -59,18 +59,15 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader implements Iterable> { private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class); - + // A timer for calculating elapsed time in millis + public final HoodieTimer timer = new HoodieTimer(); // Final map of compacted/merged records protected final ExternalSpillableMap> records; - // count of merged records in log private long numMergedRecordsInLog; private long maxMemorySizeInBytes; - // Stores the total time taken to perform reading and merging of log blocks private long totalTimeTakenToReadAndMergeBlocks; - // A timer for calculating elapsed time in millis - public final HoodieTimer timer = new HoodieTimer(); @SuppressWarnings("unchecked") protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, @@ -143,9 +140,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader HoodieRecord oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue); - boolean choosePrev = combinedValue.equals(oldValue); - HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation(); - records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation)); + // If combinedValue is oldValue, no need rePut oldRecord + if (combinedValue != oldValue) { + HoodieOperation operation = hoodieRecord.getOperation(); + records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation)); + } } else { // Put the record as is records.put(key, hoodieRecord); @@ -187,11 +186,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue(); // incremental filtering protected Option instantRange = Option.empty(); + protected String partitionName; // auto scan default true private boolean autoScan = true; // operation field default false private boolean withOperationField = false; - protected String partitionName; @Override public Builder withFileSystem(FileSystem fs) {