Add nested fields support for MOR tables

Author: Jian Xu
Date: 2017-06-30 17:03:44 -07:00
Committed by: vinoth chandar
parent 6a3c94aaa3
commit b1cf097b0c
6 changed files with 268 additions and 7 deletions
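At a glance: the merge-on-read (MOR) realtime read path and its test fixtures previously only exercised flat schemas; this change makes them cope with nested Avro types (records, maps, arrays). The new test asserts against SchemaTestUtil.getComplexEvolvedSchema(), which is not part of this diff; judging from the assertions, it presumably resembles the sketch below (field names are taken from the test, exact types are assumptions):

import org.apache.avro.Schema;

class ComplexSchemaSketch {
  // Hypothetical reconstruction -- the real schema lives in SchemaTestUtil
  // and is not shown in this diff.
  static final Schema SCHEMA = new Schema.Parser().parse("{"
      + "\"type\":\"record\",\"name\":\"ComplexEvolvedRecord\",\"fields\":["
      + "{\"name\":\"field1\",\"type\":\"string\"},"
      + "{\"name\":\"field2\",\"type\":\"string\"},"
      + "{\"name\":\"name\",\"type\":\"string\"},"
      + "{\"name\":\"favoriteIntNumber\",\"type\":\"int\"},"
      + "{\"name\":\"favoriteNumber\",\"type\":\"long\"},"
      + "{\"name\":\"favoriteFloatNumber\",\"type\":\"float\"},"
      + "{\"name\":\"favoriteDoubleNumber\",\"type\":\"double\"},"
      // map of nested records: tags["mapItem1"].item1 / .item2 in the test
      + "{\"name\":\"tags\",\"type\":{\"type\":\"map\",\"values\":"
      + "{\"type\":\"record\",\"name\":\"Item\",\"fields\":["
      + "{\"name\":\"item1\",\"type\":\"string\"},"
      + "{\"name\":\"item2\",\"type\":\"string\"}]}}},"
      // nested record: testNestedRecord.isAdmin / .userId
      + "{\"name\":\"testNestedRecord\",\"type\":{\"type\":\"record\","
      + "\"name\":\"Nested\",\"fields\":["
      + "{\"name\":\"isAdmin\",\"type\":\"boolean\"},"
      + "{\"name\":\"userId\",\"type\":\"string\"}]}},"
      // array of strings: stringArray[i]
      + "{\"name\":\"stringArray\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}"
      + "]}");
}

HoodieAvroUtils.addMetadataFields prepends the five Hoodie metadata columns (_hoodie_commit_time first), which is why the data fields start at values[5] in the test's assertions.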

InputFormatTestUtil.java

@@ -16,9 +16,11 @@
package com.uber.hoodie.hadoop;
+import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.TestRecord;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -115,10 +117,10 @@ public class InputFormatTestUtil {
}
-private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) {
+private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) throws IOException {
List<GenericRecord> records = new ArrayList<>(numberOfRecords);
for(int i=0;i<numberOfRecords;i++) {
-records.add(generateAvroRecord(schema, i, commitTime, fileId));
+records.add(generateAvroRecordFromJson(schema, i, commitTime, fileId));
}
return records;
}
@@ -136,6 +138,13 @@ public class InputFormatTestUtil {
return record;
}
+public static GenericRecord generateAvroRecordFromJson(Schema schema, int recordNumber,
+String commitTime, String fileId) throws IOException {
+TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
+MercifulJsonConverter converter = new MercifulJsonConverter(schema);
+return converter.convert(record.toJsonString());
+}
public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit,
int totalNumberOfRecords, int numberOfRecordsToUpdate,
String newCommit) throws IOException {
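Why this change: the old generateAvroRecord path builds GenericData records field by field, which does not scale to nested maps, records, and arrays. Routing through JSON lets TestRecord describe the whole nested payload and MercifulJsonConverter map it onto the schema. A minimal sketch of the new call pattern (assuming, as the diff implies, that TestRecord.toJsonString() emits JSON whose shape matches the target schema):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import com.uber.hoodie.avro.MercifulJsonConverter;
import com.uber.hoodie.common.util.TestRecord;

class JsonFixtureSketch {
  // One converter per schema; convert() parses the JSON string and
  // produces a GenericRecord, nested fields included.
  static GenericRecord recordFor(Schema schema, int recordNumber,
      String commitTime, String fileId) throws IOException {
    TestRecord record = new TestRecord(commitTime, recordNumber, fileId);
    return new MercifulJsonConverter(schema).convert(record.toJsonString());
  }
}

This is the same pattern as the generateAvroRecordFromJson helper added above.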

HoodieRealtimeRecordReaderTest.java

@@ -34,6 +34,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -73,7 +78,7 @@ public class HoodieRealtimeRecordReaderTest {
.overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build();
List<IndexedRecord> records = new ArrayList<>();
for(int i=0; i < numberOfRecords; i++) {
-records.add(InputFormatTestUtil.generateAvroRecord(schema, i, newCommit, "fileid0"));
+records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
}
Schema writeSchema = records.get(0).getSchema();
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
@@ -134,4 +139,119 @@ public class HoodieRealtimeRecordReaderTest {
}
}
+@Test
+public void testReaderWithNestedAndComplexSchema() throws Exception {
+// initial commit
+Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
+HoodieTestUtils.initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
+String commitTime = "100";
+int numberOfRecords = 100;
+int numberOfLogRecords = numberOfRecords / 2;
+File partitionDir = InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, numberOfRecords, commitTime);
+InputFormatTestUtil.commit(basePath, commitTime);
+// Add the paths
+FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+// update files or generate new log file
+String newCommitTime = "101";
+HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, numberOfLogRecords);
+long size = writer.getCurrentSize();
+writer.close();
+assertTrue("block - size should be > 0", size > 0);
+//create a split with baseFile (parquet file written earlier) and new log file(s)
+String logFilePath = writer.getLogFile().getPath().toString();
+HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+ + "/fileid0_1_" + commitTime + ".parquet"), 0, 1, jobConf), Arrays.asList(logFilePath), newCommitTime);
+//create a RecordReader to be used by HoodieRealtimeRecordReader
+RecordReader<Void, ArrayWritable> reader =
+new MapredParquetInputFormat().
+getRecordReader(new FileSplit(split.getPath(), 0,
+FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null);
+JobConf jobConf = new JobConf();
+List<Schema.Field> fields = schema.getFields();
+String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(","));
+String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+jobConf.set("partition_columns", "datestr");
+// validate record reader compaction
+HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+// use reader to read base Parquet File and log file, merge in flight and return latest commit
+// here the first 50 records should be updated, see above
+Void key = recordReader.createKey();
+ArrayWritable value = recordReader.createValue();
+int numRecordsRead = 0;
+while (recordReader.next(key, value)) {
+int currentRecordNo = numRecordsRead;
+++numRecordsRead;
+Writable[] values = value.get();
+String recordCommitTime;
+//check if the record written is with latest commit, here "101"
+if (numRecordsRead > numberOfLogRecords) {
+recordCommitTime = commitTime;
+} else {
+recordCommitTime = newCommitTime;
+}
+String recordCommitTimeSuffix = "@" + recordCommitTime;
+Assert.assertEquals(values[0].toString(), recordCommitTime);
+key = recordReader.createKey();
+value = recordReader.createValue();
+// Assert type STRING
+Assert.assertEquals("test value for field: field1", values[5].toString(), "field" + currentRecordNo);
+Assert.assertEquals("test value for field: field2", values[6].toString(), "field" + currentRecordNo + recordCommitTimeSuffix);
+Assert.assertEquals("test value for field: name", values[7].toString(), "name" + currentRecordNo);
+// Assert type INT
+IntWritable intWritable = (IntWritable)values[8];
+Assert.assertEquals("test value for field: favoriteIntNumber", intWritable.get(), currentRecordNo + recordCommitTime.hashCode());
+// Assert type LONG
+LongWritable longWritable = (LongWritable)values[9];
+Assert.assertEquals("test value for field: favoriteNumber", longWritable.get(), currentRecordNo + recordCommitTime.hashCode());
+// Assert type FLOAT
+FloatWritable floatWritable = (FloatWritable)values[10];
+Assert.assertEquals("test value for field: favoriteFloatNumber", floatWritable.get(), (float)((currentRecordNo + recordCommitTime.hashCode()) / 1024.0), 0);
+// Assert type DOUBLE
+DoubleWritable doubleWritable = (DoubleWritable)values[11];
+Assert.assertEquals("test value for field: favoriteDoubleNumber", doubleWritable.get(), (currentRecordNo + recordCommitTime.hashCode()) / 1024.0, 0);
+// Assert type MAP
+ArrayWritable mapItem = (ArrayWritable)values[12];
+Writable[] mapItemValues = mapItem.get();
+ArrayWritable mapItemValue1 = (ArrayWritable)mapItemValues[0];
+ArrayWritable mapItemValue2 = (ArrayWritable)mapItemValues[1];
+Assert.assertEquals("test value for field: tags", mapItemValue1.get()[0].toString(), "mapItem1");
+Assert.assertEquals("test value for field: tags", mapItemValue2.get()[0].toString(), "mapItem2");
+ArrayWritable mapItemValue1value = (ArrayWritable)mapItemValue1.get()[1];
+ArrayWritable mapItemValue2value = (ArrayWritable)mapItemValue2.get()[1];
+Assert.assertEquals("test value for field: tags", mapItemValue1value.get().length, 2);
+Assert.assertEquals("test value for field: tags", mapItemValue2value.get().length, 2);
+Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1", mapItemValue1value.get()[0].toString(), "item" + currentRecordNo);
+Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1", mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo);
+Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2", mapItemValue1value.get()[1].toString(), "item" + currentRecordNo + recordCommitTimeSuffix);
+Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2", mapItemValue2value.get()[1].toString(), "item2" + currentRecordNo + recordCommitTimeSuffix);
+// Assert type RECORD
+ArrayWritable recordItem = (ArrayWritable)values[13];
+Writable[] nestedRecord = recordItem.get();
+Assert.assertEquals("test value for field: testNestedRecord.isAdmin", ((BooleanWritable)nestedRecord[0]).get(), false);
+Assert.assertEquals("test value for field: testNestedRecord.userId", nestedRecord[1].toString(), "UserId" + currentRecordNo + recordCommitTimeSuffix);
+// Assert type ARRAY
+ArrayWritable arrayValue = (ArrayWritable)values[14];
+Writable[] arrayValues = arrayValue.get();
+for (int i = 0; i < arrayValues.length; i++) {
+Assert.assertEquals("test value for field: stringArray", arrayValues[i].toString(), "stringArray" + i + recordCommitTimeSuffix);
+}
+}
+}
}
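Two details in the test above are easy to miss. First, the local JobConf declared after the parquet RecordReader is built shadows the jobConf used earlier in the method (presumably a field of the test class, or the method would not compile), so the Hive projection settings land on a fresh conf that is then handed to HoodieRealtimeRecordReader. Second, the projection deliberately lists every column, nested ones included, which is what forces the realtime reader to merge and convert the nested map, record, and array values. A condensed sketch of that wiring (helper name hypothetical):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

class ProjectionSketch {
  // Hive hands readers comma-separated column names and positions;
  // projecting all fields exercises the nested-type handling.
  static JobConf projectAllColumns(Schema schema, JobConf jobConf) {
    List<Schema.Field> fields = schema.getFields();
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
        fields.stream().map(Schema.Field::name).collect(Collectors.joining(",")));
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
        fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")));
    return jobConf;
  }
}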