From 65844a8d29b715227f3b2ecdf9b102a08950ee67 Mon Sep 17 00:00:00 2001
From: xiarixiaoyao
Date: Tue, 13 Apr 2021 18:23:26 +0800
Subject: [PATCH] [HUDI-1720] Fix RealtimeCompactedRecordReader
 StackOverflowError (#2721)

---
 .../realtime/RealtimeCompactedRecordReader.java | 31 ++++++++++++++++++++++++++-------
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index a98a23010..1ae25f80e 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -77,6 +77,14 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         .build();
   }
 
+  private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord record) throws IOException {
+    if (usesCustomPayload) {
+      return record.getData().getInsertValue(getWriterSchema());
+    } else {
+      return record.getData().getInsertValue(getReaderSchema());
+    }
+  }
+
   @Override
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
@@ -95,15 +103,24 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
       // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
       // deltaRecord may not be a full record and needs values of columns from the parquet
       Option<GenericRecord> rec;
-      if (usesCustomPayload) {
-        rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
-      } else {
-        rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
+      rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+      // If the record is not present, this is a delete record using an empty payload so skip this base record
+      // and move to the next record
+      while (!rec.isPresent()) {
+        // if current parquet reader has no record, return false
+        if (!this.parquetReader.next(aVoid, arrayWritable)) {
+          return false;
+        }
+        String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+        if (deltaRecordMap.containsKey(tempKey)) {
+          rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
+        } else {
+          // need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
+          return true;
+        }
       }
       if (!rec.isPresent()) {
-        // If the record is not present, this is a delete record using an empty payload so skip this base record
-        // and move to the next record
-        return next(aVoid, arrayWritable);
+        return false;
      }
      GenericRecord recordToReturn = rec.get();
      if (usesCustomPayload) {