From b1cf097b0cb685ebfb0e507ac8ad777de6baa8ac Mon Sep 17 00:00:00 2001
From: Jian Xu
Date: Fri, 30 Jun 2017 17:03:44 -0700
Subject: [PATCH] Add nested fields support for MOR tables

---
 .../hoodie/common/util/SchemaTestUtil.java    |   5 +
 .../uber/hoodie/common/util/TestRecord.java   |  98 ++++++++++++++
 .../test/resources/complex-test-evolved.avro  |  17 +++
 .../realtime/HoodieRealtimeRecordReader.java  |  20 ++-
 .../hoodie/hadoop/InputFormatTestUtil.java    |  13 +-
 .../HoodieRealtimeRecordReaderTest.java       | 122 +++++++++++++++++-
 6 files changed, 268 insertions(+), 7 deletions(-)
 create mode 100644 hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRecord.java
 create mode 100644 hoodie-common/src/test/resources/complex-test-evolved.avro

diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
index b970155d2..cab3d6d37 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
@@ -113,4 +113,9 @@ public class SchemaTestUtil {
       throws IOException, URISyntaxException {
     return toRecords(getSimpleSchema(), getEvolvedSchema(), from, limit);
   }
+
+  public static Schema getComplexEvolvedSchema() throws IOException {
+    return new Schema.Parser()
+        .parse(SchemaTestUtil.class.getResourceAsStream("/complex-test-evolved.avro"));
+  }
 }
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRecord.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRecord.java
new file mode 100644
index 000000000..7852749a0
--- /dev/null
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRecord.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2016,2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonMethod;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+@SuppressWarnings({"unused", "FieldCanBeLocal", "MismatchedQueryAndUpdateOfCollection"})
+public class TestRecord implements Serializable {
+  class TestMapItemRecord implements Serializable {
+    private String item1;
+    private String item2;
+
+    TestMapItemRecord(String item1, String item2) {
+      this.item1 = item1;
+      this.item2 = item2;
+    }
+  }
+
+  class TestNestedRecord implements Serializable {
+    private boolean isAdmin;
+    private String userId;
+
+    TestNestedRecord(boolean isAdmin, String userId) {
+      this.isAdmin = isAdmin;
+      this.userId = userId;
+    }
+  }
+
+  private String _hoodie_commit_time;
+  private String _hoodie_record_key;
+  private String _hoodie_partition_path;
+  private String _hoodie_file_name;
+  private String _hoodie_commit_seqno;
+
+  private String field1;
+  private String field2;
+  private String name;
+  private Integer favoriteIntNumber;
+  private Long favoriteNumber;
+  private Float favoriteFloatNumber;
+  private Double favoriteDoubleNumber;
+  private Map<String, TestMapItemRecord> tags;
+  private TestNestedRecord testNestedRecord;
+  private String[] stringArray;
+
+  public TestRecord(String commitTime, int recordNumber, String fileId) {
+    this._hoodie_commit_time = commitTime;
+    this._hoodie_record_key = "key" + recordNumber;
+    this._hoodie_partition_path = commitTime;
+    this._hoodie_file_name = fileId;
+    this._hoodie_commit_seqno = commitTime + recordNumber;
+
+    String commitTimeSuffix = "@" + commitTime;
+    int commitHashCode = commitTime.hashCode();
+
+    this.field1 = "field" + recordNumber;
+    this.field2 = "field" + recordNumber + commitTimeSuffix;
+    this.name = "name" + recordNumber;
+    this.favoriteIntNumber = recordNumber + commitHashCode;
+    this.favoriteNumber = (long) (recordNumber + commitHashCode);
+    this.favoriteFloatNumber = (float) ((recordNumber + commitHashCode) / 1024.0);
+    this.favoriteDoubleNumber = (recordNumber + commitHashCode) / 1024.0;
+    this.tags = new HashMap<>();
+    this.tags.put("mapItem1", new TestMapItemRecord("item" + recordNumber, "item" + recordNumber + commitTimeSuffix));
+    this.tags.put("mapItem2", new TestMapItemRecord("item2" + recordNumber, "item2" + recordNumber + commitTimeSuffix));
+    this.testNestedRecord = new TestNestedRecord(false, "UserId" + recordNumber + commitTimeSuffix);
+    this.stringArray = new String[]{"stringArray0" + commitTimeSuffix, "stringArray1" + commitTimeSuffix};
+  }
+
+  public String toJsonString() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
+    return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(this);
+  }
+}
diff --git a/hoodie-common/src/test/resources/complex-test-evolved.avro b/hoodie-common/src/test/resources/complex-test-evolved.avro
new file mode 100644
index 000000000..d68a9cb35
--- /dev/null
+++ b/hoodie-common/src/test/resources/complex-test-evolved.avro
@@ -0,0 +1,17 @@
+{
+  "namespace": "example.avro",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "field1", "type": ["null", "string"], "default": null},
+    {"name": "field2", "type": ["null", "string"], "default": null},
+    {"name": "name", "type": ["null", "string"], "default": null},
+    {"name": "favoriteIntNumber", "type": ["null", "int"], "default": null},
+    {"name": "favoriteNumber", "type": ["null", "long"], "default": null},
+    {"name": "favoriteFloatNumber", "type": ["null", "float"], "default": null},
+    {"name": "favoriteDoubleNumber", "type": ["null", "double"], "default": null},
+    {"name": "tags", "type": ["null", {"values": ["null", {"fields": [{"default": null, "type": ["null", "string"], "name": "item1"}, {"default": null, "type": ["null", "string"], "name": "item2"}], "type": "record", "name": "tagsMapItems"}], "type": "map"}], "default": null},
+    {"default": null, "name": "testNestedRecord", "type": ["null", {"fields": [{"default": null, "name": "isAdmin", "type": ["null", "boolean"]}, {"default": null, "name": "userId", "type": ["null", "string"]}], "name": "notes", "type": "record"}]},
+    {"default": null, "name": "stringArray", "type": ["null", {"items": "string", "type": "array"}]}
+  ]
+}
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
index ed22c14c2..b68a299db 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -55,6 +55,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import parquet.avro.AvroSchemaConverter;
 import parquet.hadoop.ParquetFileReader;
 import parquet.schema.MessageType;
@@ -250,15 +252,25 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable> {
     List<Schema> types = schema.getTypes();
     if (types.size() != 2) {
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
index a1f263097..d48bfa22c 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -16,9 +16,11 @@
 package com.uber.hoodie.hadoop;
 
+import com.uber.hoodie.avro.MercifulJsonConverter;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.TestRecord;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -115,10 +117,10 @@ public class InputFormatTestUtil {
   }
 
-  private static Iterable<IndexedRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) {
+  private static Iterable<IndexedRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException {
     List<IndexedRecord> records = new ArrayList<>(numberOfRecords);
     for (int i = 0; i < numberOfRecords; i++) {
       records.add(generateAvroRecord(schema, i, commitTime, fileId));
     }
     return records;
   }
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
     List<IndexedRecord> records = new ArrayList<>();
     for (int i = 0; i < numberOfRecords; i++) {
-      records.add(InputFormatTestUtil.generateAvroRecord(schema, i, newCommit, "fileid0"));
+      records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
     }
     Schema writeSchema = records.get(0).getSchema();
     HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
@@ -134,4 +139,119 @@ public class HoodieRealtimeRecordReaderTest {
     }
   }
 
+  @Test
+  public void testReaderWithNestedAndComplexSchema() throws Exception {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
+    HoodieTestUtils.initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    int numberOfRecords = 100;
+    int numberOfLogRecords = numberOfRecords / 2;
+    File partitionDir = InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, numberOfRecords, commitTime);
+    InputFormatTestUtil.commit(basePath, commitTime);
+    // declare the job conf before its first use below
+    JobConf jobConf = new JobConf();
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    // update files or generate new log file
+    String newCommitTime = "101";
+    HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, numberOfLogRecords);
+    long size = writer.getCurrentSize();
+    writer.close();
+    assertTrue("block - size should be > 0", size > 0);
+
+    // create a split with baseFile (parquet file written earlier) and new log file(s)
+    String logFilePath = writer.getLogFile().getPath().toString();
+    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+        + "/fileid0_1_" + commitTime + ".parquet"), 0, 1, jobConf), Arrays.asList(logFilePath), newCommitTime);
+
+    // create a RecordReader to be used by HoodieRealtimeRecordReader
+    RecordReader<Void, ArrayWritable> reader =
+        new MapredParquetInputFormat().
+            getRecordReader(new FileSplit(split.getPath(), 0,
+                FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null);
+    List<Schema.Field> fields = schema.getFields();
+
+    String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+    jobConf.set("partition_columns", "datestr");
+
+    // validate record reader compaction
+    HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+
+    // use reader to read base Parquet File and log file, merge in flight and return latest commit
+    // here the first 50 records should be updated, see above
+    Void key = recordReader.createKey();
+    ArrayWritable value = recordReader.createValue();
+    int numRecordsRead = 0;
+    while (recordReader.next(key, value)) {
+      int currentRecordNo = numRecordsRead;
+      ++numRecordsRead;
+      Writable[] values = value.get();
+      String recordCommitTime;
+      // check if the record written is with the latest commit, here "101"
+      if (numRecordsRead > numberOfLogRecords) {
+        recordCommitTime = commitTime;
+      } else {
+        recordCommitTime = newCommitTime;
+      }
+      String recordCommitTimeSuffix = "@" + recordCommitTime;
+
+      Assert.assertEquals(values[0].toString(), recordCommitTime);
+      key = recordReader.createKey();
+      value = recordReader.createValue();
+
+      // Assert type STRING
+      Assert.assertEquals("test value for field: field1", values[5].toString(), "field" + currentRecordNo);
+      Assert.assertEquals("test value for field: field2", values[6].toString(), "field" + currentRecordNo + recordCommitTimeSuffix);
+      Assert.assertEquals("test value for field: name", values[7].toString(), "name" + currentRecordNo);
+
+      // Assert type INT
+      IntWritable intWritable = (IntWritable) values[8];
+      Assert.assertEquals("test value for field: favoriteIntNumber", intWritable.get(), currentRecordNo + recordCommitTime.hashCode());
+
+      // Assert type LONG
+      LongWritable longWritable = (LongWritable) values[9];
+      Assert.assertEquals("test value for field: favoriteNumber", longWritable.get(), currentRecordNo + recordCommitTime.hashCode());
+
+      // Assert type FLOAT
+      FloatWritable floatWritable = (FloatWritable) values[10];
+      Assert.assertEquals("test value for field: favoriteFloatNumber", floatWritable.get(), (float) ((currentRecordNo + recordCommitTime.hashCode()) / 1024.0), 0);
+
+      // Assert type DOUBLE
+      DoubleWritable doubleWritable = (DoubleWritable) values[11];
+      Assert.assertEquals("test value for field: favoriteDoubleNumber", doubleWritable.get(), (currentRecordNo + recordCommitTime.hashCode()) / 1024.0, 0);
+
+      // Assert type MAP
+      ArrayWritable mapItem = (ArrayWritable) values[12];
+      Writable[] mapItemValues = mapItem.get();
+      ArrayWritable mapItemValue1 = (ArrayWritable) mapItemValues[0];
+      ArrayWritable mapItemValue2 = (ArrayWritable) mapItemValues[1];
+      Assert.assertEquals("test value for field: tags", mapItemValue1.get()[0].toString(), "mapItem1");
+      Assert.assertEquals("test value for field: tags", mapItemValue2.get()[0].toString(), "mapItem2");
+      ArrayWritable mapItemValue1value = (ArrayWritable) mapItemValue1.get()[1];
+      ArrayWritable mapItemValue2value = (ArrayWritable) mapItemValue2.get()[1];
+      Assert.assertEquals("test value for field: tags", mapItemValue1value.get().length, 2);
+      Assert.assertEquals("test value for field: tags", mapItemValue2value.get().length, 2);
+      Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1", mapItemValue1value.get()[0].toString(), "item" + currentRecordNo);
+      Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1", mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo);
+      Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2", mapItemValue1value.get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix);
+      Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2", mapItemValue2value.get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix);
+
+      // Assert type RECORD
+      ArrayWritable recordItem = (ArrayWritable) values[13];
+      Writable[] nestedRecord = recordItem.get();
+      Assert.assertEquals("test value for field: testNestedRecord.isAdmin", ((BooleanWritable) nestedRecord[0]).get(), false);
+      Assert.assertEquals("test value for field: testNestedRecord.userId", nestedRecord[1].toString(), "UserId" + currentRecordNo + recordCommitTimeSuffix);
+
+      // Assert type ARRAY
+      ArrayWritable arrayValue = (ArrayWritable) values[14];
+      Writable[] arrayValues = arrayValue.get();
+      for (int i = 0; i < arrayValues.length; i++) {
+        Assert.assertEquals("test value for field: stringArray", arrayValues[i].toString(), "stringArray" + i + recordCommitTimeSuffix);
+      }
+    }
+  }
 }
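
Note for reviewers: the heart of this change is making the record reader's
Avro-to-Writable conversion recurse into nested MAP, RECORD, and ARRAY values,
which is what the new test asserts (each map entry surfaces as a two-element
[key, value] ArrayWritable, a nested record as an ArrayWritable of its fields,
an array as an ArrayWritable of its converted elements). Since only fragments
of that hunk are quoted above, here is a minimal sketch of the shape of such a
conversion, assuming only the Avro and Hadoop APIs already used elsewhere in
this patch; the class name, variable names, and the reduced set of cases are
illustrative, not the literal diff:

import java.util.List;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class NestedAvroToWritableSketch {

  // Recursively convert an Avro value into the Writable tree that the
  // realtime record reader hands to Hive's object inspectors.
  static Writable avroToArrayWritable(Object value, Schema schema) {
    switch (schema.getType()) {
      case STRING:
        return new Text(value.toString());
      case RECORD:
        // A nested record becomes an ArrayWritable holding one converted
        // Writable per field, in schema field order.
        GenericRecord record = (GenericRecord) value;
        Writable[] recordValues = new Writable[schema.getFields().size()];
        int rv = 0;
        for (Schema.Field field : schema.getFields()) {
          recordValues[rv++] = avroToArrayWritable(record.get(field.name()), field.schema());
        }
        return new ArrayWritable(Writable.class, recordValues);
      case MAP:
        // Each map entry becomes a two-element [key, value] pair, with the
        // value converted recursively against the map's value schema -- the
        // layout the new test's tags assertions expect.
        Map<?, ?> map = (Map<?, ?>) value;
        Writable[] mapValues = new Writable[map.size()];
        int mv = 0;
        for (Map.Entry<?, ?> entry : map.entrySet()) {
          Writable[] kv = new Writable[2];
          kv[0] = new Text(entry.getKey().toString());
          kv[1] = avroToArrayWritable(entry.getValue(), schema.getValueType());
          mapValues[mv++] = new ArrayWritable(Writable.class, kv);
        }
        return new ArrayWritable(Writable.class, mapValues);
      case ARRAY:
        List<?> array = (List<?>) value;
        Writable[] arrayValues = new Writable[array.size()];
        int av = 0;
        for (Object element : array) {
          arrayValues[av++] = avroToArrayWritable(element, schema.getElementType());
        }
        return new ArrayWritable(Writable.class, arrayValues);
      case UNION:
        // Unions are expected to be of the form ["null", T]; resolve to the
        // non-null branch and recurse. This matches the two-type check that
        // survives in the quoted hunk fragment above.
        List<Schema> types = schema.getTypes();
        if (types.size() != 2) {
          throw new IllegalArgumentException("Only unions of two types are supported");
        }
        Schema nonNull = types.get(0).getType() == Schema.Type.NULL ? types.get(1) : types.get(0);
        return value == null ? null : avroToArrayWritable(value, nonNull);
      default:
        // INT, LONG, FLOAT, DOUBLE, BOOLEAN, etc. map to their Writable
        // counterparts; elided here for brevity.
        return null;
    }
  }
}

Similarly, the InputFormatTestUtil hunk evidently adds a
generateAvroRecordFromJson helper: the new MercifulJsonConverter and TestRecord
imports plus the updated call site in writeLogFile imply one that presumably
serializes a TestRecord via toJsonString() and converts the JSON into a
GenericRecord against the nested schema. Its exact body is not quoted above.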