Merge pull request #180 from n3nash/record-reader-with-compaction
updated HoodieRealtimeRecordReader to use HoodieCompactedLogRecordSca…
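In short: instead of opening each delta log with an Avro DataFileReader and filtering records by commit time, HoodieRealtimeRecordReader now hands the split's delta file paths to HoodieCompactedLogRecordScanner and merges whatever the scanner yields into its delta record map. A condensed sketch of the new read loop (mirroring the HoodieRealtimeRecordReader hunk below; split, readerSchema, writerSchema, deltaRecordMap and avroToArrayWritable are members/helpers of the existing class, assumed here):

    // Sketch only - condensed from the hunk below, not a drop-in replacement.
    HoodieCompactedLogRecordScanner scanner =
        new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema);
    Iterator<HoodieRecord<HoodieAvroPayload>> itr = scanner.iterator();
    while (itr.hasNext()) {
      HoodieRecord<HoodieAvroPayload> hoodieRecord = itr.next();
      // Materialize the Avro payload against the projected reader schema.
      GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get();
      // A later record in the log is assumed newer and replaces the earlier entry for the same key.
      deltaRecordMap.put(hoodieRecord.getRecordKey(),
          (ArrayWritable) avroToArrayWritable(rec, writerSchema));
    }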
@@ -18,7 +18,6 @@ package com.uber.hoodie.common.util;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.exception.HoodieIOException;
-import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -30,6 +29,7 @@ import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -39,6 +39,11 @@ public class SchemaTestUtil {
         .parse(SchemaTestUtil.class.getResourceAsStream("/simple-test.avro"));
   }
 
+  public static Schema getEvolvedSchema() throws IOException {
+    return new Schema.Parser()
+        .parse(SchemaTestUtil.class.getResourceAsStream("/simple-test-evolved.avro"));
+  }
+
   public static List<IndexedRecord> generateTestRecords(int from, int limit)
       throws IOException, URISyntaxException {
     return toRecords(getSimpleSchema(), getSimpleSchema(), from, limit);
hoodie-common/src/test/resources/simple-test-evolved.avro (new file, 12 lines)
@@ -0,0 +1,12 @@
+{
+  "namespace": "example.avro",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "field1", "type": ["null", "string"], "default": null},
+    {"name": "field2", "type": ["null", "string"], "default": null},
+    {"name": "name", "type": ["null", "string"], "default": null},
+    {"name": "favorite_number", "type": ["null", "long"], "default": null},
+    {"name": "favorite_color", "type": ["null", "string"], "default": null}
+  ]
+}
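The simple-test-evolved.avro resource above backs the new SchemaTestUtil.getEvolvedSchema() helper; the reader test added later in this diff loads it and adds the Hoodie metadata fields before writing any parquet or log data. A minimal sketch of that usage (both calls are taken from hunks in this diff):

    // Sketch of how the new resource is consumed by the test added in this PR.
    Schema evolved = SchemaTestUtil.getEvolvedSchema();           // parses /simple-test-evolved.avro
    Schema schema = HoodieAvroUtils.addMetadataFields(evolved);   // adds the Hoodie metadata fields (commit time first)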
@@ -18,24 +18,20 @@
 
 package com.uber.hoodie.hadoop.realtime;
 
+import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.table.HoodieTimeline;
-import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline;
+import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
+import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.ParquetUtils;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.AvroFSInput;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.io.ArrayWritable;
@@ -56,6 +52,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeMap;
@@ -76,7 +73,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable>
   private final HashMap<String, ArrayWritable> deltaRecordMap;
   private final MessageType baseFileSchema;
 
-
   public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split,
                                     JobConf job,
                                     RecordReader<Void, ArrayWritable> realReader) {
@@ -106,29 +102,25 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable>
     // TODO(vc): In the future, the reader schema should be updated based on log files & be able to null out fields not present before
     Schema readerSchema = generateProjectionSchema(writerSchema, projectionFields);
 
-    LOG.info(String.format("About to read logs %s for base split %s, projecting cols %s",
+    LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
         split.getDeltaFilePaths(), split.getPath(), projectionFields));
-    for (String logFilePath: split.getDeltaFilePaths()) {
-      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
-      final AvroFSInput input = new AvroFSInput(FileContext.getFileContext(jobConf), new Path(logFilePath));
-      DataFileReader<GenericRecord> reader = (DataFileReader<GenericRecord>) DataFileReader.openReader(input, datumReader);
-      while (reader.hasNext()) {
-        GenericRecord rec = reader.next();
-        String key = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-        String commitTime = rec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
-        if (HoodieTimeline.compareTimestamps(commitTime, split.getMaxCommitTime(), HoodieTimeline.GREATER)) {
-          // stop reading this log file. we hit a record later than max known commit time.
-          break;
-        }
-
-        // we assume, a later safe record in the log, is newer than what we have in the map & replace it.
-        ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
-        deltaRecordMap.put(key, aWritable);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Log record : " + arrayWritableToString(aWritable));
-        }
-      }
-      reader.close();
+    HoodieCompactedLogRecordScanner compactedLogRecordScanner =
+        new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), readerSchema);
+    Iterator<HoodieRecord<HoodieAvroPayload>> itr = compactedLogRecordScanner.iterator();
+    // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
+    // but can return records for completed commits > the commit we are trying to read (if using readCommit() API)
+    while(itr.hasNext()) {
+      HoodieRecord<HoodieAvroPayload> hoodieRecord = itr.next();
+      GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(readerSchema).get();
+      String key = hoodieRecord.getRecordKey();
+      // we assume, a later safe record in the log, is newer than what we have in the map & replace it.
+      ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
+      deltaRecordMap.put(key, aWritable);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Log record : " + arrayWritableToString(aWritable));
+      }
     }
   }
 
@@ -146,7 +138,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable>
     return builder.toString();
   }
 
-
   /**
    * Given a comma separated list of field names and positions at which they appear on Hive,
    * return a ordered list of field names, that can be passed onto storage.
@@ -173,8 +164,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable>
     return orderedFieldMap.values().stream().collect(Collectors.toList());
   }
 
-
-
   /**
    * Generate a reader schema off the provided writeSchema, to just project out
    * the provided columns
@@ -266,7 +255,6 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWritable>
     return null;
   }
 
-
   @Override
   public boolean next(Void aVoid, ArrayWritable arrayWritable) throws IOException {
     // Call the underlying parquetReader.next - which may replace the passed in ArrayWritable with a new block of values
@@ -20,12 +20,10 @@ import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.junit.rules.TemporaryFolder;
 
@@ -99,13 +97,14 @@ public class InputFormatTestUtil {
     File partitionPath = basePath.newFolder("2016", "05", "01");
     AvroParquetWriter parquetWriter;
     for (int i = 0; i < numberOfFiles; i++) {
+      String fileId = FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i);
       File dataFile =
-          new File(partitionPath, FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i));
+          new File(partitionPath, fileId);
       // dataFile.createNewFile();
       parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()),
           schema);
       try {
-        for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber)) {
+        for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber, fileId)) {
           parquetWriter.write(record);
         }
       } finally {
@@ -116,20 +115,25 @@
 
   }
 
-  private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime) {
+  private static Iterable<? extends GenericRecord> generateAvroRecords(Schema schema, int numberOfRecords, String commitTime, String fileId) {
     List<GenericRecord> records = new ArrayList<>(numberOfRecords);
     for(int i=0;i<numberOfRecords;i++) {
-      records.add(generateAvroRecord(schema, i, commitTime));
+      records.add(generateAvroRecord(schema, i, commitTime, fileId));
     }
     return records;
   }
 
-  private static GenericRecord generateAvroRecord(Schema schema, int recordNumber,
-      String commitTime) {
-    return new GenericRecordBuilder(schema).set("field1", "field" + recordNumber)
-        .set("field2", "field" + recordNumber)
-        .set(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime)
-        .set(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, commitTime + "_" + recordNumber).build();
+  public static GenericRecord generateAvroRecord(Schema schema, int recordNumber,
+      String commitTime, String fileId) {
+    GenericRecord record = new GenericData.Record(schema);
+    record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime);
+    record.put("field1", "field" + recordNumber);
+    record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, "key_" + recordNumber);
+    record.put("field2", "field" + recordNumber);
+    record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, commitTime);
+    record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileId);
+    record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, commitTime + "_" + recordNumber);
+    return record;
   }
 
   public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit,
@@ -146,7 +150,7 @@
         schema);
     try {
       for (GenericRecord record : generateAvroRecords(schema, totalNumberOfRecords,
-          originalCommit)) {
+          originalCommit, fileId)) {
         if (numberOfRecordsToUpdate > 0) {
           // update this record
           record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, newCommit);
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.hadoop.realtime;
+
+
+import com.uber.hoodie.common.model.HoodieTableType;
+import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.log.HoodieLogFile;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.SchemaTestUtil;
+import com.uber.hoodie.hadoop.InputFormatTestUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+
+public class HoodieRealtimeRecordReaderTest {
+
+  private JobConf jobConf;
+
+  @Before
+  public void setUp() {
+    jobConf = new JobConf();
+  }
+
+  @Rule
+  public TemporaryFolder basePath = new TemporaryFolder();
+
+  private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, String fileId,
+      String baseCommit, String newCommit, int numberOfRecords) throws InterruptedException,IOException {
+    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath()))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        .overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build();
+    List<IndexedRecord> records = new ArrayList<>();
+    for(int i=0; i < numberOfRecords; i++) {
+      records.add(InputFormatTestUtil.generateAvroRecord(schema, i, newCommit, "fileid0"));
+    }
+    Schema writeSchema = records.get(0).getSchema();
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
+    writer = writer.appendBlock(dataBlock);
+    long size = writer.getCurrentSize();
+    return writer;
+  }
+
+  @Test
+  public void testReader() throws Exception {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
+    HoodieTestUtils.initTableType(basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    File partitionDir = InputFormatTestUtil.prepareParquetDataset(basePath, schema, 1, 100, commitTime);
+    InputFormatTestUtil.commit(basePath, commitTime);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    // update files or generate new log file
+    String newCommitTime = "101";
+    HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, "fileid0", commitTime, newCommitTime, 100);
+    long size = writer.getCurrentSize();
+    writer.close();
+    assertTrue("block - size should be > 0", size > 0);
+
+    //create a split with baseFile (parquet file written earlier) and new log file(s)
+    String logFilePath = writer.getLogFile().getPath().toString();
+    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(new FileSplit(new Path(partitionDir
+        + "/fileid0_1_" + commitTime + ".parquet"),0,1,jobConf), Arrays.asList(logFilePath), newCommitTime);
+
+    //create a RecordReader to be used by HoodieRealtimeRecordReader
+    RecordReader<Void, ArrayWritable> reader =
+        new MapredParquetInputFormat().
+            getRecordReader(new FileSplit(split.getPath(), 0,
+                FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null);
+    JobConf jobConf = new JobConf();
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(","));
+    String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions);
+    jobConf.set("partition_columns", "datestr");
+
+    //validate record reader compaction
+    HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+
+    //use reader to read base Parquet File and log file, merge in flight and return latest commit
+    //here all 100 records should be updated, see above
+    Void key = recordReader.createKey();
+    ArrayWritable value = recordReader.createValue();
+    while(recordReader.next(key, value)) {
+      Writable[] values = value.get();
+      //check if the record written is with latest commit, here "101"
+      Assert.assertEquals(values[0].toString(), newCommitTime);
+      key = recordReader.createKey();
+      value = recordReader.createValue();
+    }
+  }
+
+}