diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 1ae25f80e..ddb3708ea 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -38,7 +38,10 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
@@ -48,11 +51,15 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
 
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
+  private final Set<String> deltaRecordKeys;
+  private Iterator<String> deltaItr;
+
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
     super(split, job);
     this.parquetReader = realReader;
     this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
+    this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
   }
 
   /**
@@ -89,72 +96,71 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
     // with a new block of values
-    boolean result = this.parquetReader.next(aVoid, arrayWritable);
-    if (!result) {
-      // if the result is false, then there are no more records
-      return false;
-    }
-    if (!deltaRecordMap.isEmpty()) {
-      // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
-      // would be true until we have a way to index logs too)
-      // return from delta records map if we have some match.
-      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-      if (deltaRecordMap.containsKey(key)) {
-        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-        // deltaRecord may not be a full record and needs values of columns from the parquet
-        Option<GenericRecord> rec;
-        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
-        // If the record is not present, this is a delete record using an empty payload so skip this base record
-        // and move to the next record
-        while (!rec.isPresent()) {
-          // if current parquet reader has no record, return false
-          if (!this.parquetReader.next(aVoid, arrayWritable)) {
-            return false;
+    while (this.parquetReader.next(aVoid, arrayWritable)) {
+      if (!deltaRecordMap.isEmpty()) {
+        String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+        if (deltaRecordMap.containsKey(key)) {
+          // mark the key as handled
+          this.deltaRecordKeys.remove(key);
+          // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
+          // deltaRecord may not be a full record and needs values of columns from the parquet
+          Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+          // If the record is not present, this is a delete record using an empty payload so skip this base record
+          // and move to the next record
+          if (!rec.isPresent()) {
+            continue;
           }
-          String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-          if (deltaRecordMap.containsKey(tempKey)) {
-            rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
-          } else {
-            // need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
-            return true;
-          }
-        }
-        if (!rec.isPresent()) {
-          return false;
-        }
-        GenericRecord recordToReturn = rec.get();
-        if (usesCustomPayload) {
-          // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
-          // the writerSchema with only the projection fields
-          recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
-        }
-        // we assume, a later safe record in the log, is newer than what we have in the map &
-        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
-        // schema, we use writerSchema to create the arrayWritable from the latest generic record
-        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
-        Writable[] replaceValue = aWritable.get();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
-              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
-        }
-        Writable[] originalValue = arrayWritable.get();
-        try {
-          // Sometime originalValue.length > replaceValue.length.
-          // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
-          System.arraycopy(replaceValue, 0, originalValue, 0,
-              Math.min(originalValue.length, replaceValue.length));
-          arrayWritable.set(originalValue);
-        } catch (RuntimeException re) {
-          LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
-          String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
-              + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
-          throw new RuntimeException(errMsg, re);
+          setUpWritable(rec, arrayWritable, key);
+          return true;
         }
       }
+      return true;
+    }
+    if (this.deltaItr == null) {
+      this.deltaItr = this.deltaRecordKeys.iterator();
+    }
+    while (this.deltaItr.hasNext()) {
+      final String key = this.deltaItr.next();
+      Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+      if (rec.isPresent()) {
+        setUpWritable(rec, arrayWritable, key);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void setUpWritable(Option<GenericRecord> rec, ArrayWritable arrayWritable, String key) {
+    GenericRecord recordToReturn = rec.get();
+    if (usesCustomPayload) {
+      // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
+      // the writerSchema with only the projection fields
+      recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
+    }
+    // we assume, a later safe record in the log, is newer than what we have in the map &
+    // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
+    // schema, we use writerSchema to create the arrayWritable from the latest generic record
+    ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
+    Writable[] replaceValue = aWritable.get();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
+          HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
+    }
+    Writable[] originalValue = arrayWritable.get();
+    try {
+      // Sometime originalValue.length > replaceValue.length.
+      // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
+      System.arraycopy(replaceValue, 0, originalValue, 0,
+          Math.min(originalValue.length, replaceValue.length));
+      arrayWritable.set(originalValue);
+    } catch (RuntimeException re) {
+      LOG.error("Got exception when doing array copy", re);
+      LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
+      LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
+      String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
+          + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+      throw new RuntimeException(errMsg, re);
     }
-    return true;
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 001b1ac57..1f955e87e 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -155,7 +155,7 @@ public class TestHoodieRealtimeRecordReader {
       } else {
         writer =
             InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant,
-                instantTime, 100, 0, logVersion);
+                instantTime, 120, 0, logVersion);
       }
       long size = writer.getCurrentSize();
       writer.close();
@@ -182,17 +182,21 @@ public class TestHoodieRealtimeRecordReader {
 
       // use reader to read base Parquet File and log file, merge in flight and return latest commit
       // here all 100 records should be updated, see above
+      // another 20 new insert records should also be output with the new commit time
       NullWritable key = recordReader.createKey();
       ArrayWritable value = recordReader.createValue();
+      int recordCnt = 0;
       while (recordReader.next(key, value)) {
         Writable[] values = value.get();
         // check if the record written is with latest commit, here "101"
         assertEquals(latestInstant, values[0].toString());
         key = recordReader.createKey();
         value = recordReader.createValue();
+        recordCnt++;
       }
       recordReader.getPos();
       assertEquals(1.0, recordReader.getProgress(), 0.05);
+      assertEquals(120, recordCnt);
       recordReader.close();
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);