[HUDI-298] Fix issue with incorrect column mapping casusing bad data, during on-the-fly merge of Real Time tables (#956)
* Fix issue with incorrect column mapping casusing bad data, during on-the-fly merge of Real Time tables
This commit is contained in:
committed by
Balaji Varadarajan
parent
c052167c06
commit
12523c379f
@@ -33,6 +33,7 @@ import org.apache.avro.generic.GenericFixed;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
|
||||
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
|
||||
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
@@ -89,13 +90,14 @@ public abstract class AbstractRealtimeRecordReader {
|
||||
// Schema handles
|
||||
private Schema readerSchema;
|
||||
private Schema writerSchema;
|
||||
private Schema hiveSchema;
|
||||
|
||||
public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
|
||||
this.split = split;
|
||||
this.jobConf = job;
|
||||
LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
|
||||
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
||||
LOG.info("partitioningColumns ==> " + job.get("partition_columns", ""));
|
||||
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
|
||||
try {
|
||||
this.usesCustomPayload = usesCustomPayload();
|
||||
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
|
||||
@@ -179,7 +181,8 @@ public abstract class AbstractRealtimeRecordReader {
|
||||
/**
|
||||
* Generate a reader schema off the provided writeSchema, to just project out the provided columns
|
||||
*/
|
||||
public static Schema generateProjectionSchema(Schema writeSchema, List<String> fieldNames) {
|
||||
public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Field> schemaFieldsMap,
|
||||
List<String> fieldNames) {
|
||||
/**
|
||||
* Avro & Presto field names seems to be case sensitive (support fields differing only in case) whereas
|
||||
* Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable using
|
||||
@@ -191,8 +194,6 @@ public abstract class AbstractRealtimeRecordReader {
|
||||
*
|
||||
*/
|
||||
List<Schema.Field> projectedFields = new ArrayList<>();
|
||||
Map<String, Schema.Field> schemaFieldsMap = writeSchema.getFields().stream()
|
||||
.map(r -> Pair.of(r.name().toLowerCase(), r)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
|
||||
for (String fn : fieldNames) {
|
||||
Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
|
||||
if (field == null) {
|
||||
@@ -209,6 +210,11 @@ public abstract class AbstractRealtimeRecordReader {
|
||||
return projectedSchema;
|
||||
}
|
||||
|
||||
public static Map<String, Field> getNameToFieldMap(Schema schema) {
|
||||
return schema.getFields().stream().map(r -> Pair.of(r.name().toLowerCase(), r))
|
||||
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the projected read from delta record into an array writable
|
||||
*/
|
||||
@@ -321,20 +327,48 @@ public abstract class AbstractRealtimeRecordReader {
|
||||
LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
|
||||
}
|
||||
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
|
||||
String partitionFields = jobConf.get("partition_columns", "");
|
||||
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
|
||||
List<String> partitioningFields =
|
||||
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
|
||||
: new ArrayList<>();
|
||||
writerSchema = addPartitionFields(writerSchema, partitioningFields);
|
||||
List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
|
||||
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);
|
||||
|
||||
Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
|
||||
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
|
||||
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
|
||||
// to null out fields not present before
|
||||
readerSchema = generateProjectionSchema(writerSchema, projectionFields);
|
||||
|
||||
readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
|
||||
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
|
||||
split.getDeltaFilePaths(), split.getPath(), projectionFields));
|
||||
}
|
||||
|
||||
private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
|
||||
// Get all column names of hive table
|
||||
String hiveColumnString = jobConf.get(hive_metastoreConstants.META_TABLE_COLUMNS);
|
||||
String[] hiveColumns = hiveColumnString.split(",");
|
||||
List<Field> hiveSchemaFields = new ArrayList<>();
|
||||
|
||||
for (String columnName : hiveColumns) {
|
||||
Field field = schemaFieldsMap.get(columnName.toLowerCase());
|
||||
|
||||
if (field != null) {
|
||||
hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
|
||||
} else {
|
||||
// Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
|
||||
// They will get skipped as they won't be found in the original schema.
|
||||
LOG.debug("Skipping Hive Column => " + columnName);
|
||||
}
|
||||
}
|
||||
|
||||
Schema hiveSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(),
|
||||
writerSchema.isError());
|
||||
hiveSchema.setFields(hiveSchemaFields);
|
||||
return hiveSchema;
|
||||
}
|
||||
|
||||
public Schema getReaderSchema() {
|
||||
return readerSchema;
|
||||
}
|
||||
@@ -343,6 +377,10 @@ public abstract class AbstractRealtimeRecordReader {
|
||||
return writerSchema;
|
||||
}
|
||||
|
||||
public Schema getHiveSchema() {
|
||||
return hiveSchema;
|
||||
}
|
||||
|
||||
public long getMaxCompactionMemoryInBytes() {
|
||||
// jobConf.getMemoryForMapTask() returns in MB
|
||||
return (long) Math
|
||||
|
||||
@@ -101,7 +101,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
|
||||
}
|
||||
// we assume, a later safe record in the log, is newer than what we have in the map &
|
||||
// replace it.
|
||||
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getWriterSchema());
|
||||
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
|
||||
Writable[] replaceValue = aWritable.get();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
|
||||
|
||||
Reference in New Issue
Block a user