[HUDI-1969] Support reading logs for MOR Hive rt table (#3033)
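Before this change, RealtimeCompactedRecordReader#next only emitted rows produced by the base parquet reader, so records that existed only in log files never showed up in Hive rt queries on MOR tables. next() now tracks which delta (log) record keys were matched while merging against the base file and, once the parquet reader is exhausted, iterates the leftover keys to emit log-only records, skipping delete payloads. The merge-and-copy logic shared by both paths moves into a new setUpWritable() helper, and the record reader test writes 20 extra records to the log file and asserts that all 120 merged rows carry the latest commit time.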
@@ -38,7 +38,10 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
@@ -48,11 +51,15 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
 
+  private final Set<String> deltaRecordKeys;
+  private Iterator<String> deltaItr;
+
   public RealtimeCompactedRecordReader(RealtimeSplit split, JobConf job,
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
     super(split, job);
     this.parquetReader = realReader;
     this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
+    this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
   }
 
   /**
@@ -89,72 +96,71 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
   public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
     // with a new block of values
-    boolean result = this.parquetReader.next(aVoid, arrayWritable);
-    if (!result) {
-      // if the result is false, then there are no more records
-      return false;
-    }
-    if (!deltaRecordMap.isEmpty()) {
-      // TODO(VC): Right now, we assume all records in log, have a matching base record. (which
-      // would be true until we have a way to index logs too)
-      // return from delta records map if we have some match.
-      String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-      if (deltaRecordMap.containsKey(key)) {
-        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-        // deltaRecord may not be a full record and needs values of columns from the parquet
-        Option<GenericRecord> rec;
-        rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
-        // If the record is not present, this is a delete record using an empty payload so skip this base record
-        // and move to the next record
-        while (!rec.isPresent()) {
-          // if current parquet reader has no record, return false
-          if (!this.parquetReader.next(aVoid, arrayWritable)) {
-            return false;
-          }
-          String tempKey = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
-          if (deltaRecordMap.containsKey(tempKey)) {
-            rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(tempKey));
-          } else {
-            // need to return true, since now log file does not contain tempKey, but parquet file contains tempKey
-            return true;
-          }
-        }
-        if (!rec.isPresent()) {
-          return false;
-        }
-        GenericRecord recordToReturn = rec.get();
-        if (usesCustomPayload) {
-          // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
-          // the writerSchema with only the projection fields
-          recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
-        }
-        // we assume, a later safe record in the log, is newer than what we have in the map &
-        // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
-        // schema, we use writerSchema to create the arrayWritable from the latest generic record
-        ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
-        Writable[] replaceValue = aWritable.get();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
-              HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
-        }
-        Writable[] originalValue = arrayWritable.get();
-        try {
-          // Sometime originalValue.length > replaceValue.length.
-          // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
-          System.arraycopy(replaceValue, 0, originalValue, 0,
-              Math.min(originalValue.length, replaceValue.length));
-          arrayWritable.set(originalValue);
-        } catch (RuntimeException re) {
-          LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
-          String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
-              + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
-          throw new RuntimeException(errMsg, re);
-        }
-      }
-    }
-    return true;
-  }
+    while (this.parquetReader.next(aVoid, arrayWritable)) {
+      if (!deltaRecordMap.isEmpty()) {
+        String key = arrayWritable.get()[HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS].toString();
+        if (deltaRecordMap.containsKey(key)) {
+          // mark the key as handled
+          this.deltaRecordKeys.remove(key);
+          // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
+          // deltaRecord may not be a full record and needs values of columns from the parquet
+          Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+          // If the record is not present, this is a delete record using an empty payload so skip this base record
+          // and move to the next record
+          if (!rec.isPresent()) {
+            continue;
+          }
+          setUpWritable(rec, arrayWritable, key);
+          return true;
+        }
+      }
+      return true;
+    }
+    if (this.deltaItr == null) {
+      this.deltaItr = this.deltaRecordKeys.iterator();
+    }
+    while (this.deltaItr.hasNext()) {
+      final String key = this.deltaItr.next();
+      Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+      if (rec.isPresent()) {
+        setUpWritable(rec, arrayWritable, key);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void setUpWritable(Option<GenericRecord> rec, ArrayWritable arrayWritable, String key) {
+    GenericRecord recordToReturn = rec.get();
+    if (usesCustomPayload) {
+      // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
+      // the writerSchema with only the projection fields
+      recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
+    }
+    // we assume, a later safe record in the log, is newer than what we have in the map &
+    // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
+    // schema, we use writerSchema to create the arrayWritable from the latest generic record
+    ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema());
+    Writable[] replaceValue = aWritable.get();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable),
+          HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable)));
+    }
+    Writable[] originalValue = arrayWritable.get();
+    try {
+      // Sometime originalValue.length > replaceValue.length.
+      // This can happen when hive query is looking for pseudo parquet columns like BLOCK_OFFSET_INSIDE_FILE
+      System.arraycopy(replaceValue, 0, originalValue, 0,
+          Math.min(originalValue.length, replaceValue.length));
+      arrayWritable.set(originalValue);
+    } catch (RuntimeException re) {
+      LOG.error("Got exception when doing array copy", re);
+      LOG.error("Base record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable));
+      LOG.error("Log record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable));
+      String errMsg = "Base-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable)
+          + " ,Log-record :" + HoodieRealtimeRecordReaderUtils.arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
+      throw new RuntimeException(errMsg, re);
+    }
+  }
 
   @Override
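To make the new control flow easier to review, here is a condensed, self-contained sketch of the two-phase read that next() now performs. It is an illustration, not the patch itself: MergedReadSketch, the String-valued rows, and the "@base"/"@log" markers are hypothetical stand-ins for the real Writable and Avro plumbing, while deltaRecordKeys and deltaItr mirror the fields added above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Simplified model of the two-phase merge in RealtimeCompactedRecordReader.next():
 * phase 1 streams base (parquet) records, overriding or dropping those with a
 * delta (log) entry; phase 2 drains the delta keys never seen in the base file,
 * which is what lets Hive rt queries return log-only inserts.
 */
public class MergedReadSketch {
  private final Iterator<String> baseKeys;            // stands in for parquetReader
  private final Map<String, Optional<String>> delta;  // key -> merged value; empty = delete
  private final Set<String> deltaRecordKeys;          // keys not yet matched against base
  private Iterator<String> deltaItr;                  // lazily drains the leftovers

  MergedReadSketch(List<String> base, Map<String, Optional<String>> delta) {
    this.baseKeys = base.iterator();
    this.delta = delta;
    this.deltaRecordKeys = new HashSet<>(delta.keySet());
  }

  /** Returns the next merged row, or null when both phases are exhausted. */
  String next() {
    while (baseKeys.hasNext()) {
      String key = baseKeys.next();
      if (delta.containsKey(key)) {
        deltaRecordKeys.remove(key);       // mark the key as handled
        Optional<String> rec = delta.get(key);
        if (!rec.isPresent()) {
          continue;                        // delete payload: skip this base record
        }
        return rec.get();                  // log version wins over the base row
      }
      return key + "@base";                // base record untouched by the log
    }
    if (deltaItr == null) {
      deltaItr = deltaRecordKeys.iterator();
    }
    while (deltaItr.hasNext()) {           // phase 2: emit log-only inserts
      Optional<String> rec = delta.get(deltaItr.next());
      if (rec.isPresent()) {
        return rec.get();
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Map<String, Optional<String>> delta = new HashMap<>();
    delta.put("k1", Optional.of("k1@log"));   // update of a base record
    delta.put("k2", Optional.empty());        // delete of a base record
    delta.put("k9", Optional.of("k9@log"));   // log-only insert
    MergedReadSketch r = new MergedReadSketch(List.of("k1", "k2", "k3"), delta);
    for (String row = r.next(); row != null; row = r.next()) {
      System.out.println(row);                // prints k1@log, k3@base, k9@log
    }
  }
}
```

Phase 1 preserves the existing merge semantics for keys that appear in the base file; phase 2 is the new behavior that surfaces log-only records after the base reader is exhausted.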
@@ -155,7 +155,7 @@ public class TestHoodieRealtimeRecordReader {
     } else {
       writer =
           InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", baseInstant,
-              instantTime, 100, 0, logVersion);
+              instantTime, 120, 0, logVersion);
     }
     long size = writer.getCurrentSize();
     writer.close();
@@ -182,17 +182,21 @@ public class TestHoodieRealtimeRecordReader {
 
       // use reader to read base Parquet File and log file, merge in flight and return latest commit
       // here all 100 records should be updated, see above
+      // another 20 new insert records should also output with new commit time.
       NullWritable key = recordReader.createKey();
       ArrayWritable value = recordReader.createValue();
+      int recordCnt = 0;
       while (recordReader.next(key, value)) {
         Writable[] values = value.get();
         // check if the record written is with latest commit, here "101"
         assertEquals(latestInstant, values[0].toString());
         key = recordReader.createKey();
         value = recordReader.createValue();
+        recordCnt++;
       }
       recordReader.getPos();
       assertEquals(1.0, recordReader.getProgress(), 0.05);
+      assertEquals(120, recordCnt);
       recordReader.close();
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
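For context on the assertions: the base parquet file in this test still holds 100 records, while the log file now holds 120 (updates for all 100 base keys plus 20 keys present only in the log), so the merged rt read is expected to emit 120 rows, each stamped with the latest commit time.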