diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java index b0010b41d..c2fe7301c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java @@ -310,8 +310,6 @@ public abstract class AbstractHoodieLogRecordScanner { processAvroDataBlock((HoodieAvroDataBlock) lastBlock); break; case DELETE_BLOCK: - // TODO : If delete is the only block written and/or records are present in parquet file - // TODO : Mark as tombstone (optional.empty()) for data instead of deleting the entry Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey); break; case CORRUPT_BLOCK: diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java index 8c2dea4e4..d062cc181 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java @@ -331,6 +331,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { /** * hasPrev is not idempotent */ + @Override public boolean hasPrev() { try { if (!this.reverseReader) { @@ -352,6 +353,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader { * iterate reverse (prev) or forward (next). Doing both in the same instance is not supported * WARNING : Every call to prev() should be preceded with hasPrev() */ + @Override public HoodieLogBlock prev() throws IOException { if (!this.reverseReader) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java index 3f0117917..650700a91 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java @@ -81,6 +81,19 @@ public interface HoodieLogFormat { * @return the path to this {@link HoodieLogFormat} */ HoodieLogFile getLogFile(); + + /** + * Read log file in reverse order and check if prev block is present + * @return + */ + public boolean hasPrev(); + + /** + * Read log file in reverse order and return prev block if present + * @return + * @throws IOException + */ + public HoodieLogBlock prev() throws IOException; } @@ -246,6 +259,13 @@ public interface HoodieLogFormat { return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false); } + static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean + readBlockLazily, boolean reverseReader) + throws IOException { + return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, + readBlockLazily, reverseReader); + } + /** * A set of feature flags associated with a log format. Versions are changed when the log format * changes. TODO(na) - Implement policies around major/minor versions diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java index 9c5f82fd6..39ae5a933 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java @@ -119,4 +119,14 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { public void remove() { } + @Override + public boolean hasPrev() { + return this.currentReader.hasPrev(); + } + + @Override + public HoodieLogBlock prev() throws IOException { + return this.currentReader.prev(); + } + } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java index 918c9a336..9b34fab63 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java @@ -44,6 +44,7 @@ import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.NullNode; /** * Helper class to do common stuff across Avro. @@ -156,16 +157,16 @@ public class HoodieAvroUtils { * Add null fields to passed in schema. Caller is responsible for ensuring there is no duplicates. * As different query engines have varying constraints regarding treating the case-sensitivity of fields, its best * to let caller determine that. + * * @param schema Passed in schema * @param newFieldNames Null Field names to be added - * @return */ public static Schema appendNullSchemaFields(Schema schema, List newFieldNames) { List newFields = schema.getFields().stream().map(field -> { return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()); }).collect(Collectors.toList()); for (String newField : newFieldNames) { - newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", null)); + newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", NullNode.getInstance())); } Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()); newSchema.setFields(newFields); @@ -184,11 +185,24 @@ public class HoodieAvroUtils { /** - * Given a avro record with a given schema, rewrites it into the new schema + * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old + * schema */ public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) { + return rewrite(record, record.getSchema(), newSchema); + } + + /** + * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new + * schema + */ + public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) { + return rewrite(record, newSchema, newSchema); + } + + private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFields, Schema newSchema) { GenericRecord newRecord = new GenericData.Record(newSchema); - for (Schema.Field f : record.getSchema().getFields()) { + for (Schema.Field f : schemaWithFields.getFields()) { newRecord.put(f.name(), record.get(f.name())); } if (!GenericData.get().validate(newSchema, newRecord)) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java new file mode 100644 index 000000000..ad5d500c8 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util; + +import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.log.HoodieLogFormat; +import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; + +/** + * Utils class for performing various log file reading operations + */ +public class LogReaderUtils { + + private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, Path path) + throws IOException { + HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, true); + Schema writerSchema = null; + HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); + while (reader.hasPrev()) { + HoodieLogBlock block = reader.prev(); + if (block instanceof HoodieAvroDataBlock && block != null) { + HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block; + if (completedTimeline.containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType + .INSTANT_TIME))) { + writerSchema = Schema.parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + break; + } + } + } + reader.close(); + return writerSchema; + } + + public static Schema readLatestSchemaFromLogFiles(String basePath, List deltaFilePaths, JobConf jobConf) + throws IOException { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath); + List deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s))) + .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + if (deltaPaths.size() > 0) { + for (String logPath : deltaPaths) { + FileSystem fs = FSUtils.getFs(logPath, jobConf); + Schema schemaFromLogFile = + readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath)); + if (schemaFromLogFile != null) { + return schemaFromLogFile; + } + } + } + return null; + } + +} diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java index 6df237526..c15663041 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -187,7 +188,7 @@ public class HoodieInputFormat extends MapredParquetInputFormat implements Confi } @Override - public RecordReader getRecordReader(final InputSplit split, + public RecordReader getRecordReader(final InputSplit split, final JobConf job, final Reporter reporter) throws IOException { // TODO enable automatic predicate pushdown after fixing issues // FileSplit fileSplit = (FileSplit) split; diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java index 48c3f86d1..274e955a0 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java @@ -20,6 +20,7 @@ package com.uber.hoodie.hadoop; import java.io.IOException; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.RecordReader; @@ -31,10 +32,10 @@ import org.apache.hadoop.mapred.RecordReader; * another thread, we need to ensure new instance of ArrayWritable is buffered. ParquetReader createKey/Value is unsafe * as it gets reused for subsequent fetch. This wrapper makes ParquetReader safe for this use-case. */ -public class SafeParquetRecordReaderWrapper implements RecordReader { +public class SafeParquetRecordReaderWrapper implements RecordReader { // real Parquet reader to be wrapped - private final RecordReader parquetReader; + private final RecordReader parquetReader; // Value Class private final Class valueClass; @@ -43,7 +44,7 @@ public class SafeParquetRecordReaderWrapper implements RecordReader parquetReader) { + public SafeParquetRecordReaderWrapper(RecordReader parquetReader) { this.parquetReader = parquetReader; ArrayWritable arrayWritable = parquetReader.createValue(); this.valueClass = arrayWritable.getValueClass(); @@ -51,12 +52,12 @@ public class SafeParquetRecordReaderWrapper implements RecordReader " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); + LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + LOG.info("partitioningColumns ==> " + job.get("partition_columns", "")); try { + this.usesCustomPayload = usesCustomPayload(); + LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); baseFileSchema = readSchema(jobConf, split.getPath()); init(); } catch (IOException e) { @@ -109,6 +108,12 @@ public abstract class AbstractRealtimeRecordReader { } } + private boolean usesCustomPayload() { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath()); + return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName()) + || metaClient.getTableConfig().getPayloadClass().contains("com.uber.hoodie.OverwriteWithLatestAvroPayload")); + } + /** * Reads the schema from the parquet file. This is different from ParquetUtils as it uses the * twitter parquet to support hive 1.1.0 @@ -121,22 +126,32 @@ public abstract class AbstractRealtimeRecordReader { } } + /** + * Prints a JSON representation of the ArrayWritable for easier debuggability + */ protected static String arrayWritableToString(ArrayWritable writable) { if (writable == null) { return "null"; } - StringBuilder builder = new StringBuilder(); Writable[] values = writable.get(); - builder.append(String.format("(Size: %s)[", values.length)); + builder.append("\"values_" + Math.random() + "_" + values.length + "\": {"); + int i = 0; for (Writable w : values) { if (w instanceof ArrayWritable) { - builder.append(arrayWritableToString((ArrayWritable) w)).append(" "); + builder.append(arrayWritableToString((ArrayWritable) w)).append(","); } else { - builder.append(w).append(" "); + builder.append("\"value" + i + "\":" + "\"" + w + "\"").append(","); + if (w == null) { + builder.append("\"type" + i + "\":" + "\"unknown\"").append(","); + } else { + builder.append("\"type" + i + "\":" + "\"" + w.getClass().getSimpleName() + "\"").append(","); + } } + i++; } - builder.append("]"); + builder.deleteCharAt(builder.length() - 1); + builder.append("}"); return builder.toString(); } @@ -187,9 +202,10 @@ public abstract class AbstractRealtimeRecordReader { throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! " + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet())); + } else { + projectedFields + .add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); } - projectedFields - .add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue())); } Schema projectedSchema = Schema @@ -203,17 +219,10 @@ public abstract class AbstractRealtimeRecordReader { */ public static Writable avroToArrayWritable(Object value, Schema schema) { - // if value is null, make a NullWritable - // Hive 2.x does not like NullWritable if (value == null) { - return null; - //return NullWritable.get(); } - - Writable[] wrapperWritable; - switch (schema.getType()) { case STRING: return new Text(value.toString()); @@ -231,39 +240,38 @@ public abstract class AbstractRealtimeRecordReader { return new BooleanWritable((Boolean) value); case NULL: return null; - // return NullWritable.get(); case RECORD: GenericRecord record = (GenericRecord) value; - Writable[] values1 = new Writable[schema.getFields().size()]; - int index1 = 0; + Writable[] recordValues = new Writable[schema.getFields().size()]; + int recordValueIndex = 0; for (Schema.Field field : schema.getFields()) { - values1[index1++] = avroToArrayWritable(record.get(field.name()), field.schema()); + recordValues[recordValueIndex++] = avroToArrayWritable(record.get(field.name()), field.schema()); } - return new ArrayWritable(Writable.class, values1); + return new ArrayWritable(Writable.class, recordValues); case ENUM: return new Text(value.toString()); case ARRAY: GenericArray arrayValue = (GenericArray) value; - Writable[] values2 = new Writable[arrayValue.size()]; - int index2 = 0; + Writable[] arrayValues = new Writable[arrayValue.size()]; + int arrayValueIndex = 0; for (Object obj : arrayValue) { - values2[index2++] = avroToArrayWritable(obj, schema.getElementType()); + arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType()); } - wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values2)}; - return new ArrayWritable(Writable.class, wrapperWritable); + // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable + return new ArrayWritable(Writable.class, arrayValues); case MAP: Map mapValue = (Map) value; - Writable[] values3 = new Writable[mapValue.size()]; - int index3 = 0; + Writable[] mapValues = new Writable[mapValue.size()]; + int mapValueIndex = 0; for (Object entry : mapValue.entrySet()) { Map.Entry mapEntry = (Map.Entry) entry; - Writable[] mapValues = new Writable[2]; - mapValues[0] = new Text(mapEntry.getKey().toString()); - mapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType()); - values3[index3++] = new ArrayWritable(Writable.class, mapValues); + Writable[] nestedMapValues = new Writable[2]; + nestedMapValues[0] = new Text(mapEntry.getKey().toString()); + nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType()); + mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues); } - wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, values3)}; - return new ArrayWritable(Writable.class, wrapperWritable); + // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable + return new ArrayWritable(Writable.class, mapValues); case UNION: List types = schema.getTypes(); if (types.size() != 2) { @@ -285,29 +293,13 @@ public abstract class AbstractRealtimeRecordReader { } } - public static Schema readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { - Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); - HoodieAvroDataBlock lastBlock = null; - while (reader.hasNext()) { - HoodieLogBlock block = reader.next(); - if (block instanceof HoodieAvroDataBlock) { - lastBlock = (HoodieAvroDataBlock) block; - } - } - reader.close(); - if (lastBlock != null) { - return lastBlock.getSchema(); - } - return null; - } - /** * Hive implementation of ParquetRecordReader results in partition columns not present in the original parquet file * to also be part of the projected schema. Hive expects the record reader implementation to return the row in its * entirety (with un-projected column having null values). As we use writerSchema for this, make sure writer schema * also includes partition columns + * * @param schema Schema to be changed - * @return */ private static Schema addPartitionFields(Schema schema, List partitioningFields) { final Set firstLevelFieldNames = schema.getFields().stream().map(Field::name) @@ -319,27 +311,26 @@ public abstract class AbstractRealtimeRecordReader { } /** - * Goes through the log files and populates a map with latest version of each key logged, since - * the base split was written. + * Goes through the log files in reverse order and finds the schema from the last available data block. If not, falls + * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into + * the job conf. */ private void init() throws IOException { - writerSchema = new AvroSchemaConverter().convert(baseFileSchema); - List fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList()); - if (split.getDeltaFilePaths().size() > 0) { - String logPath = split.getDeltaFilePaths().get(split.getDeltaFilePaths().size() - 1); - FileSystem fs = FSUtils.getFs(logPath, jobConf); - writerSchema = readSchemaFromLogFile(fs, new Path(logPath)); - fieldNames = writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList()); + Schema schemaFromLogFile = LogReaderUtils + .readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaFilePaths(), jobConf); + if (schemaFromLogFile == null) { + writerSchema = new AvroSchemaConverter().convert(baseFileSchema); + LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields()); + } else { + writerSchema = schemaFromLogFile; + LOG.debug("Writer Schema From Log => " + writerSchema.getFields()); } - // Add partitioning fields to writer schema for resulting row to contain null values for these fields - String partitionFields = jobConf.get("partition_columns", ""); List partitioningFields = partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList()) : new ArrayList<>(); writerSchema = addPartitionFields(writerSchema, partitioningFields); - List projectionFields = orderFields( jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), @@ -347,7 +338,6 @@ public abstract class AbstractRealtimeRecordReader { // TODO(vc): In the future, the reader schema should be updated based on log files & be able // to null out fields not present before readerSchema = generateProjectionSchema(writerSchema, projectionFields); - LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s", split.getDeltaFilePaths(), split.getPath(), projectionFields)); } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java index 6e323eca3..142637386 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -68,6 +69,15 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf public static final int HOODIE_COMMIT_TIME_COL_POS = 0; public static final int HOODIE_RECORD_KEY_COL_POS = 2; public static final int HOODIE_PARTITION_PATH_COL_POS = 3; + // Track the read column ids and names to be used throughout the execution and lifetime of this task + // Needed for Hive on Spark. Our theory is that due to + // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher} + // not handling empty list correctly, the ParquetRecordReaderWrapper ends up adding the same column ids multiple + // times which ultimately breaks the query. + // TODO : Find why RO view works fine but RT doesn't, JIRA: https://issues.apache.org/jira/browse/HUDI-151 + public static String READ_COLUMN_IDS; + public static String READ_COLUMN_NAMES; + public static boolean isReadColumnsSet = false; @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @@ -190,7 +200,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf return conf; } - private static Configuration addRequiredProjectionFields(Configuration configuration) { + private static synchronized Configuration addRequiredProjectionFields(Configuration configuration) { // Need this to do merge records in HoodieRealtimeRecordReader configuration = addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS); @@ -198,11 +208,16 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf HOODIE_COMMIT_TIME_COL_POS); configuration = addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS); + if (!isReadColumnsSet) { + READ_COLUMN_IDS = configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR); + READ_COLUMN_NAMES = configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + isReadColumnsSet = true; + } return configuration; } @Override - public RecordReader getRecordReader(final InputSplit split, + public RecordReader getRecordReader(final InputSplit split, final JobConf job, final Reporter reporter) throws IOException { LOG.info("Before adding Hoodie columns, Projections :" + job @@ -225,6 +240,10 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split); + // Reset the original column ids and names + job.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS); + job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, READ_COLUMN_NAMES); + return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job, super.getRecordReader(split, job, reporter)); } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java index 6a515b8b2..61f267a64 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -30,17 +31,17 @@ import org.apache.hadoop.mapred.RecordReader; * Realtime Record Reader which can do compacted (merge-on-read) record reading or * unmerged reading (parquet and log files read in parallel) based on job configuration. */ -public class HoodieRealtimeRecordReader implements RecordReader { +public class HoodieRealtimeRecordReader implements RecordReader { // Property to enable parallel reading of parquet and log files without merging. public static final String REALTIME_SKIP_MERGE_PROP = "hoodie.realtime.merge.skip"; // By default, we do merged-reading public static final String DEFAULT_REALTIME_SKIP_MERGE = "false"; public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class); - private final RecordReader reader; + private final RecordReader reader; public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job, - RecordReader realReader) { + RecordReader realReader) { this.reader = constructRecordReader(split, job, realReader); } @@ -56,8 +57,8 @@ public class HoodieRealtimeRecordReader implements RecordReader constructRecordReader(HoodieRealtimeFileSplit split, - JobConf jobConf, RecordReader realReader) { + private static RecordReader constructRecordReader(HoodieRealtimeFileSplit split, + JobConf jobConf, RecordReader realReader) { try { if (canSkipMerging(jobConf)) { LOG.info("Enabling un-merged reading of realtime records"); @@ -71,12 +72,12 @@ public class HoodieRealtimeRecordReader implements RecordReader { + RecordReader { - protected final RecordReader parquetReader; - private final HashMap deltaRecordMap; + protected final RecordReader parquetReader; + private final Map> deltaRecordMap; public RealtimeCompactedRecordReader(HoodieRealtimeFileSplit split, JobConf job, - RecordReader realReader) throws IOException { + RecordReader realReader) throws IOException { super(split, job); this.parquetReader = realReader; - this.deltaRecordMap = new HashMap<>(); - readAndCompactLog(); + this.deltaRecordMap = getMergedLogRecordScanner().getRecords(); } /** * Goes through the log files and populates a map with latest version of each key logged, since * the base split was written. */ - private void readAndCompactLog() throws IOException { - HoodieMergedLogRecordScanner compactedLogRecordScanner = new HoodieMergedLogRecordScanner( + private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOException { + // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit + // but can return records for completed commits > the commit we are trying to read (if using + // readCommit() API) + return new HoodieMergedLogRecordScanner( FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(), - split.getDeltaFilePaths(), getReaderSchema(), split.getMaxCommitTime(), getMaxCompactionMemoryInBytes(), + split.getDeltaFilePaths(), usesCustomPayload ? getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(), + getMaxCompactionMemoryInBytes(), Boolean.valueOf(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH)); - // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit - // but can return records for completed commits > the commit we are trying to read (if using - // readCommit() API) - for (HoodieRecord hoodieRecord : compactedLogRecordScanner) { - Optional recordOptional = hoodieRecord.getData().getInsertValue(getReaderSchema()); - ArrayWritable aWritable; - String key = hoodieRecord.getRecordKey(); - if (recordOptional.isPresent()) { - GenericRecord rec = (GenericRecord) recordOptional.get(); - // we assume, a later safe record in the log, is newer than what we have in the map & - // replace it. - // TODO : handle deletes here - aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema()); - deltaRecordMap.put(key, aWritable); - } else { - aWritable = new ArrayWritable(Writable.class, new Writable[0]); - deltaRecordMap.put(key, aWritable); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Log record : " + arrayWritableToString(aWritable)); - } - } } @Override - public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException { + public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws IOException { // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable // with a new block of values boolean result = this.parquetReader.next(aVoid, arrayWritable); @@ -96,18 +78,33 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme // return from delta records map if we have some match. String key = arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS] .toString(); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("key %s, base values: %s, log values: %s", key, - arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key)))); - } if (deltaRecordMap.containsKey(key)) { // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the // deltaRecord may not be a full record and needs values of columns from the parquet - Writable[] replaceValue = deltaRecordMap.get(key).get(); - if (replaceValue.length < 1) { - // This record has been deleted, move to the next record + Optional rec; + if (usesCustomPayload) { + rec = deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema()); + } else { + rec = deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema()); + } + if (!rec.isPresent()) { + // If the record is not present, this is a delete record using an empty payload so skip this base record + // and move to the next record return next(aVoid, arrayWritable); } + GenericRecord recordToReturn = rec.get(); + if (usesCustomPayload) { + // If using a custom payload, return only the projection fields + recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema()); + } + // we assume, a later safe record in the log, is newer than what we have in the map & + // replace it. + ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getWriterSchema()); + Writable[] replaceValue = aWritable.get(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("key %s, base values: %s, log values: %s", key, + arrayWritableToString(arrayWritable), arrayWritableToString(aWritable))); + } Writable[] originalValue = arrayWritable.get(); try { System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length); @@ -115,7 +112,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme } catch (RuntimeException re) { LOG.error("Got exception when doing array copy", re); LOG.error("Base record :" + arrayWritableToString(arrayWritable)); - LOG.error("Log record :" + arrayWritableToString(deltaRecordMap.get(key))); + LOG.error("Log record :" + arrayWritableToString(aWritable)); throw re; } } @@ -124,7 +121,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme } @Override - public Void createKey() { + public NullWritable createKey() { return parquetReader.createKey(); } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java index 861e01fcc..da0994507 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -34,20 +34,21 @@ import java.util.List; import java.util.Optional; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implements - RecordReader { + RecordReader { // Log Record unmerged scanner private final HoodieUnMergedLogRecordScanner logRecordScanner; // Parquet record reader - private final RecordReader parquetReader; + private final RecordReader parquetReader; // Parquet record iterator wrapper for the above reader - private final RecordReaderValueIterator parquetRecordsIterator; + private final RecordReaderValueIterator parquetRecordsIterator; // Executor that runs the above producers in parallel private final BoundedInMemoryExecutor executor; @@ -64,7 +65,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implemen * @param realReader Parquet Reader */ public RealtimeUnmergedRecordReader(HoodieRealtimeFileSplit split, JobConf job, - RecordReader realReader) { + RecordReader realReader) { super(split, job); this.parquetReader = new SafeParquetRecordReaderWrapper(realReader); // Iterator for consuming records from parquet file @@ -103,7 +104,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implemen } @Override - public boolean next(Void key, ArrayWritable value) throws IOException { + public boolean next(NullWritable key, ArrayWritable value) throws IOException { if (!iterator.hasNext()) { return false; } @@ -113,7 +114,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader implemen } @Override - public Void createKey() { + public NullWritable createKey() { return parquetReader.createKey(); } diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java index 93c38b202..dbda1ed50 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java @@ -26,6 +26,7 @@ import java.io.IOException; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -214,9 +215,9 @@ public class HoodieInputFormatTest { int totalCount = 0; InputSplit[] splits = inputFormat.getSplits(jobConf, 1); for (InputSplit split : splits) { - RecordReader recordReader = inputFormat + RecordReader recordReader = inputFormat .getRecordReader(split, jobConf, null); - Void key = recordReader.createKey(); + NullWritable key = recordReader.createKey(); ArrayWritable writable = recordReader.createValue(); while (recordReader.next(key, writable)) { diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java index 57272ff9a..2fa07055e 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java @@ -20,7 +20,9 @@ package com.uber.hoodie.hadoop; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTestUtils; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.SchemaTestUtil; import java.io.File; import java.io.FilenameFilter; @@ -29,8 +31,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.parquet.avro.AvroParquetWriter; @@ -79,6 +83,11 @@ public class InputFormatTestUtil { new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit").createNewFile(); } + public static void deltaCommit(TemporaryFolder basePath, String commitNumber) throws IOException { + // create the commit + new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".deltacommit").createNewFile(); + } + public static void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) { String modePropertyName = String @@ -107,6 +116,16 @@ public class InputFormatTestUtil { return partitionPath; } + + public static File prepareSimpleParquetDataset(TemporaryFolder basePath, Schema schema, + int numberOfFiles, int numberOfRecords, String commitNumber) throws Exception { + basePath.create(); + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString()); + File partitionPath = basePath.newFolder("2016", "05", "01"); + createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber); + return partitionPath; + } + public static File prepareNonPartitionedParquetDataset(TemporaryFolder baseDir, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException { baseDir.create(); @@ -135,6 +154,31 @@ public class InputFormatTestUtil { } } + private static void createSimpleData(Schema schema, + File partitionPath, int numberOfFiles, int numberOfRecords, String commitNumber) + throws Exception { + AvroParquetWriter parquetWriter; + for (int i = 0; i < numberOfFiles; i++) { + String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" + i); + File dataFile = new File(partitionPath, fileId); + parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema); + try { + List records = SchemaTestUtil.generateTestRecords(0, numberOfRecords); + String commitTime = HoodieActiveTimeline.createNewCommitTime(); + Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(schema); + for (IndexedRecord record : records) { + GenericRecord p = HoodieAvroUtils.rewriteRecord((GenericRecord) record, hoodieFieldsSchema); + p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, UUID.randomUUID().toString()); + p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00"); + p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitNumber); + parquetWriter.write(p); + } + } finally { + parquetWriter.close(); + } + } + } + private static Iterable generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException { List records = new ArrayList<>(numberOfRecords); diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index c9ffa802c..dc268d68c 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -60,6 +61,7 @@ import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -91,7 +93,7 @@ public class HoodieRealtimeRecordReaderTest { private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit, int numberOfRecords) throws InterruptedException, IOException { - return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0); + return writeDataBlockToLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, 0, 0); } private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema schema, String fileId, @@ -115,7 +117,7 @@ public class HoodieRealtimeRecordReaderTest { return writer; } - private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId, + private HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, Schema schema, String fileId, String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) throws InterruptedException, IOException { HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder() @@ -137,6 +139,25 @@ public class HoodieRealtimeRecordReaderTest { return writer; } + private HoodieLogFormat.Writer writeRollbackBlockToLogFile(File partitionDir, Schema schema, String fileId, + String baseCommit, String newCommit, String oldCommit, int logVersion) + throws InterruptedException, IOException { + HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder() + .onParentPath(new Path(partitionDir.getPath())) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId) + .overBaseCommit(baseCommit).withLogVersion(logVersion).withFs(fs).build(); + + Map header = Maps.newHashMap(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); + header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, oldCommit); + header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK + .ordinal())); + HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header); + writer = writer.appendBlock(rollbackBlock); + return writer; + } + @Test public void testReader() throws Exception { testReader(true); @@ -155,7 +176,7 @@ public class HoodieRealtimeRecordReaderTest { String baseInstant = "100"; File partitionDir = partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, baseInstant) - : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, baseInstant); + : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 100, baseInstant); InputFormatTestUtil.commit(basePath, baseInstant); // Add the paths FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); @@ -183,7 +204,7 @@ public class HoodieRealtimeRecordReaderTest { writer = writeRollback(partitionDir, schema, "fileid0", baseInstant, instantTime, String.valueOf(baseInstantTs + logVersion - 1), logVersion); } else { - writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant, + writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0", baseInstant, instantTime, 100, 0, logVersion); } long size = writer.getCurrentSize(); @@ -199,7 +220,7 @@ public class HoodieRealtimeRecordReaderTest { .collect(Collectors.toList()), instantTime); //create a RecordReader to be used by HoodieRealtimeRecordReader - RecordReader reader = + RecordReader reader = new MapredParquetInputFormat().getRecordReader( new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); @@ -219,7 +240,7 @@ public class HoodieRealtimeRecordReaderTest { //use reader to read base Parquet File and log file, merge in flight and return latest commit //here all 100 records should be updated, see above - Void key = recordReader.createKey(); + NullWritable key = recordReader.createKey(); ArrayWritable value = recordReader.createValue(); while (recordReader.next(key, value)) { Writable[] values = value.get(); @@ -255,7 +276,7 @@ public class HoodieRealtimeRecordReaderTest { // insert new records to log file String newCommitTime = "101"; - HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime, + HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, numRecords, numRecords, 0); long size = writer.getCurrentSize(); writer.close(); @@ -268,7 +289,7 @@ public class HoodieRealtimeRecordReaderTest { jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime); //create a RecordReader to be used by HoodieRealtimeRecordReader - RecordReader reader = + RecordReader reader = new MapredParquetInputFormat().getRecordReader( new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); @@ -288,7 +309,7 @@ public class HoodieRealtimeRecordReaderTest { //use reader to read base Parquet File and log file //here all records should be present. Also ensure log records are in order. - Void key = recordReader.createKey(); + NullWritable key = recordReader.createKey(); ArrayWritable value = recordReader.createValue(); int numRecordsAtCommit1 = 0; int numRecordsAtCommit2 = 0; @@ -343,6 +364,7 @@ public class HoodieRealtimeRecordReaderTest { long size = writer.getCurrentSize(); writer.close(); assertTrue("block - size should be > 0", size > 0); + InputFormatTestUtil.deltaCommit(basePath, newCommitTime); //create a split with baseFile (parquet file written earlier) and new log file(s) String logFilePath = writer.getLogFile().getPath().toString(); @@ -351,7 +373,7 @@ public class HoodieRealtimeRecordReaderTest { jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime); //create a RecordReader to be used by HoodieRealtimeRecordReader - RecordReader reader = + RecordReader reader = new MapredParquetInputFormat().getRecordReader( new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), jobConf, null); @@ -370,7 +392,7 @@ public class HoodieRealtimeRecordReaderTest { // use reader to read base Parquet File and log file, merge in flight and return latest commit // here the first 50 records should be updated, see above - Void key = recordReader.createKey(); + NullWritable key = recordReader.createKey(); ArrayWritable value = recordReader.createValue(); int numRecordsRead = 0; while (recordReader.next(key, value)) { @@ -420,26 +442,26 @@ public class HoodieRealtimeRecordReaderTest { // Assert type MAP ArrayWritable mapItem = (ArrayWritable) values[12]; - Writable[] mapItemValues = ((ArrayWritable) mapItem.get()[0]).get(); - ArrayWritable mapItemValue1 = (ArrayWritable) mapItemValues[0]; - ArrayWritable mapItemValue2 = (ArrayWritable) mapItemValues[1]; - Assert.assertEquals("test value for field: tags", mapItemValue1.get()[0].toString(), + Writable mapItemValue1 = mapItem.get()[0]; + Writable mapItemValue2 = mapItem.get()[1]; + + Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue1).get()[0].toString(), "mapItem1"); - Assert.assertEquals("test value for field: tags", mapItemValue2.get()[0].toString(), + Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue2).get()[0].toString(), "mapItem2"); - ArrayWritable mapItemValue1value = (ArrayWritable) mapItemValue1.get()[1]; - ArrayWritable mapItemValue2value = (ArrayWritable) mapItemValue2.get()[1]; - Assert.assertEquals("test value for field: tags", mapItemValue1value.get().length, 2); - Assert.assertEquals("test value for field: tags", mapItemValue2value.get().length, 2); + Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue1).get().length, 2); + Assert.assertEquals("test value for field: tags", ((ArrayWritable) mapItemValue2).get().length, 2); + Writable mapItemValue1value = ((ArrayWritable) mapItemValue1).get()[1]; + Writable mapItemValue2value = ((ArrayWritable) mapItemValue2).get()[1]; Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1", - mapItemValue1value.get()[0].toString(), "item" + currentRecordNo); + ((ArrayWritable) mapItemValue1value).get()[0].toString(), "item" + currentRecordNo); Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1", - mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo); + ((ArrayWritable) mapItemValue2value).get()[0].toString(), "item2" + currentRecordNo); Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2", - mapItemValue1value.get()[1].toString(), + ((ArrayWritable) mapItemValue1value).get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix); Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2", - mapItemValue2value.get()[1].toString(), + ((ArrayWritable) mapItemValue2value).get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix); // Assert type RECORD @@ -453,11 +475,96 @@ public class HoodieRealtimeRecordReaderTest { // Assert type ARRAY ArrayWritable arrayValue = (ArrayWritable) values[14]; - Writable[] arrayValues = ((ArrayWritable) arrayValue.get()[0]).get(); + Writable[] arrayValues = arrayValue.get(); for (int i = 0; i < arrayValues.length; i++) { Assert.assertEquals("test value for field: stringArray", "stringArray" + i + recordCommitTimeSuffix, arrayValues[i].toString()); } } } -} + + @Test + public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception { + // initial commit + List logFilePaths = new ArrayList<>(); + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); + HoodieTestUtils.initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(), + HoodieTableType.MERGE_ON_READ); + String commitTime = "100"; + int numberOfRecords = 100; + int numberOfLogRecords = numberOfRecords / 2; + File partitionDir = InputFormatTestUtil + .prepareSimpleParquetDataset(basePath, schema, 1, numberOfRecords, commitTime); + InputFormatTestUtil.commit(basePath, commitTime); + // Add the paths + FileInputFormat.setInputPaths(jobConf, partitionDir.getPath()); + List firstSchemaFields = schema.getFields(); + + // update files and generate new log file but don't commit + schema = SchemaTestUtil.getComplexEvolvedSchema(); + String newCommitTime = "101"; + HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0", commitTime, + newCommitTime, numberOfLogRecords, 0, 1); + long size = writer.getCurrentSize(); + logFilePaths.add(writer.getLogFile().getPath().toString()); + writer.close(); + assertTrue("block - size should be > 0", size > 0); + + // write rollback for the previous block in new log file version + newCommitTime = "102"; + writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", commitTime, + newCommitTime, "101", 1); + logFilePaths.add(writer.getLogFile().getPath().toString()); + writer.close(); + assertTrue("block - size should be > 0", size > 0); + InputFormatTestUtil.deltaCommit(basePath, newCommitTime); + + //create a split with baseFile (parquet file written earlier) and new log file(s) + HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( + new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1, + jobConf), basePath.getRoot().getPath(), logFilePaths, newCommitTime); + + //create a RecordReader to be used by HoodieRealtimeRecordReader + RecordReader reader = + new MapredParquetInputFormat().getRecordReader( + new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), (String[]) null), + jobConf, null); + JobConf jobConf = new JobConf(); + List fields = schema.getFields(); + + assert (firstSchemaFields.containsAll(fields) == false); + + // Try to read all the fields passed by the new schema + String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(",")); + String positions = fields.stream().map(f -> String.valueOf(f.pos())) + .collect(Collectors.joining(",")); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); + jobConf.set("partition_columns", "datestr"); + + HoodieRealtimeRecordReader recordReader = null; + try { + // validate record reader compaction + recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); + throw new RuntimeException("should've failed the previous line"); + } catch (HoodieException e) { + // expected, field not found since the data written with the evolved schema was rolled back + } + + // Try to read all the fields passed by the new schema + names = firstSchemaFields.stream().map(f -> f.name()).collect(Collectors.joining(",")); + positions = firstSchemaFields.stream().map(f -> String.valueOf(f.pos())) + .collect(Collectors.joining(",")); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); + jobConf.set("partition_columns", "datestr"); + // This time read only the fields which are part of parquet + recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); + // use reader to read base Parquet File and log file + NullWritable key = recordReader.createKey(); + ArrayWritable value = recordReader.createValue(); + while (recordReader.next(key, value)) { + // keep reading + } + } +} \ No newline at end of file diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml index 01875a7bb..198b1925b 100644 --- a/hoodie-hive/pom.xml +++ b/hoodie-hive/pom.xml @@ -68,6 +68,11 @@ commons-dbcp + + commons-pool + commons-pool + + commons-io commons-io @@ -107,6 +112,16 @@ ${hive.groupid} hive-service ${hive.version} + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + ${hive.groupid} diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java index 8d6e84fca..c25200726 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java @@ -182,7 +182,10 @@ public class HoodieHiveClient { private List constructChangePartitions(List partitions) { List changePartitions = Lists.newArrayList(); - String alterTable = "ALTER TABLE " + syncConfig.databaseName + "." + syncConfig.tableName; + // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first + String useDatabase = "USE " + syncConfig.databaseName; + changePartitions.add(useDatabase); + String alterTable = "ALTER TABLE " + syncConfig.tableName; for (String partition : partitions) { String partitionClause = getPartitionClause(partition); String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString(); @@ -494,7 +497,7 @@ public class HoodieHiveClient { if (!hiveJdbcUrl.endsWith("/")) { hiveJdbcUrl = hiveJdbcUrl + "/"; } - return hiveJdbcUrl + syncConfig.databaseName + (urlAppend == null ? "" : urlAppend); + return hiveJdbcUrl + (urlAppend == null ? "" : urlAppend); } private static void closeQuietly(ResultSet resultSet, Statement stmt) { @@ -585,7 +588,7 @@ public class HoodieHiveClient { try { Table table = client.getTable(syncConfig.databaseName, syncConfig.tableName); table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced); - client.alter_table(syncConfig.databaseName, syncConfig.tableName, table, true); + client.alter_table(syncConfig.databaseName, syncConfig.tableName, table); } catch (Exception e) { throw new HoodieHiveSyncException( "Failed to get update last commit time synced to " + lastCommitSynced, e); diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java index 2f2a9c8b9..2a7451f26 100644 --- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java +++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java @@ -152,6 +152,9 @@ public class HiveTestService { derbyLogFile.createNewFile(); setSystemProperty("derby.stream.error.file", derbyLogFile.getPath()); conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, Files.createTempDir().getAbsolutePath()); + conf.set("datanucleus.schema.autoCreateTables", "true"); + conf.set("hive.metastore.schema.verification", "false"); + setSystemProperty("derby.stream.error.file", derbyLogFile.getPath()); return new HiveConf(conf, this.getClass()); } diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml index e325041b1..41798370d 100644 --- a/hoodie-utilities/pom.xml +++ b/hoodie-utilities/pom.xml @@ -68,6 +68,12 @@ io.javalin javalin 2.4.0 + + + org.eclipse.jetty + * + + @@ -108,6 +114,18 @@ com.uber.hoodie hoodie-spark ${project.version} + + + javax.servlet + servlet-api + + + + + + org.eclipse.jetty + jetty-server + 7.6.0.v20120127 @@ -135,18 +153,10 @@ - - ${hive.groupid} - hive-exec - ${hive.version} - test - - ${hive.groupid} hive-jdbc ${hive.version} - standalone org.slf4j @@ -159,6 +169,19 @@ + + ${hive.groupid} + hive-exec + ${hive.version} + test + + + + ${hive.groupid} + hive-service + ${hive.version} + + com.uber.hoodie hoodie-hive @@ -232,11 +255,23 @@ org.apache.spark spark-core_2.11 + + + javax.servlet + * + + org.apache.spark spark-sql_2.11 + + + javax.servlet + * + + diff --git a/pom.xml b/pom.xml index 525a6b932..27d1bd8b0 100644 --- a/pom.xml +++ b/pom.xml @@ -136,7 +136,7 @@ 2.9.9 2.7.3 org.apache.hive - 1.2.1 + 2.3.1 4.0.2 2.1.0 1.7.7 diff --git a/release/config/license-mappings.xml b/release/config/license-mappings.xml index acf5de43c..8666f1a05 100644 --- a/release/config/license-mappings.xml +++ b/release/config/license-mappings.xml @@ -25,16 +25,31 @@ servlet-api CDDL - - javax.servlet.jsp - jsp-api - CDDL - - - javax.transaction - jta - OWN LICENSE (See http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html) - + + javax.servlet + jsp-api + CDDL + + + javax.xml.stream + stax-api + CDDL + + + javax.servlet.jsp + jsp-api + CDDL + + + javax.transaction + jta + OWN LICENSE (See http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html) + + + javax.transaction + transaction-api + OWN LICENSE (See http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html) + jdk.tools jdk.tools @@ -90,4 +105,9 @@ antlr-runtime BSD + + xerces + xercesImpl + Apache License, Version 1.1 +