[HUDI-3069] Improve HoodieMergedLogRecordScanner avoid putting unnecessary hoodie records (#4932)
* log scanner optimization * payload equals switches to `=` Co-authored-by: 苏承祥 <sucx@tuya.com>
This commit is contained in:
@@ -59,18 +59,15 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
|
|||||||
implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
|
implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
|
private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
|
||||||
|
// A timer for calculating elapsed time in millis
|
||||||
|
public final HoodieTimer timer = new HoodieTimer();
|
||||||
// Final map of compacted/merged records
|
// Final map of compacted/merged records
|
||||||
protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
|
protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
|
||||||
|
|
||||||
// count of merged records in log
|
// count of merged records in log
|
||||||
private long numMergedRecordsInLog;
|
private long numMergedRecordsInLog;
|
||||||
private long maxMemorySizeInBytes;
|
private long maxMemorySizeInBytes;
|
||||||
|
|
||||||
// Stores the total time taken to perform reading and merging of log blocks
|
// Stores the total time taken to perform reading and merging of log blocks
|
||||||
private long totalTimeTakenToReadAndMergeBlocks;
|
private long totalTimeTakenToReadAndMergeBlocks;
|
||||||
// A timer for calculating elapsed time in millis
|
|
||||||
public final HoodieTimer timer = new HoodieTimer();
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
|
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
|
||||||
@@ -143,9 +140,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
|
|||||||
HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
|
HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
|
||||||
HoodieRecordPayload oldValue = oldRecord.getData();
|
HoodieRecordPayload oldValue = oldRecord.getData();
|
||||||
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
|
HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
|
||||||
boolean choosePrev = combinedValue.equals(oldValue);
|
// If combinedValue is oldValue, no need rePut oldRecord
|
||||||
HoodieOperation operation = choosePrev ? oldRecord.getOperation() : hoodieRecord.getOperation();
|
if (combinedValue != oldValue) {
|
||||||
records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
|
HoodieOperation operation = hoodieRecord.getOperation();
|
||||||
|
records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Put the record as is
|
// Put the record as is
|
||||||
records.put(key, hoodieRecord);
|
records.put(key, hoodieRecord);
|
||||||
@@ -187,11 +186,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
|
|||||||
protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
|
protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
|
||||||
// incremental filtering
|
// incremental filtering
|
||||||
protected Option<InstantRange> instantRange = Option.empty();
|
protected Option<InstantRange> instantRange = Option.empty();
|
||||||
|
protected String partitionName;
|
||||||
// auto scan default true
|
// auto scan default true
|
||||||
private boolean autoScan = true;
|
private boolean autoScan = true;
|
||||||
// operation field default false
|
// operation field default false
|
||||||
private boolean withOperationField = false;
|
private boolean withOperationField = false;
|
||||||
protected String partitionName;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Builder withFileSystem(FileSystem fs) {
|
public Builder withFileSystem(FileSystem fs) {
|
||||||
|
|||||||
Reference in New Issue
Block a user