1
0

- Ugrading to Hive 2.x

- Eliminating in-memory deltaRecordsMap
- Use writerSchema to generate generic record needed by custom payloads
- changes to make tests work with hive 2.x
This commit is contained in:
Nishith Agarwal
2019-05-10 13:09:09 -07:00
committed by vinoth chandar
parent cd7623e216
commit 129e433641
22 changed files with 554 additions and 191 deletions

View File

@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -187,7 +188,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat implements Confi
}
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split,
final JobConf job, final Reporter reporter) throws IOException {
// TODO enable automatic predicate pushdown after fixing issues
// FileSplit fileSplit = (FileSplit) split;

View File

@@ -20,6 +20,7 @@ package com.uber.hoodie.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordReader;
@@ -31,10 +32,10 @@ import org.apache.hadoop.mapred.RecordReader;
* another thread, we need to ensure new instance of ArrayWritable is buffered. ParquetReader createKey/Value is unsafe
* as it gets reused for subsequent fetch. This wrapper makes ParquetReader safe for this use-case.
*/
public class SafeParquetRecordReaderWrapper implements RecordReader<Void, ArrayWritable> {
public class SafeParquetRecordReaderWrapper implements RecordReader<NullWritable, ArrayWritable> {
// real Parquet reader to be wrapped
private final RecordReader<Void, ArrayWritable> parquetReader;
private final RecordReader<NullWritable, ArrayWritable> parquetReader;
// Value Class
private final Class valueClass;
@@ -43,7 +44,7 @@ public class SafeParquetRecordReaderWrapper implements RecordReader<Void, ArrayW
private final int numValueFields;
public SafeParquetRecordReaderWrapper(RecordReader<Void, ArrayWritable> parquetReader) {
public SafeParquetRecordReaderWrapper(RecordReader<NullWritable, ArrayWritable> parquetReader) {
this.parquetReader = parquetReader;
ArrayWritable arrayWritable = parquetReader.createValue();
this.valueClass = arrayWritable.getValueClass();
@@ -51,12 +52,12 @@ public class SafeParquetRecordReaderWrapper implements RecordReader<Void, ArrayW
}
@Override
public boolean next(Void key, ArrayWritable value) throws IOException {
public boolean next(NullWritable key, ArrayWritable value) throws IOException {
return parquetReader.next(key, value);
}
@Override
public Void createKey() {
public NullWritable createKey() {
return parquetReader.createKey();
}

View File

@@ -18,13 +18,10 @@
package com.uber.hoodie.hadoop.realtime;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.LogReaderUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
@@ -44,7 +41,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -90,7 +86,7 @@ public abstract class AbstractRealtimeRecordReader {
protected final HoodieRealtimeFileSplit split;
protected final JobConf jobConf;
private final MessageType baseFileSchema;
protected final boolean usesCustomPayload;
// Schema handles
private Schema readerSchema;
private Schema writerSchema;
@@ -98,9 +94,12 @@ public abstract class AbstractRealtimeRecordReader {
public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
this.split = split;
this.jobConf = job;
LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get("partition_columns", ""));
try {
this.usesCustomPayload = usesCustomPayload();
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
baseFileSchema = readSchema(jobConf, split.getPath());
init();
} catch (IOException e) {
@@ -109,6 +108,12 @@ public abstract class AbstractRealtimeRecordReader {
}
}
private boolean usesCustomPayload() {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath());
return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
|| metaClient.getTableConfig().getPayloadClass().contains("com.uber.hoodie.OverwriteWithLatestAvroPayload"));
}
/**
* Reads the schema from the parquet file. This is different from ParquetUtils as it uses the
* twitter parquet to support hive 1.1.0
@@ -121,22 +126,32 @@ public abstract class AbstractRealtimeRecordReader {
}
}
/**
* Prints a JSON representation of the ArrayWritable for easier debuggability
*/
protected static String arrayWritableToString(ArrayWritable writable) {
if (writable == null) {
return "null";
}
StringBuilder builder = new StringBuilder();
Writable[] values = writable.get();
builder.append(String.format("(Size: %s)[", values.length));
builder.append("\"values_" + Math.random() + "_" + values.length + "\": {");
int i = 0;
for (Writable w : values) {
if (w instanceof ArrayWritable) {
builder.append(arrayWritableToString((ArrayWritable) w)).append(" ");
builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
} else {
builder.append(w).append(" ");
builder.append("\"value" + i + "\":" + "\"" + w + "\"").append(",");
if (w == null) {
builder.append("\"type" + i + "\":" + "\"unknown\"").append(",");
} else {
builder.append("\"type" + i + "\":" + "\"" + w.getClass().getSimpleName() + "\"").append(",");
}
}
i++;
}
builder.append("]");
builder.deleteCharAt(builder.length() - 1);
builder.append("}");
return builder.toString();
}
@@ -187,9 +202,10 @@ public abstract class AbstractRealtimeRecordReader {
throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
+ "Derived Schema Fields: "
+ new ArrayList<>(schemaFieldsMap.keySet()));
} else {
projectedFields
.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
}
projectedFields
.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
}
Schema projectedSchema = Schema
@@ -203,17 +219,10 @@ public abstract class AbstractRealtimeRecordReader {
*/
public static Writable avroToArrayWritable(Object value, Schema schema) {
// if value is null, make a NullWritable
// Hive 2.x does not like NullWritable
if (value == null) {
return null;
//return NullWritable.get();
}
Writable[] wrapperWritable;
switch (schema.getType()) {
case STRING:
return new Text(value.toString());
@@ -231,39 +240,38 @@ public abstract class AbstractRealtimeRecordReader {
return new BooleanWritable((Boolean) value);
case NULL:
return null;
// return NullWritable.get();
case RECORD:
GenericRecord record = (GenericRecord) value;
Writable[] values1 = new Writable[schema.getFields().size()];
int index1 = 0;
Writable[] recordValues = new Writable[schema.getFields().size()];
int recordValueIndex = 0;
for (Schema.Field field : schema.getFields()) {
values1[index1++] = avroToArrayWritable(record.get(field.name()), field.schema());
recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema());
}
return new ArrayWritable(Writable.class, values1);
return new ArrayWritable(Writable.class, recordValues);
case ENUM:
return new Text(value.toString());
case ARRAY:
GenericArray arrayValue = (GenericArray) value;
Writable[] values2 = new Writable[arrayValue.size()];
int index2 = 0;
Writable[] arrayValues = new Writable[arrayValue.size()];
int arrayValueIndex = 0;
for (Object obj : arrayValue) {
values2[index2++] = avroToArrayWritable(obj, schema.getElementType());
arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType());
}
wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values2)};
return new ArrayWritable(Writable.class, wrapperWritable);
// Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable
return new ArrayWritable(Writable.class, arrayValues);
case MAP:
Map mapValue = (Map) value;
Writable[] values3 = new Writable[mapValue.size()];
int index3 = 0;
Writable[] mapValues = new Writable[mapValue.size()];
int mapValueIndex = 0;
for (Object entry : mapValue.entrySet()) {
Map.Entry mapEntry = (Map.Entry) entry;
Writable[] mapValues = new Writable[2];
mapValues[0] = new Text(mapEntry.getKey().toString());
mapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
values3[index3++] = new ArrayWritable(Writable.class, mapValues);
Writable[] nestedMapValues = new Writable[2];
nestedMapValues[0] = new Text(mapEntry.getKey().toString());
nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues);
}
wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values3)};
return new ArrayWritable(Writable.class, wrapperWritable);
// Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable
return new ArrayWritable(Writable.class, mapValues);
case UNION:
List<Schema> types = schema.getTypes();
if (types.size() != 2) {
@@ -285,29 +293,13 @@ public abstract class AbstractRealtimeRecordReader {
}
}
public static Schema readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
HoodieAvroDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
if (block instanceof HoodieAvroDataBlock) {
lastBlock = (HoodieAvroDataBlock) block;
}
}
reader.close();
if (lastBlock != null) {
return lastBlock.getSchema();
}
return null;
}
/**
* Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file
* to also be part of the projected schema. Hive expects the record reader implementation to return the row in its
* entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema
* also includes partition columns
*
* @param schema Schema to be changed
* @return
*/
private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
final Set<String> firstLevelFieldNames = schema.getFields().stream().map(Field::name)
@@ -319,27 +311,26 @@ public abstract class AbstractRealtimeRecordReader {
}
/**
* Goes through the log files and populates a map with latest version of each key logged, since
* the base split was written.
* Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls
* back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into
* the job conf.
*/
private void init() throws IOException {
writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
List<String> fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
if (split.getDeltaFilePaths().size() > 0) {
String logPath = split.getDeltaFilePaths().get(split.getDeltaFilePaths().size() - 1);
FileSystem fs = FSUtils.getFs(logPath, jobConf);
writerSchema = readSchemaFromLogFile(fs, new Path(logPath));
fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
Schema schemaFromLogFile = LogReaderUtils
.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaFilePaths(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
}
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get("partition_columns", "");
List<String> partitioningFields =
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema = addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = orderFields(
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
@@ -347,7 +338,6 @@ public abstract class AbstractRealtimeRecordReader {
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before
readerSchema = generateProjectionSchema(writerSchema, projectionFields);
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaFilePaths(), split.getPath(), projectionFields));
}

View File

@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -68,6 +69,15 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
public static final int HOODIE_RECORD_KEY_COL_POS = 2;
public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
// Track the read column ids and names to be used throughout the execution and lifetime of this task
// Needed for Hive on Spark. Our theory is that due to
// {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
// not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
// times which ultimately breaks the query.
// TODO : Find why RO view works fine but RT doesn't, JIRA: https://issues.apache.org/jira/browse/HUDI-151
public static String READ_COLUMN_IDS;
public static String READ_COLUMN_NAMES;
public static boolean isReadColumnsSet = false;
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
@@ -190,7 +200,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
return conf;
}
private static Configuration addRequiredProjectionFields(Configuration configuration) {
private static synchronized Configuration addRequiredProjectionFields(Configuration configuration) {
// Need this to do merge records in HoodieRealtimeRecordReader
configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD,
HOODIE_RECORD_KEY_COL_POS);
@@ -198,11 +208,16 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
HOODIE_COMMIT_TIME_COL_POS);
configuration = addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD,
HOODIE_PARTITION_PATH_COL_POS);
if (!isReadColumnsSet) {
READ_COLUMN_IDS = configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
READ_COLUMN_NAMES = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
isReadColumnsSet = true;
}
return configuration;
}
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split,
final JobConf job, final Reporter reporter) throws IOException {
LOG.info("Before adding Hoodie columns, Projections :" + job
@@ -225,6 +240,10 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with "
+ split);
// Reset the original column ids and names
job.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS);
job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, READ_COLUMN_NAMES);
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
super.getRecordReader(split, job, reporter));
}

View File

@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -30,17 +31,17 @@ import org.apache.hadoop.mapred.RecordReader;
* Realtime Record Reader which can do compacted (merge-on-read) record reading or
* unmerged reading (parquet and log files read in parallel) based on job configuration.
*/
public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable> {
public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, ArrayWritable> {
// Property to enable parallel reading of parquet and log files without merging.
public static final String REALTIME_SKIP_MERGE_PROP = "hoodie.realtime.merge.skip";
// By default, we do merged-reading
public static final String DEFAULT_REALTIME_SKIP_MERGE = "false";
public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class);
private final RecordReader<Void, ArrayWritable> reader;
private final RecordReader<NullWritable, ArrayWritable> reader;
public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job,
RecordReader<Void, ArrayWritable> realReader) {
RecordReader<NullWritable, ArrayWritable> realReader) {
this.reader = constructRecordReader(split, job, realReader);
}
@@ -56,8 +57,8 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
* @param realReader Parquet Record Reader
* @return Realtime Reader
*/
private static RecordReader<Void, ArrayWritable> constructRecordReader(HoodieRealtimeFileSplit split,
JobConf jobConf, RecordReader<Void, ArrayWritable> realReader) {
private static RecordReader<NullWritable, ArrayWritable> constructRecordReader(HoodieRealtimeFileSplit split,
JobConf jobConf, RecordReader<NullWritable, ArrayWritable> realReader) {
try {
if (canSkipMerging(jobConf)) {
LOG.info("Enabling un-merged reading of realtime records");
@@ -71,12 +72,12 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
}
@Override
public boolean next(Void key, ArrayWritable value) throws IOException {
public boolean next(NullWritable key, ArrayWritable value) throws IOException {
return this.reader.next(key, value);
}
@Override
public Void createKey() {
public NullWritable createKey() {
return this.reader.createKey();
}

View File

@@ -22,68 +22,50 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader implements
RecordReader<Void, ArrayWritable> {
RecordReader<NullWritable, ArrayWritable> {
protected final RecordReader<Void, ArrayWritable> parquetReader;
private final HashMap<String, ArrayWritable> deltaRecordMap;
protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
public RealtimeCompactedRecordReader(HoodieRealtimeFileSplit split, JobConf job,
RecordReader<Void, ArrayWritable> realReader) throws IOException {
RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
super(split, job);
this.parquetReader = realReader;
this.deltaRecordMap = new HashMap<>();
readAndCompactLog();
this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
}
/**
* Goes through the log files and populates a map with latest version of each key logged, since
* the base split was written.
*/
private void readAndCompactLog() throws IOException {
HoodieMergedLogRecordScanner compactedLogRecordScanner = new HoodieMergedLogRecordScanner(
private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOException {
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using
// readCommit() API)
return new HoodieMergedLogRecordScanner(
FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(),
split.getDeltaFilePaths(), getReaderSchema(), split.getMaxCommitTime(), getMaxCompactionMemoryInBytes(),
split.getDeltaFilePaths(), usesCustomPayload ? getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(),
getMaxCompactionMemoryInBytes(),
Boolean.valueOf(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH));
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using
// readCommit() API)
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : compactedLogRecordScanner) {
Optional<IndexedRecord> recordOptional = hoodieRecord.getData().getInsertValue(getReaderSchema());
ArrayWritable aWritable;
String key = hoodieRecord.getRecordKey();
if (recordOptional.isPresent()) {
GenericRecord rec = (GenericRecord) recordOptional.get();
// we assume, a later safe record in the log, is newer than what we have in the map &
// replace it.
// TODO : handle deletes here
aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema());
deltaRecordMap.put(key, aWritable);
} else {
aWritable = new ArrayWritable(Writable.class, new Writable[0]);
deltaRecordMap.put(key, aWritable);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Log record : " + arrayWritableToString(aWritable));
}
}
}
@Override
public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException {
public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException {
// Call the underlying parquetReader.next - which may replace the passed in ArrayWritable
// with a new block of values
boolean result = this.parquetReader.next(aVoid, arrayWritable);
@@ -96,18 +78,33 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
// return from delta records map if we have some match.
String key = arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS]
.toString();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("key %s, base values: %s, log values: %s", key,
arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key))));
}
if (deltaRecordMap.containsKey(key)) {
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
// deltaRecord may not be a full record and needs values of columns from the parquet
Writable[] replaceValue = deltaRecordMap.get(key).get();
if (replaceValue.length < 1) {
// This record has been deleted, move to the next record
Optional<GenericRecord> rec;
if (usesCustomPayload) {
rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
} else {
rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
}
if (!rec.isPresent()) {
// If the record is not present, this is a delete record using an empty payload so skip this base record
// and move to the next record
return next(aVoid, arrayWritable);
}
GenericRecord recordToReturn = rec.get();
if (usesCustomPayload) {
// If using a custom payload, return only the projection fields
recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
}
// we assume, a later safe record in the log, is newer than what we have in the map &
// replace it.
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getWriterSchema());
Writable[] replaceValue = aWritable.get();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("key %s, base values: %s, log values: %s", key,
arrayWritableToString(arrayWritable), arrayWritableToString(aWritable)));
}
Writable[] originalValue = arrayWritable.get();
try {
System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
@@ -115,7 +112,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
} catch (RuntimeException re) {
LOG.error("Got exception when doing array copy", re);
LOG.error("Base record :" + arrayWritableToString(arrayWritable));
LOG.error("Log record :" + arrayWritableToString(deltaRecordMap.get(key)));
LOG.error("Log record :" + arrayWritableToString(aWritable));
throw re;
}
}
@@ -124,7 +121,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
}
@Override
public Void createKey() {
public NullWritable createKey() {
return parquetReader.createKey();
}

View File

@@ -34,20 +34,21 @@ import java.util.List;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implements
RecordReader<Void, ArrayWritable> {
RecordReader<NullWritable, ArrayWritable> {
// Log Record unmerged scanner
private final HoodieUnMergedLogRecordScanner logRecordScanner;
// Parquet record reader
private final RecordReader<Void, ArrayWritable> parquetReader;
private final RecordReader<NullWritable, ArrayWritable> parquetReader;
// Parquet record iterator wrapper for the above reader
private final RecordReaderValueIterator<Void, ArrayWritable> parquetRecordsIterator;
private final RecordReaderValueIterator<NullWritable, ArrayWritable> parquetRecordsIterator;
// Executor that runs the above producers in parallel
private final BoundedInMemoryExecutor<ArrayWritable, ArrayWritable, ?> executor;
@@ -64,7 +65,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implemen
* @param realReader Parquet Reader
*/
public RealtimeUnmergedRecordReader(HoodieRealtimeFileSplit split, JobConf job,
RecordReader<Void, ArrayWritable> realReader) {
RecordReader<NullWritable, ArrayWritable> realReader) {
super(split, job);
this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
// Iterator for consuming records from parquet file
@@ -103,7 +104,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implemen
}
@Override
public boolean next(Void key, ArrayWritable value) throws IOException {
public boolean next(NullWritable key, ArrayWritable value) throws IOException {
if (!iterator.hasNext()) {
return false;
}
@@ -113,7 +114,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implemen
}
@Override
public Void createKey() {
public NullWritable createKey() {
return parquetReader.createKey();
}

View File

@@ -26,6 +26,7 @@ import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -214,9 +215,9 @@ public class HoodieInputFormatTest {
int totalCount = 0;
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
for (InputSplit split : splits) {
RecordReader<Void, ArrayWritable> recordReader = inputFormat
RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat
.getRecordReader(split, jobConf, null);
Void key = recordReader.createKey();
NullWritable key = recordReader.createKey();
ArrayWritable writable = recordReader.createValue();
while (recordReader.next(key, writable)) {

View File

@@ -20,7 +20,9 @@ package com.uber.hoodie.hadoop;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.SchemaTestUtil;
import java.io.File;
import java.io.FilenameFilter;
@@ -29,8 +31,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.avro.AvroParquetWriter;
@@ -79,6 +83,11 @@ public class InputFormatTestUtil {
new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit").createNewFile();
}
public static void deltaCommit(TemporaryFolder basePath, String commitNumber) throws IOException {
// create the commit
new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".deltacommit").createNewFile();
}
public static void setupIncremental(JobConf jobConf, String startCommit,
int numberOfCommitsToPull) {
String modePropertyName = String
@@ -107,6 +116,16 @@ public class InputFormatTestUtil {
return partitionPath;
}
public static File prepareSimpleParquetDataset(TemporaryFolder basePath, Schema schema,
int numberOfFiles, int numberOfRecords, String commitNumber) throws Exception {
basePath.create();
HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01");
createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber);
return partitionPath;
}
public static File prepareNonPartitionedParquetDataset(TemporaryFolder baseDir, Schema schema,
int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException {
baseDir.create();
@@ -135,6 +154,31 @@ public class InputFormatTestUtil {
}
}
private static void createSimpleData(Schema schema,
File partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber)
throws Exception {
AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) {
String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" + i);
File dataFile = new File(partitionPath, fileId);
parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema);
try {
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numberOfRecords);
String commitTime = HoodieActiveTimeline.createNewCommitTime();
Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(schema);
for (IndexedRecord record : records) {
GenericRecord p = HoodieAvroUtils.rewriteRecord((GenericRecord) record, hoodieFieldsSchema);
p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, UUID.randomUUID().toString());
p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitNumber);
parquetWriter.write(p);
}
} finally {
parquetWriter.close();
}
}
}
private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema,
int numberOfRecords, String commitTime, String fileId) throws IOException {
List<GenericRecord> records = new ArrayList<>(numberOfRecords);

View File

@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -60,6 +61,7 @@ import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -91,7 +93,7 @@ public class HoodieRealtimeRecordReaderTest {
private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
String baseCommit, String newCommit, int numberOfRecords)
throws InterruptedException, IOException {
return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0);
return writeDataBlockToLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0);
}
private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema schema, String fileId,
@@ -115,7 +117,7 @@ public class HoodieRealtimeRecordReaderTest {
return writer;
}
private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
private HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, Schema schema, String fileId,
String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
@@ -137,6 +139,25 @@ public class HoodieRealtimeRecordReaderTest {
return writer;
}
private HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionDir, Schema schema, String fileId,
String baseCommit, String newCommit, String oldCommit, int logVersion)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(partitionDir.getPath()))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
.overBaseCommit(baseCommit).withLogVersion(logVersion).withFs(fs).build();
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit);
header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
.ordinal()));
HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);
writer = writer.appendBlock(rollbackBlock);
return writer;
}
@Test
public void testReader() throws Exception {
testReader(true);
@@ -155,7 +176,7 @@ public class HoodieRealtimeRecordReaderTest {
String baseInstant = "100";
File partitionDir =
partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, baseInstant)
: InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, baseInstant);
: InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, baseInstant);
InputFormatTestUtil.commit(basePath, baseInstant);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
@@ -183,7 +204,7 @@ public class HoodieRealtimeRecordReaderTest {
writer = writeRollback(partitionDir, schema, "fileid0", baseInstant,
instantTime, String.valueOf(baseInstantTs + logVersion - 1), logVersion);
} else {
writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant,
writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0", baseInstant,
instantTime, 100, 0, logVersion);
}
long size = writer.getCurrentSize();
@@ -199,7 +220,7 @@ public class HoodieRealtimeRecordReaderTest {
.collect(Collectors.toList()), instantTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<Void, ArrayWritable> reader =
RecordReader<NullWritable, ArrayWritable> reader =
new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
jobConf, null);
@@ -219,7 +240,7 @@ public class HoodieRealtimeRecordReaderTest {
//use reader to read base Parquet File and log file, merge in flight and return latest commit
//here all 100 records should be updated, see above
Void key = recordReader.createKey();
NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
while (recordReader.next(key, value)) {
Writable[] values = value.get();
@@ -255,7 +276,7 @@ public class HoodieRealtimeRecordReaderTest {
// insert new records to log file
String newCommitTime = "101";
HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime,
HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0", commitTime,
newCommitTime, numRecords, numRecords, 0);
long size = writer.getCurrentSize();
writer.close();
@@ -268,7 +289,7 @@ public class HoodieRealtimeRecordReaderTest {
jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<Void, ArrayWritable> reader =
RecordReader<NullWritable, ArrayWritable> reader =
new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
jobConf, null);
@@ -288,7 +309,7 @@ public class HoodieRealtimeRecordReaderTest {
//use reader to read base Parquet File and log file
//here all records should be present. Also ensure log records are in order.
Void key = recordReader.createKey();
NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
int numRecordsAtCommit1 = 0;
int numRecordsAtCommit2 = 0;
@@ -343,6 +364,7 @@ public class HoodieRealtimeRecordReaderTest {
long size = writer.getCurrentSize();
writer.close();
assertTrue("block - size should be > 0", size > 0);
InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
@@ -351,7 +373,7 @@ public class HoodieRealtimeRecordReaderTest {
jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<Void, ArrayWritable> reader =
RecordReader<NullWritable, ArrayWritable> reader =
new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
jobConf, null);
@@ -370,7 +392,7 @@ public class HoodieRealtimeRecordReaderTest {
// use reader to read base Parquet File and log file, merge in flight and return latest commit
// here the first 50 records should be updated, see above
Void key = recordReader.createKey();
NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
int numRecordsRead = 0;
while (recordReader.next(key, value)) {
@@ -420,26 +442,26 @@ public class HoodieRealtimeRecordReaderTest {
// Assert type MAP
ArrayWritable mapItem = (ArrayWritable) values[12];
Writable[] mapItemValues = ((ArrayWritable) mapItem.get()[0]).get();
ArrayWritable mapItemValue1 = (ArrayWritable) mapItemValues[0];
ArrayWritable mapItemValue2 = (ArrayWritable) mapItemValues[1];
Assert.assertEquals("test value for field: tags", mapItemValue1.get()[0].toString(),
Writable mapItemValue1 = mapItem.get()[0];
Writable mapItemValue2 = mapItem.get()[1];
Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue1).get()[0].toString(),
"mapItem1");
Assert.assertEquals("test value for field: tags", mapItemValue2.get()[0].toString(),
Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue2).get()[0].toString(),
"mapItem2");
ArrayWritable mapItemValue1value = (ArrayWritable) mapItemValue1.get()[1];
ArrayWritable mapItemValue2value = (ArrayWritable) mapItemValue2.get()[1];
Assert.assertEquals("test value for field: tags", mapItemValue1value.get().length, 2);
Assert.assertEquals("test value for field: tags", mapItemValue2value.get().length, 2);
Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue1).get().length, 2);
Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue2).get().length, 2);
Writable mapItemValue1value = ((ArrayWritable) mapItemValue1).get()[1];
Writable mapItemValue2value = ((ArrayWritable) mapItemValue2).get()[1];
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1",
mapItemValue1value.get()[0].toString(), "item" + currentRecordNo);
((ArrayWritable) mapItemValue1value).get()[0].toString(), "item" + currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1",
mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo);
((ArrayWritable) mapItemValue2value).get()[0].toString(), "item2" + currentRecordNo);
Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2",
mapItemValue1value.get()[1].toString(),
((ArrayWritable) mapItemValue1value).get()[1].toString(),
"item" + currentRecordNo + recordCommitTimeSuffix);
Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2",
mapItemValue2value.get()[1].toString(),
((ArrayWritable) mapItemValue2value).get()[1].toString(),
"item2" + currentRecordNo + recordCommitTimeSuffix);
// Assert type RECORD
@@ -453,11 +475,96 @@ public class HoodieRealtimeRecordReaderTest {
// Assert type ARRAY
ArrayWritable arrayValue = (ArrayWritable) values[14];
Writable[] arrayValues = ((ArrayWritable) arrayValue.get()[0]).get();
Writable[] arrayValues = arrayValue.get();
for (int i = 0; i < arrayValues.length; i++) {
Assert.assertEquals("test value for field: stringArray", "stringArray" + i + recordCommitTimeSuffix,
arrayValues[i].toString());
}
}
}
}
@Test
public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception {
// initial commit
List<String> logFilePaths = new ArrayList<>();
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
HoodieTestUtils.initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(),
HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
int numberOfRecords = 100;
int numberOfLogRecords = numberOfRecords / 2;
File partitionDir = InputFormatTestUtil
.prepareSimpleParquetDataset(basePath, schema, 1, numberOfRecords, commitTime);
InputFormatTestUtil.commit(basePath, commitTime);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
List<Field> firstSchemaFields = schema.getFields();
// update files and generate new log file but don't commit
schema = SchemaTestUtil.getComplexEvolvedSchema();
String newCommitTime = "101";
HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0", commitTime,
newCommitTime, numberOfLogRecords, 0, 1);
long size = writer.getCurrentSize();
logFilePaths.add(writer.getLogFile().getPath().toString());
writer.close();
assertTrue("block - size should be > 0", size > 0);
// write rollback for the previous block in new log file version
newCommitTime = "102";
writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", commitTime,
newCommitTime, "101", 1);
logFilePaths.add(writer.getLogFile().getPath().toString());
writer.close();
assertTrue("block - size should be > 0", size > 0);
InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
//create a split with baseFile (parquet file written earlier) and new log file(s)
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1,
jobConf), basePath.getRoot().getPath(), logFilePaths, newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
RecordReader<NullWritable, ArrayWritable> reader =
new MapredParquetInputFormat().getRecordReader(
new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null),
jobConf, null);
JobConf jobConf = new JobConf();
List<Schema.Field> fields = schema.getFields();
assert (firstSchemaFields.containsAll(fields) == false);
// Try to read all the fields passed by the new schema
String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
String positions = fields.stream().map(f -> String.valueOf(f.pos()))
.collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");
HoodieRealtimeRecordReader recordReader = null;
try {
// validate record reader compaction
recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
throw new RuntimeException("should've failed the previous line");
} catch (HoodieException e) {
// expected, field not found since the data written with the evolved schema was rolled back
}
// Try to read all the fields passed by the new schema
names = firstSchemaFields.stream().map(f -> f.name()).collect(Collectors.joining(","));
positions = firstSchemaFields.stream().map(f -> String.valueOf(f.pos()))
.collect(Collectors.joining(","));
jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
jobConf.set("partition_columns", "datestr");
// This time read only the fields which are part of parquet
recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
// use reader to read base Parquet File and log file
NullWritable key = recordReader.createKey();
ArrayWritable value = recordReader.createValue();
while (recordReader.next(key, value)) {
// keep reading
}
}
}