
updated HoodieRealtimeRecordReader to use HoodieCompactedLogRecordScanner; added a test for the record reader
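In outline, the commit replaces the per-log-file Avro DataFileReader loop with a single pass over HoodieCompactedLogRecordScanner. A minimal sketch of the new read path, reassembled from the added lines in the diff below (split, readerSchema, writerSchema, deltaRecordMap, and avroToArrayWritable are existing fields and helpers of HoodieRealtimeRecordReader, not new names):

```java
// Scan all delta log files for this split in one pass; the scanner
// compacts records across files, so each record key appears once here.
HoodieCompactedLogRecordScanner scanner =
    new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema);
Iterator<HoodieRecord<HoodieAvroPayload>> itr = scanner.iterator();
while (itr.hasNext()) {
  HoodieRecord<HoodieAvroPayload> hoodieRecord = itr.next();
  GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get();
  // Later log records win: overwrite any earlier value for the same key.
  deltaRecordMap.put(hoodieRecord.getRecordKey(),
      (ArrayWritable) avroToArrayWritable(rec, writerSchema));
}
```

One consequence visible in the diff: the old per-record check against split.getMaxCommitTime() is gone. Per the NOTE in the new code, the scanner skips records from in-flight commits on its own, but can still return records from completed commits newer than the one being read.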

Nishith Agarwal
2017-05-31 15:06:22 -07:00
parent 933cc8071f
commit ba050973e3
6 changed files with 194 additions and 48 deletions


@@ -18,24 +18,20 @@
package com.uber.hoodie.hadoop.realtime;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline;
import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.ArrayWritable;
@@ -56,6 +52,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
@@ -76,7 +73,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
private final HashMap<String, ArrayWritable> deltaRecordMap;
private final MessageType baseFileSchema;
public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split,
JobConf job,
RecordReader<Void, ArrayWritable> realReader) {
@@ -106,29 +102,25 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
// TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before
Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields);
LOG.info(String.format("About to read logs %s for base split %s, projecting cols %s",
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaFilePaths(), split.getPath(), projectionFields));
for (String logFilePath: split.getDeltaFilePaths()) {
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
final AvroFSInput input = new AvroFSInput(FileContext.getFileContext(jobConf), new Path(logFilePath));
DataFileReader<GenericRecord> reader = (DataFileReader<GenericRecord>) DataFileReader.openReader(input, datumReader);
while (reader.hasNext()) {
GenericRecord rec = reader.next();
String key = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String commitTime = rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
if (HoodieTimeline.compareTimestamps(commitTime, split.getMaxCommitTime(), HoodieTimeline.GREATER)) {
// stop reading this log file. we hit a record later than max known commit time.
break;
}
// we assume a later safe record in the log is newer than what we have in the map & replace it.
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
deltaRecordMap.put(key, aWritable);
if (LOG.isDebugEnabled()) {
LOG.debug("Log record : " + arrayWritableToString(aWritable));
}
HoodieCompactedLogRecordScanner compactedLogRecordScanner =
new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema);
Iterator<HoodieRecord<HoodieAvroPayload>> itr = compactedLogRecordScanner.iterator();
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using readCommit() API)
while(itr.hasNext()) {
HoodieRecord<HoodieAvroPayload> hoodieRecord = itr.next();
GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get();
String key = hoodieRecord.getRecordKey();
// we assume a later safe record in the log is newer than what we have in the map & replace it.
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
deltaRecordMap.put(key, aWritable);
if (LOG.isDebugEnabled()) {
LOG.debug("Log record : " + arrayWritableToString(aWritable));
}
reader.close();
}
}
@@ -146,7 +138,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
return builder.toString();
}
/**
* Given a comma-separated list of field names and the positions at which they appear in Hive,
* return an ordered list of field names that can be passed on to storage.
@@ -173,8 +164,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
return orderedFieldMap.values().stream().collect(Collectors.toList());
}
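The body of this method is elided by the hunk above; only its javadoc and return statement are shown. A hypothetical reconstruction of the described behavior, using only the TreeMap, List, and Collectors imports already present in the file (the method and parameter names orderFields, fieldNameCsv, and fieldOrderCsv are assumptions, not the real signature):

```java
// Pair each field name with its Hive position, then emit the names in
// position order; TreeMap iterates its values in ascending key order.
private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv) {
  String[] fieldNames = fieldNameCsv.split(",");
  String[] fieldOrders = fieldOrderCsv.split(",");
  TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
  for (int i = 0; i < fieldNames.length; i++) {
    orderedFieldMap.put(Integer.parseInt(fieldOrders[i]), fieldNames[i]);
  }
  return orderedFieldMap.values().stream().collect(Collectors.toList());
}
```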
/**
* Generate a reader schema from the provided writerSchema, projecting out just
* the provided columns
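This method body is also elided by the diff. A sketch of how such a projection could look against the Avro 1.7/1.8 API of the time (the actual implementation is not shown here; Schema.Field instances cannot be reused across schemas, hence the per-field copy):

```java
// Copy only the requested fields from writerSchema into a new record
// schema; Avro requires fresh Field objects for the new schema.
private static Schema generateProjectionSchema(Schema writerSchema, List<String> projectionFields) {
  List<Schema.Field> projectedFields = new ArrayList<>();
  for (String fieldName : projectionFields) {
    Schema.Field field = writerSchema.getField(fieldName);
    if (field == null) {
      throw new HoodieException("Field " + fieldName + " not found in writer schema");
    }
    projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
  }
  Schema projectedSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(),
      writerSchema.getNamespace(), writerSchema.isError());
  projectedSchema.setFields(projectedFields);
  return projectedSchema;
}
```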
@@ -266,7 +255,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
return null;
}
@Override
public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException {
// Call the underlying parquetReader.next - which may replace the passed-in ArrayWritable with a new block of values