diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
index 399abfbb0..7f0dc94e8 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/SchemaTestUtil.java
@@ -18,7 +18,6 @@ package com.uber.hoodie.common.util;
 
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.exception.HoodieIOException;
-import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -30,6 +29,7 @@ import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -39,6 +39,11 @@ public class SchemaTestUtil {
         .parse(SchemaTestUtil.class.getResourceAsStream("/simple-test.avro"));
   }
 
+  public static Schema getEvolvedSchema() throws IOException {
+    return new Schema.Parser()
+        .parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved.avro"));
+  }
+
   public static List generateTestRecords(int from, int limit)
       throws IOException, URISyntaxException {
     return toRecords(getSimpleSchema(), getSimpleSchema(), from, limit);
diff --git a/hoodie-common/src/test/resources/simple-test-evolved.avro b/hoodie-common/src/test/resources/simple-test-evolved.avro
new file mode 100644
index 000000000..32a460d60
--- /dev/null
+++ b/hoodie-common/src/test/resources/simple-test-evolved.avro
@@ -0,0 +1,12 @@
+{
+"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "field1", "type": ["null", "string"], "default": null},
+     {"name": "field2", "type": ["null", "string"], "default": null},
+     {"name": "name", "type": ["null", "string"], "default": null},
+     {"name": "favorite_number", "type": ["null", "long"], "default": null},
+     {"name": "favorite_color", "type": ["null", "string"], "default": null}
+ ]
+}
\ No newline at end of file
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
index 312720441..cbd485461 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -18,24 +18,20 @@
 
 package com.uber.hoodie.hadoop.realtime;
 
+import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.table.HoodieTimeline;
-import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline;
+import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
+import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.ParquetUtils;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
-
 import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.AvroFSInput;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.io.ArrayWritable;
@@ -56,6 +52,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeMap;
@@ -76,7 +73,6 @@ public class HoodieRealtimeRecordReader implements RecordReader
   private final Map<String, ArrayWritable> deltaRecordMap;
   private final MessageType baseFileSchema;
 
-
   public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split,
                                     JobConf job,
                                     RecordReader realReader) {
@@ -106,29 +102,25 @@ public class HoodieRealtimeRecordReader implements RecordReader
-      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
-      final AvroFSInput input = new AvroFSInput(FileContext.getFileContext(jobConf), new Path(logFilePath));
-      DataFileReader reader = (DataFileReader) DataFileReader.openReader(input, datumReader);
-      while (reader.hasNext()) {
-        GenericRecord rec = reader.next();
-        String key = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-        String commitTime = rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
-        if (HoodieTimeline.compareTimestamps(commitTime, split.getMaxCommitTime(), HoodieTimeline.GREATER)) {
-          // stop reading this log file. we hit a record later than max known commit time.
-          break;
-        }
-        // we assume, a later safe record in the log, is newer than what we have in the map & replace it.
-        ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
-        deltaRecordMap.put(key, aWritable);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Log record : " + arrayWritableToString(aWritable));
-        }
+      HoodieCompactedLogRecordScanner compactedLogRecordScanner =
+          new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema);
+      Iterator<HoodieRecord<HoodieAvroPayload>> itr = compactedLogRecordScanner.iterator();
+
+      // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
+      // but can return records for completed commits > the commit we are trying to read (if using readCommit() API)
+      while(itr.hasNext()) {
+        HoodieRecord<HoodieAvroPayload> hoodieRecord = itr.next();
+        GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get();
+        String key = hoodieRecord.getRecordKey();
+        // we assume, a later safe record in the log, is newer than what we have in the map & replace it.
+        ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
+        deltaRecordMap.put(key, aWritable);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Log record : " + arrayWritableToString(aWritable));
         }
-      reader.close();
       }
     }
@@ -146,7 +138,6 @@ public class HoodieRealtimeRecordReader implements RecordReader
 generateAvroRecords(Schema schema, int numberOfRecords, String commitTime) {
+  private static Iterable generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) {
     List records = new ArrayList<>(numberOfRecords);
     for(int i=0;i 0) {
       // update this record
       record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, newCommit);
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
new file mode 100644
index 000000000..d19f0e60b
--- /dev/null
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.hadoop.realtime;
+
+
+import com.uber.hoodie.common.model.HoodieTableType;
+import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.log.HoodieLogFile;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.SchemaTestUtil;
+import com.uber.hoodie.hadoop.InputFormatTestUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+public class HoodieRealtimeRecordReaderTest {
+
+  private JobConf jobConf;
+
+  @Before
+  public void setUp() {
+    jobConf = new JobConf();
+  }
+
+  @Rule
+  public TemporaryFolder basePath = new TemporaryFolder();
+
+  private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
+      String baseCommit, String newCommit, int numberOfRecords) throws InterruptedException,IOException {
+    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath()))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build();
+    List records = new ArrayList<>();
+    for(int i=0; i < numberOfRecords; i++) {
+      records.add(InputFormatTestUtil.generateAvroRecord(schema, i, newCommit, "fileid0"));
+    }
+    Schema writeSchema = records.get(0).getSchema();
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
+    writer = writer.appendBlock(dataBlock);
+    long size = writer.getCurrentSize();
+    return writer;
+  }
+
+  @Test
+  public void testReader() throws Exception {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    File partitionDir = InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, commitTime);
+    InputFormatTestUtil.commit(basePath, commitTime);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    // update files or generate new log file
+    String newCommitTime = "101";
+    HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, 100);
+    long size = writer.getCurrentSize();
+    writer.close();
+    assertTrue("block - size should be > 0", size > 0);
+
+    //create a split with baseFile (parquet file written earlier) and new log file(s)
+    String logFilePath = writer.getLogFile().getPath().toString();
+    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+        + "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);
+
+    //create a RecordReader to be used by HoodieRealtimeRecordReader
+    RecordReader reader =
+        new MapredParquetInputFormat().
+            getRecordReader(new FileSplit(split.getPath(), 0,
+                FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null);
+    JobConf jobConf = new JobConf();
+    List fields = schema.getFields();
+    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+    String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
+    jobConf.set("partition_columns", "datestr");
+
+    //validate record reader compaction
+    HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+
+    //use reader to read base Parquet File and log file, merge in flight and return latest commit
+    //here all 100 records should be updated, see above
+    Void key = recordReader.createKey();
+    ArrayWritable value = recordReader.createValue();
+    while(recordReader.next(key, value)) {
+      Writable[] values = value.get();
+      //check if the record written is with latest commit, here "101"
+      Assert.assertEquals(values[0].toString(), newCommitTime);
+      key = recordReader.createKey();
+      value = recordReader.createValue();
+    }
+  }
+
+}
diff --git a/hoodie-hadoop-mr/src/test/resources/sample1.avro b/hoodie-hadoop-mr/src/test/resources/sample1.avro
index 20455fa5a..6d8e9010f 100644
--- a/hoodie-hadoop-mr/src/test/resources/sample1.avro
+++ b/hoodie-hadoop-mr/src/test/resources/sample1.avro
@@ -14,4 +14,4 @@
     "name" : "_hoodie_commit_seqno",
     "type" : "string"
   }]
-}
+}
\ No newline at end of file