1
0

- Fixing RT queries for HiveOnSpark that causes race conditions

- Adding more comments to understand usage of reader/writer schema
This commit is contained in:
Nishith Agarwal
2019-11-03 23:09:15 -08:00
committed by Balaji Varadarajan
parent 22315a887f
commit 3a05edab01
3 changed files with 66 additions and 35 deletions

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.hadoop.realtime;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@@ -156,24 +157,39 @@ public abstract class AbstractRealtimeRecordReader {
} }
/** /**
* Given a comma separated list of field names and positions at which they appear on Hive, return a ordered list of * Given a comma separated list of field names and positions at which they appear on Hive, return
* field names, that can be passed onto storage. * an ordered list of field names, that can be passed onto storage.
*/ */
private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) { private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
// Need to convert the following to Set first since Hive does not handle duplicate field names correctly but
String[] fieldOrders = fieldOrderCsv.split(","); // handles duplicate fields orders correctly.
List<String> fieldNames = Arrays.stream(fieldNameCsv.split(",")).filter(fn -> !partitioningFields.contains(fn)) // Fields Orders -> {@link https://github
.collect(Collectors.toList()); // .com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188}
// Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java
// /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229}
Set<String> fieldOrdersSet = new LinkedHashSet<>();
String[] fieldOrdersWithDups = fieldOrderCsv.split(",");
for (String fieldOrder : fieldOrdersWithDups) {
fieldOrdersSet.add(fieldOrder);
}
String[] fieldOrders = fieldOrdersSet.toArray(new String[fieldOrdersSet.size()]);
List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
.filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
Set<String> fieldNamesSet = new LinkedHashSet<>();
for (String fieldName : fieldNames) {
fieldNamesSet.add(fieldName);
}
// Hive does not provide ids for partitioning fields, so check for lengths excluding that. // Hive does not provide ids for partitioning fields, so check for lengths excluding that.
if (fieldNames.size() != fieldOrders.length) { if (fieldNamesSet.size() != fieldOrders.length) {
throw new HoodieException( throw new HoodieException(String
String.format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d", .format("Error ordering fields for storage read. #fieldNames: %d, #fieldPositions: %d",
fieldNames.size(), fieldOrders.length)); fieldNames.size(), fieldOrders.length));
} }
TreeMap<Integer, String> orderedFieldMap = new TreeMap<>(); TreeMap<Integer, String> orderedFieldMap = new TreeMap<>();
String[] fieldNamesArray = fieldNamesSet.toArray(new String[fieldNamesSet.size()]);
for (int ox = 0; ox < fieldOrders.length; ox++) { for (int ox = 0; ox < fieldOrders.length; ox++) {
orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNames.get(ox)); orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]);
} }
return new ArrayList<>(orderedFieldMap.values()); return new ArrayList<>(orderedFieldMap.values());
} }

View File

@@ -69,7 +69,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
public static final int HOODIE_COMMIT_TIME_COL_POS = 0; public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
public static final int HOODIE_RECORD_KEY_COL_POS = 2; public static final int HOODIE_RECORD_KEY_COL_POS = 2;
public static final int HOODIE_PARTITION_PATH_COL_POS = 3; public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
// Hive on Spark queries do not work with RT tables. Our theory is that due to public static final String HOODIE_READ_COLUMNS_PROP = "hoodie.read.columns.set";
// To make Hive on Spark queries work with RT tables. Our theory is that due to
// {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher} // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
// not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple
// times which ultimately breaks the query. // times which ultimately breaks the query.
@@ -186,7 +187,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
return conf; return conf;
} }
private static synchronized Configuration addRequiredProjectionFields(Configuration configuration) { private static Configuration addRequiredProjectionFields(Configuration configuration) {
// Need this to do merge records in HoodieRealtimeRecordReader // Need this to do merge records in HoodieRealtimeRecordReader
configuration = configuration =
addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS); addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
@@ -204,13 +205,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
* Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438) * Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
*/ */
private static Configuration cleanProjectionColumnIds(Configuration conf) { private static Configuration cleanProjectionColumnIds(Configuration conf) {
synchronized (conf) { String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR); if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') { conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1)); if (LOG.isDebugEnabled()) {
if (LOG.isDebugEnabled()) { LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
}
} }
} }
return conf; return conf;
@@ -219,18 +218,30 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
@Override @Override
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job, public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
final Reporter reporter) throws IOException { final Reporter reporter) throws IOException {
// Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
this.conf = cleanProjectionColumnIds(job); // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
LOG.info("Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); // latency incurred here due to the synchronization since get record reader is called once per spilt before the
// actual heavy lifting of reading the parquet files happen.
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table; if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
// In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases synchronized (job) {
// hoodie additional projection columns are reset after calling setConf and only natural projections LOG.info(
// (one found in select queries) are set. things would break because of this. "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
// For e:g _hoodie_record_key would be missing and merge step would throw exceptions. + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction time. if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
this.conf = addRequiredProjectionFields(job); // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
// In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
// hoodie additional projection columns are reset after calling setConf and only natural projections
// (one found in select queries) are set. things would break because of this.
// For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
// TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
// time.
this.conf = cleanProjectionColumnIds(job);
this.conf = addRequiredProjectionFields(job);
this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
}
}
}
LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));

View File

@@ -96,11 +96,13 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
} }
GenericRecord recordToReturn = rec.get(); GenericRecord recordToReturn = rec.get();
if (usesCustomPayload) { if (usesCustomPayload) {
// If using a custom payload, return only the projection fields // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
// the writerSchema with only the projection fields
recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema()); recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
} }
// we assume, a later safe record in the log, is newer than what we have in the map & // we assume, a later safe record in the log, is newer than what we have in the map &
// replace it. // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
// schema, we use writerSchema to create the arrayWritable from the latest generic record
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema()); ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
Writable[] replaceValue = aWritable.get(); Writable[] replaceValue = aWritable.get();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@@ -115,7 +117,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
LOG.error("Got exception when doing array copy", re); LOG.error("Got exception when doing array copy", re);
LOG.error("Base record :" + arrayWritableToString(arrayWritable)); LOG.error("Base record :" + arrayWritableToString(arrayWritable));
LOG.error("Log record :" + arrayWritableToString(aWritable)); LOG.error("Log record :" + arrayWritableToString(aWritable));
throw re; String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
+ " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
throw new RuntimeException(errMsg, re);
} }
} }
return true; return true;