1
0

Moving dependencies off CDH to Apache + Hive2 support

- Tests redone in the process
 - Main changes are to RealtimeRecordReader and how it treats maps/arrays
 - Make hive sync work with Hive 1/2 and CDH environments
 - Fixes to handle corner cases in Hive queries
 - Spark Hive integration - Working version across Apache and CDH versions
 - Known Issue - https://github.com/uber/hudi/issues/439
This commit is contained in:
Vinoth Chandar
2018-07-15 22:34:02 -07:00
committed by vinoth chandar
parent 2b1af18941
commit a5359662be
32 changed files with 1983 additions and 407 deletions

View File

@@ -16,13 +16,8 @@
package com.uber.hoodie.hadoop;
import static parquet.filter2.predicate.FilterApi.and;
import static parquet.filter2.predicate.FilterApi.binaryColumn;
import static parquet.filter2.predicate.FilterApi.gt;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
@@ -43,26 +38,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import parquet.filter2.predicate.FilterPredicate;
import parquet.filter2.predicate.Operators;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.FileMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
/**
* HoodieInputFormat which understands the Hoodie File Structure and filters files based on the
@@ -219,62 +201,6 @@ public class HoodieInputFormat extends MapredParquetInputFormat implements Confi
return super.getRecordReader(split, job, reporter);
}
/**
 * Clears out any HQL filter expression already set on the job (if this is not done, then
 * ParquetReader will override the FilterPredicate set by this input format).
 *
 * @param job the job configuration to remove the serialized filter expression from
 */
private void clearOutExistingPredicate(JobConf job) {
job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
}
/**
 * Constructs the predicate to push down to parquet storage: the
 * `hoodie_commit_time` > 'start_commit_time' filter, ANDed with any HQL filter
 * already serialized on the job for this split.
 *
 * @param job       job configuration carrying any existing serialized filter
 * @param tableName table whose last incremental commit time bounds the predicate
 * @param split     split whose parquet footer provides the schema for the HQL filter
 * @return the combined filter predicate to hand to the parquet reader
 */
private FilterPredicate constructHoodiePredicate(JobConf job, String tableName, InputSplit split)
    throws IOException {
  FilterPredicate commitTimeFilter = constructCommitTimePushdownPredicate(job, tableName);
  LOG.info("Commit time predicate - " + commitTimeFilter.toString());
  FilterPredicate hqlFilter = constructHQLPushdownPredicate(job, split);
  LOG.info("Existing predicate - " + hqlFilter);
  // Only AND with the HQL filter when one was actually pushed down.
  FilterPredicate combined =
      (hqlFilter == null) ? commitTimeFilter : and(hqlFilter, commitTimeFilter);
  LOG.info("Hoodie Predicate - " + combined);
  return combined;
}
/**
 * Rebuilds the HQL filter that Hive serialized onto the job as a parquet
 * {@link FilterPredicate}, using the parquet footer schema of this split's file.
 *
 * @param job   job configuration holding the serialized filter and projected column names
 * @param split split (must be a {@link FileSplit}) whose file footer is read for the schema
 * @return the converted predicate, or {@code null} when no filter / no columns were pushed down
 */
private FilterPredicate constructHQLPushdownPredicate(JobConf job, InputSplit split)
    throws IOException {
  String serializedPushdown = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
  String columnNamesString = job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
  boolean missingFilter = serializedPushdown == null || serializedPushdown.isEmpty();
  boolean missingColumns = columnNamesString == null || columnNamesString.isEmpty();
  if (missingFilter || missingColumns) {
    // Nothing was pushed down by Hive for this scan.
    return null;
  }
  SearchArgument sarg =
      SearchArgumentFactory.create(Utilities.deserializeExpression(serializedPushdown));
  Path parquetFilePath = ((FileSplit) split).getPath();
  FileMetaData fileMetaData =
      ParquetFileReader.readFooter(job, parquetFilePath).getFileMetaData();
  return ParquetFilterPredicateConverter.toFilterPredicate(sarg, fileMetaData.getSchema());
}
/**
 * Builds the incremental-pull predicate `_hoodie_commit_time` > last consumed commit time,
 * read from the job's hoodie consume properties for the given table.
 *
 * @param job       job configuration holding the incremental consume start time
 * @param tableName table whose start commit time is looked up
 * @return a greater-than predicate on the commit-time metadata column
 */
private FilterPredicate constructCommitTimePushdownPredicate(JobConf job, String tableName)
    throws IOException {
  String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
  Operators.BinaryColumn commitTimeColumn = binaryColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
  FilterPredicate predicate = gt(commitTimeColumn, Binary.fromString(lastIncrementalTs));
  LOG.info("Setting predicate in InputFormat " + predicate.toString());
  return predicate;
}
/**
* Read the table metadata from a data path. This assumes certain hierarchy of files which should
* be changed once a better way is figured out to pass in the hoodie meta directory

View File

@@ -126,7 +126,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(),
baseDir.toString());
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
fs.listStatus(folder));
List<HoodieDataFile> latestFiles = fsView.getLatestDataFiles()
.collect(Collectors.toList());

View File

@@ -18,6 +18,13 @@
package com.uber.hoodie.hadoop.realtime;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
@@ -29,12 +36,15 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -44,7 +54,6 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -119,10 +128,15 @@ public abstract class AbstractRealtimeRecordReader {
StringBuilder builder = new StringBuilder();
Writable[] values = writable.get();
builder.append(String.format("Size: %s,", values.length));
builder.append(String.format("(Size: %s)[", values.length));
for (Writable w : values) {
builder.append(w + " ");
if (w instanceof ArrayWritable) {
builder.append(arrayWritableToString((ArrayWritable) w) + " ");
} else {
builder.append(w + " ");
}
}
builder.append("]");
return builder.toString();
}
@@ -130,12 +144,9 @@ public abstract class AbstractRealtimeRecordReader {
* Given a comma separated list of field names and positions at which they appear on Hive, return
* a ordered list of field names, that can be passed onto storage.
*/
public static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv,
String partitioningFieldsCsv) {
private static List<String> orderFields(String fieldNameCsv, String fieldOrderCsv, List<String> partitioningFields) {
String[] fieldOrders = fieldOrderCsv.split(",");
Set<String> partitioningFields = Arrays.stream(partitioningFieldsCsv.split(","))
.collect(Collectors.toSet());
List<String> fieldNames = Arrays.stream(fieldNameCsv.split(","))
.filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList());
@@ -157,17 +168,34 @@ public abstract class AbstractRealtimeRecordReader {
* columns
*/
public static Schema generateProjectionSchema(Schema writeSchema, List<String> fieldNames) {
/**
* Avro & Presto field names seems to be case sensitive (support fields differing only in case)
* whereas Hive/Impala/SparkSQL(default) are case-insensitive. Spark allows this to be configurable
* using spark.sql.caseSensitive=true
*
* For a RT table setup with no delta-files (for a latest file-slice) -> we translate parquet schema to Avro
* Here the field-name case is dependent on parquet schema. Hive (1.x/2.x/CDH) translate column projections
* to lower-cases
*
*/
List<Schema.Field> projectedFields = new ArrayList<>();
Map<String, Schema.Field> schemaFieldsMap = writeSchema.getFields().stream()
.map(r -> Pair.of(r.name().toLowerCase(), r)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
for (String fn : fieldNames) {
Schema.Field field = writeSchema.getField(fn);
Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
if (field == null) {
throw new HoodieException("Field " + fn + " not found log schema. Query cannot proceed!");
throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
+ "Derived Schema Fields: "
+ schemaFieldsMap.keySet().stream().collect(Collectors.toList()));
}
projectedFields
.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
}
return Schema.createRecord(projectedFields);
Schema projectedSchema = Schema
.createRecord(writeSchema.getName(), writeSchema.getDoc(), writeSchema.getNamespace(), writeSchema.isError());
projectedSchema.setFields(projectedFields);
return projectedSchema;
}
/**
@@ -176,10 +204,16 @@ public abstract class AbstractRealtimeRecordReader {
public static Writable avroToArrayWritable(Object value, Schema schema) {
// if value is null, make a NullWritable
// Hive 2.x does not like NullWritable
if (value == null) {
return NullWritable.get();
return null;
//return NullWritable.get();
}
Writable[] wrapperWritable;
switch (schema.getType()) {
case STRING:
return new Text(value.toString());
@@ -196,7 +230,8 @@ public abstract class AbstractRealtimeRecordReader {
case BOOLEAN:
return new BooleanWritable((Boolean) value);
case NULL:
return NullWritable.get();
return null;
// return NullWritable.get();
case RECORD:
GenericRecord record = (GenericRecord) value;
Writable[] values1 = new Writable[schema.getFields().size()];
@@ -214,7 +249,8 @@ public abstract class AbstractRealtimeRecordReader {
for (Object obj : arrayValue) {
values2[index2++] = avroToArrayWritable(obj, schema.getElementType());
}
return new ArrayWritable(Writable.class, values2);
wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values2)};
return new ArrayWritable(Writable.class, wrapperWritable);
case MAP:
Map mapValue = (Map) value;
Writable[] values3 = new Writable[mapValue.size()];
@@ -226,7 +262,8 @@ public abstract class AbstractRealtimeRecordReader {
mapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType());
values3[index3++] = new ArrayWritable(Writable.class, mapValues);
}
return new ArrayWritable(Writable.class, values3);
wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values3)};
return new ArrayWritable(Writable.class, wrapperWritable);
case UNION:
List<Schema> types = schema.getTypes();
if (types.size() != 2) {
@@ -248,16 +285,61 @@ public abstract class AbstractRealtimeRecordReader {
}
}
/**
 * Reads the writer schema from a hoodie log file by scanning all blocks and taking the
 * schema of the last {@link HoodieAvroDataBlock} (the most recent writer schema).
 *
 * @param fs   filesystem the log file lives on
 * @param path path of the log file
 * @return schema of the last avro data block, or {@code null} if the file has none
 * @throws IOException on any failure reading the log file
 */
public static Schema readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
  Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
  // BUGFIX: the reader was previously never closed, leaking the underlying stream on
  // every call (and on any exception raised mid-scan). Close it in a finally block.
  try {
    HoodieAvroDataBlock lastBlock = null;
    while (reader.hasNext()) {
      HoodieLogBlock block = reader.next();
      if (block instanceof HoodieAvroDataBlock) {
        lastBlock = (HoodieAvroDataBlock) block;
      }
    }
    return (lastBlock != null) ? lastBlock.getSchema() : null;
  } finally {
    reader.close();
  }
}
/**
 * Hive's ParquetRecordReader expects partition columns (absent from the parquet file itself)
 * to be part of the returned row, with null values for un-projected columns. Since the
 * writer schema drives row construction, append any partition column missing from the
 * schema's first-level fields as a nullable field.
 *
 * @param schema             writer schema to augment
 * @param partitioningFields partition column names from the job configuration
 * @return schema with the missing partition columns appended as null-schema fields
 */
private static Schema addPartitionFields(Schema schema, List<String> partitioningFields) {
  final Set<String> existingNames = schema.getFields().stream()
      .map(f -> f.name().toLowerCase())
      .collect(Collectors.toSet());
  // Comparison is lower-cased on both sides: Hive hands column names down in lower case.
  final List<String> missingPartitionFields = partitioningFields.stream()
      .map(String::toLowerCase)
      .filter(name -> !existingNames.contains(name))
      .collect(Collectors.toList());
  return HoodieAvroUtils.appendNullSchemaFields(schema, missingPartitionFields);
}
/**
* Goes through the log files and populates a map with latest version of each key logged, since
* the base split was written.
*/
private void init() throws IOException {
writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
List<String> fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
if (split.getDeltaFilePaths().size() > 0) {
String logPath = split.getDeltaFilePaths().get(split.getDeltaFilePaths().size() - 1);
FileSystem fs = FSUtils.getFs(logPath, jobConf);
writerSchema = readSchemaFromLogFile(fs, new Path(logPath));
fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
}
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
List<String> partitioningFields = Arrays.stream(
jobConf.get("partition_columns", "").split(",")).collect(Collectors.toList());
writerSchema = addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = orderFields(
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
jobConf.get("partition_columns", ""));
partitioningFields);
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before
readerSchema = generateProjectionSchema(writerSchema, projectionFields);

View File

@@ -203,21 +203,31 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit split,
final JobConf job, final Reporter reporter) throws IOException {
LOG.info("Before adding Hoodie columns, Projections :" + job
.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :"
+ job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
// In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
// hoodie additional projection columns are reset after calling setConf and only natural projections
// (one found in select queries) are set. things would break because of this.
// For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
// TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction time.
this.conf = addRequiredProjectionFields(job);
LOG.info("Creating record reader with readCols :" + job
.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :"
+ job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// sanity check
Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with "
+ split);
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
super.getRecordReader(split, job, reporter));
}
// Hive invokes setConf on Configurable input formats before building splits/readers.
// Delegate to addRequiredProjectionFields so the hoodie metadata projection columns are
// appended to the configuration. NOTE(review): the exact columns added are determined by
// addRequiredProjectionFields, which is not visible in this hunk — confirm against that method.
@Override
public void setConf(Configuration conf) {
this.conf = addRequiredProjectionFields(conf);
}
@Override
public Configuration getConf() {
return conf;

View File

@@ -95,8 +95,15 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro ?
Writable[] replaceValue = deltaRecordMap.get(key).get();
Writable[] originalValue = arrayWritable.get();
System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
arrayWritable.set(originalValue);
try {
System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
arrayWritable.set(originalValue);
} catch (RuntimeException re) {
LOG.error("Got exception when doing array copy", re);
LOG.error("Base record :" + arrayWritableToString(arrayWritable));
LOG.error("Log record :" + arrayWritableToString(deltaRecordMap.get(key)));
throw re;
}
}
return true;
}