1
0

[HUDI-3516] Implement record iterator for HoodieDataBlock (#4909)

*  Use iterator to avoid eager materialization to be memory friendly
This commit is contained in:
Bo Cui
2022-03-02 10:19:36 +08:00
committed by GitHub
parent a81a6326d5
commit 3fdc9332e5
14 changed files with 452 additions and 217 deletions

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
@@ -80,8 +81,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
// read the avro blocks // read the avro blocks
while (reader.hasNext()) { while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords(); blk.getRecordItr().forEachRemaining(readRecords::add);
readRecords.addAll(records);
} }
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r) List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION) .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,8 +155,9 @@ public class ArchivedCommitsCommand implements CommandMarker {
// read the avro blocks // read the avro blocks
while (reader.hasNext()) { while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords(); try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
readRecords.addAll(records); recordItr.forEachRemaining(readRecords::add);
}
} }
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r) List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList()); .map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList());

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
@@ -123,12 +124,13 @@ public class ExportCommand implements CommandMarker {
// read the avro blocks // read the avro blocks
while (reader.hasNext() && copyCount < limit) { while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
for (IndexedRecord ir : blk.getRecords()) { try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
while (recordItr.hasNext()) {
IndexedRecord ir = recordItr.next();
// Archived instants are saved as avro encoded HoodieArchivedMetaEntry records. We need to get the // Archived instants are saved as avro encoded HoodieArchivedMetaEntry records. We need to get the
// metadata record from the entry and convert it to json. // metadata record from the entry and convert it to json.
HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get() HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
.deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir); .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
final String action = archiveEntryRecord.get("actionType").toString(); final String action = archiveEntryRecord.get("actionType").toString();
if (!actionSet.contains(action)) { if (!actionSet.contains(action)) {
continue; continue;
@@ -164,6 +166,7 @@ public class ExportCommand implements CommandMarker {
} }
} }
} }
}
reader.close(); reader.close();
} }

View File

@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodieMemoryConfig;
@@ -60,6 +61,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import scala.Tuple2; import scala.Tuple2;
@@ -100,7 +102,7 @@ public class HoodieLogFileCommand implements CommandMarker {
while (reader.hasNext()) { while (reader.hasNext()) {
HoodieLogBlock n = reader.next(); HoodieLogBlock n = reader.next();
String instantTime; String instantTime;
int recordCount = 0; AtomicInteger recordCount = new AtomicInteger(0);
if (n instanceof HoodieCorruptBlock) { if (n instanceof HoodieCorruptBlock) {
try { try {
instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME); instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
@@ -120,17 +122,19 @@ public class HoodieLogFileCommand implements CommandMarker {
instantTime = "dummy_instant_time_" + dummyInstantTimeCount; instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
} }
if (n instanceof HoodieDataBlock) { if (n instanceof HoodieDataBlock) {
recordCount = ((HoodieDataBlock) n).getRecords().size(); try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordItr()) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
} }
} }
if (commitCountAndMetadata.containsKey(instantTime)) { if (commitCountAndMetadata.containsKey(instantTime)) {
commitCountAndMetadata.get(instantTime).add( commitCountAndMetadata.get(instantTime).add(
new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount)); new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
} else { } else {
List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list = List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list =
new ArrayList<>(); new ArrayList<>();
list.add( list.add(
new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount)); new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
commitCountAndMetadata.put(instantTime, list); commitCountAndMetadata.put(instantTime, list);
} }
} }
@@ -232,11 +236,12 @@ public class HoodieLogFileCommand implements CommandMarker {
HoodieLogBlock n = reader.next(); HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) { if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n; HoodieDataBlock blk = (HoodieDataBlock) n;
List<IndexedRecord> records = blk.getRecords(); try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
for (IndexedRecord record : records) { recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) { if (allRecords.size() < limit) {
allRecords.add(record); allRecords.add(record);
} }
});
} }
} }
} }

View File

@@ -321,8 +321,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
// Read the avro blocks // Read the avro blocks
while (reader.hasNext()) { while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> recordsPerFile = blk.getRecords(); blk.getRecordItr().forEachRemaining(records::add);
records.addAll(recordsPerFile);
if (records.size() >= this.config.getCommitArchivalBatchSize()) { if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records); writeToFile(wrapperSchema, records);
} }

View File

@@ -62,6 +62,7 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -694,7 +695,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
while (logFileReader.hasNext()) { while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next(); HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) { if (logBlock instanceof HoodieDataBlock) {
for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord; final GenericRecord record = (GenericRecord) indexRecord;
if (enableMetaFields) { if (enableMetaFields) {
// Metadata table records should have meta fields! // Metadata table records should have meta fields!
@@ -711,6 +713,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
if (enableMetaFields) { if (enableMetaFields) {
assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
} }
});
} }
} }
} }

View File

@@ -40,6 +40,7 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.io.storage.HoodieHFileReader;
@@ -292,14 +293,14 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
while (logFileReader.hasNext()) { while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next(); HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) { if (logBlock instanceof HoodieDataBlock) {
for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
recordItr.forEachRemaining(indexRecord -> {
final GenericRecord record = (GenericRecord) indexRecord; final GenericRecord record = (GenericRecord) indexRecord;
// Metadata table records should not have meta fields!
assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
assertFalse(key.isEmpty()); assertFalse(key.isEmpty());
});
} }
} }
} }

View File

@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
@@ -50,8 +51,8 @@ import org.apache.log4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Deque; import java.util.Deque;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@@ -359,17 +360,13 @@ public abstract class AbstractHoodieLogRecordReader {
* handle it. * handle it.
*/ */
private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception { private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception {
// TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here try (ClosableIterator<IndexedRecord> recordItr = dataBlock.getRecordItr(keys.orElse(Collections.emptyList()))) {
List<IndexedRecord> recs = new ArrayList<>(); while (recordItr.hasNext()) {
if (!keys.isPresent()) { IndexedRecord record = recordItr.next();
recs = dataBlock.getRecords(); processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
} else {
recs = dataBlock.getRecords(keys.get());
}
totalLogRecords.addAndGet(recs.size());
for (IndexedRecord rec : recs) {
processNextRecord(createHoodieRecord(rec, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName)); this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
totalLogRecords.incrementAndGet();
}
} }
} }

View File

@@ -31,6 +31,7 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.fs.SizeAwareDataInputStream; import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
@@ -60,7 +61,6 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
public class HoodieAvroDataBlock extends HoodieDataBlock { public class HoodieAvroDataBlock extends HoodieDataBlock {
private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>(); private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
public HoodieAvroDataBlock(FSDataInputStream inputStream, public HoodieAvroDataBlock(FSDataInputStream inputStream,
Option<byte[]> content, Option<byte[]> content,
@@ -75,8 +75,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records,
@Nonnull Map<HeaderMetadataType, String> header, @Nonnull Map<HeaderMetadataType, String> header,
@Nonnull String keyField @Nonnull String keyField) {
) {
super(records, header, new HashMap<>(), keyField); super(records, header, new HashMap<>(), keyField);
} }
@@ -99,9 +98,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
output.writeInt(records.size()); output.writeInt(records.size());
// 3. Write the records // 3. Write the records
Iterator<IndexedRecord> itr = records.iterator(); for (IndexedRecord s : records) {
while (itr.hasNext()) {
IndexedRecord s = itr.next();
ByteArrayOutputStream temp = new ByteArrayOutputStream(); ByteArrayOutputStream temp = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get()); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
encoderCache.set(encoder); encoderCache.set(encoder);
@@ -120,49 +117,80 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e); throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
} }
} }
encoderCache.remove();
output.close(); output.close();
return baos.toByteArray(); return baos.toByteArray();
} }
// TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used // TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used
// TODO (na) - Implement a recordItr instead of recordList
@Override @Override
protected List<IndexedRecord> deserializeRecords(byte[] content) throws IOException { protected ClosableIterator<IndexedRecord> deserializeRecords(byte[] content) throws IOException {
checkState(readerSchema != null, "Reader's schema has to be non-null"); checkState(this.readerSchema != null, "Reader's schema has to be non-null");
return RecordIterator.getInstance(this, content);
}
SizeAwareDataInputStream dis = private static class RecordIterator implements ClosableIterator<IndexedRecord> {
new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content))); private byte[] content;
private final SizeAwareDataInputStream dis;
private final GenericDatumReader<IndexedRecord> reader;
private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
private int totalRecords = 0;
private int readRecords = 0;
private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content) throws IOException {
this.content = content;
this.dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(this.content)));
// 1. Read version for this data block // 1. Read version for this data block
int version = dis.readInt(); int version = this.dis.readInt();
HoodieAvroDataBlockVersion logBlockVersion = new HoodieAvroDataBlockVersion(version); HoodieAvroDataBlockVersion logBlockVersion = new HoodieAvroDataBlockVersion(version);
// Get schema from the header this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
// 2. Get the total records
int totalRecords = 0;
if (logBlockVersion.hasRecordCount()) { if (logBlockVersion.hasRecordCount()) {
totalRecords = dis.readInt(); this.totalRecords = this.dis.readInt();
} }
List<IndexedRecord> records = new ArrayList<>(totalRecords);
// 3. Read the content
for (int i = 0; i < totalRecords; i++) {
int recordLength = dis.readInt();
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(content, dis.getNumberOfBytesRead(),
recordLength, decoderCache.get());
decoderCache.set(decoder);
IndexedRecord record = reader.read(null, decoder);
records.add(record);
dis.skipBytes(recordLength);
} }
dis.close(); public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, byte[] content) throws IOException {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
return new RecordIterator(dataBlock.readerSchema, writerSchema, content);
}
return records; @Override
public void close() {
try {
this.dis.close();
this.decoderCache.remove();
this.content = null;
} catch (IOException e) {
// ignore
}
}
@Override
public boolean hasNext() {
return readRecords < totalRecords;
}
@Override
public IndexedRecord next() {
try {
int recordLength = this.dis.readInt();
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(this.content, this.dis.getNumberOfBytesRead(),
recordLength, this.decoderCache.get());
this.decoderCache.set(decoder);
IndexedRecord record = this.reader.read(null, decoder);
this.dis.skipBytes(recordLength);
this.readRecords++;
return record;
} catch (IOException e) {
throw new HoodieIOException("Unable to convert bytes to record.", e);
}
}
} }
//---------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------
@@ -256,7 +284,10 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
output.writeInt(schemaContent.length); output.writeInt(schemaContent.length);
output.write(schemaContent); output.write(schemaContent);
List<IndexedRecord> records = getRecords(); List<IndexedRecord> records = new ArrayList<>();
try (ClosableIterator<IndexedRecord> recordItr = getRecordItr()) {
recordItr.forEachRemaining(records::add);
}
// 3. Write total number of records // 3. Write total number of records
output.writeInt(records.size()); output.writeInt(records.size());

View File

@@ -18,17 +18,21 @@
package org.apache.hudi.common.table.log.block; package org.apache.hudi.common.table.log.block;
import org.apache.avro.Schema; import org.apache.hudi.common.util.ClosableIterator;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.Set;
import java.util.function.Function;
import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -45,7 +49,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
// TODO rebase records/content to leverage Either to warrant // TODO rebase records/content to leverage Either to warrant
// that they are mutex (used by read/write flows respectively) // that they are mutex (used by read/write flows respectively)
private Option<List<IndexedRecord>> records; private final Option<List<IndexedRecord>> records;
/** /**
* Key field's name w/in the record's schema * Key field's name w/in the record's schema
@@ -110,19 +114,19 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
} }
/** /**
* Returns all the records contained w/in this block * Returns all the records iterator contained w/in this block.
*/ */
public final List<IndexedRecord> getRecords() { public final ClosableIterator<IndexedRecord> getRecordItr() {
if (!records.isPresent()) { if (records.isPresent()) {
return list2Iterator(records.get());
}
try { try {
// in case records are absent, read content lazily and then convert to IndexedRecords // in case records are absent, read content lazily and then convert to IndexedRecords
records = Option.of(readRecordsFromBlockPayload()); return readRecordsFromBlockPayload();
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Unable to convert content bytes to records", io); throw new HoodieIOException("Unable to convert content bytes to records", io);
} }
} }
return records.get();
}
public Schema getSchema() { public Schema getSchema() {
return readerSchema; return readerSchema;
@@ -136,7 +140,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
* @return List of IndexedRecords for the keys of interest. * @return List of IndexedRecords for the keys of interest.
* @throws IOException in case of failures encountered when reading/parsing records * @throws IOException in case of failures encountered when reading/parsing records
*/ */
public final List<IndexedRecord> getRecords(List<String> keys) throws IOException { public final ClosableIterator<IndexedRecord> getRecordItr(List<String> keys) throws IOException {
boolean fullScan = keys.isEmpty(); boolean fullScan = keys.isEmpty();
if (enablePointLookups && !fullScan) { if (enablePointLookups && !fullScan) {
return lookupRecords(keys); return lookupRecords(keys);
@@ -144,18 +148,16 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
// Otherwise, we fetch all the records and filter out all the records, but the // Otherwise, we fetch all the records and filter out all the records, but the
// ones requested // ones requested
List<IndexedRecord> allRecords = getRecords(); ClosableIterator<IndexedRecord> allRecords = getRecordItr();
if (fullScan) { if (fullScan) {
return allRecords; return allRecords;
} }
HashSet<String> keySet = new HashSet<>(keys); HashSet<String> keySet = new HashSet<>(keys);
return allRecords.stream() return FilteringIterator.getInstance(allRecords, keySet, this::getRecordKey);
.filter(record -> keySet.contains(getRecordKey(record).orElse(null)))
.collect(Collectors.toList());
} }
protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException { protected ClosableIterator<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
if (readBlockLazily && !getContent().isPresent()) { if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk // read log block contents from disk
inflate(); inflate();
@@ -169,7 +171,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
} }
} }
protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException { protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
String.format("Point lookups are not supported by this Data block type (%s)", getBlockType()) String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
); );
@@ -177,7 +179,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException; protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
protected abstract List<IndexedRecord> deserializeRecords(byte[] content) throws IOException; protected abstract ClosableIterator<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
public abstract HoodieLogBlockType getBlockType(); public abstract HoodieLogBlockType getBlockType();
@@ -190,4 +192,80 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
.map(keyField -> record.get(keyField.pos())) .map(keyField -> record.get(keyField.pos()))
.map(Object::toString); .map(Object::toString);
} }
/**
 * Wraps an in-memory {@link List} as a {@link ClosableIterator}.
 *
 * <p>Closing is a no-op, since a backing list holds no external resources.
 *
 * @param <T> the element type
 * @param list the list to iterate over
 * @return a closable view over the list's own iterator
 */
static <T> ClosableIterator<T> list2Iterator(List<T> list) {
  final Iterator<T> delegate = list.iterator();
  return new ClosableIterator<T>() {
    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }
    @Override
    public T next() {
      return delegate.next();
    }
    @Override
    public void close() {
      // no-op: nothing to release for an in-memory list
    }
  };
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
/**
* A {@link ClosableIterator} that supports filtering strategy with given keys.
* User should supply the key extraction function for fetching string format keys.
*
* @param <T> the element type
*/
private static class FilteringIterator<T extends IndexedRecord> implements ClosableIterator<T> {
  private final ClosableIterator<T> nested; // nested iterator
  private final Set<String> keys; // the filtering keys
  private final Function<T, Option<String>> keyExtract; // function to extract the key
  // Next matching element, buffered by hasNext(); null when none fetched or already consumed.
  private T next;
  private FilteringIterator(ClosableIterator<T> nested, Set<String> keys, Function<T, Option<String>> keyExtract) {
    this.nested = nested;
    this.keys = keys;
    this.keyExtract = keyExtract;
  }
  public static <T extends IndexedRecord> FilteringIterator<T> getInstance(
      ClosableIterator<T> nested,
      Set<String> keys,
      Function<T, Option<String>> keyExtract) {
    return new FilteringIterator<>(nested, keys, keyExtract);
  }
  @Override
  public void close() {
    this.nested.close();
  }
  @Override
  public boolean hasNext() {
    // Idempotent: repeated hasNext() calls must not drop elements, so the
    // matching record found while scanning is buffered until next() consumes it.
    if (this.next != null) {
      return true;
    }
    while (this.nested.hasNext()) {
      T candidate = this.nested.next();
      if (keys.contains(keyExtract.apply(candidate).orElse(null))) {
        this.next = candidate;
        return true;
      }
    }
    return false;
  }
  @Override
  public T next() {
    // Honor the Iterator contract: fail fast instead of returning a stale element.
    if (!hasNext()) {
      throw new java.util.NoSuchElementException("No more matching records in the nested iterator");
    }
    T result = this.next;
    this.next = null; // allow the following hasNext() to advance again
    return result;
  }
}
} }

View File

@@ -30,16 +30,17 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils; import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem; import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator; import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -51,7 +52,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkState; import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -148,7 +148,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
} }
@Override @Override
protected List<IndexedRecord> deserializeRecords(byte[] content) throws IOException { protected ClosableIterator<IndexedRecord> deserializeRecords(byte[] content) throws IOException {
checkState(readerSchema != null, "Reader's schema has to be non-null"); checkState(readerSchema != null, "Reader's schema has to be non-null");
// Get schema from the header // Get schema from the header
@@ -156,14 +156,30 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
// Read the content // Read the content
HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(content); HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(content);
List<Pair<String, IndexedRecord>> records = reader.readAllRecords(writerSchema, readerSchema); // Sets up the writer schema
reader.withSchema(writerSchema);
Iterator<IndexedRecord> recordIterator = reader.getRecordIterator(readerSchema);
return new ClosableIterator<IndexedRecord>() {
@Override
public void close() {
reader.close();
}
return records.stream().map(Pair::getSecond).collect(Collectors.toList()); @Override
public boolean hasNext() {
return recordIterator.hasNext();
}
@Override
public IndexedRecord next() {
return recordIterator.next();
}
};
} }
// TODO abstract this w/in HoodieDataBlock // TODO abstract this w/in HoodieDataBlock
@Override @Override
protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException { protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get(); HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
// NOTE: It's important to extend Hadoop configuration here to make sure configuration // NOTE: It's important to extend Hadoop configuration here to make sure configuration
@@ -180,12 +196,27 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
// HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks. // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
Collections.sort(keys); Collections.sort(keys);
try (HoodieHFileReader<IndexedRecord> reader = final HoodieHFileReader<IndexedRecord> reader =
new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) { new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf));
// Get writer's schema from the header // Get writer's schema from the header
List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys, readerSchema); final ClosableIterator<IndexedRecord> recordIterator = reader.getRecordIterator(keys, readerSchema);
return logRecords.stream().map(Pair::getSecond).collect(Collectors.toList()); return new ClosableIterator<IndexedRecord>() {
@Override
public boolean hasNext() {
return recordIterator.hasNext();
} }
@Override
public IndexedRecord next() {
return recordIterator.next();
}
@Override
public void close() {
recordIterator.close();
reader.close();
}
};
} }
private byte[] serializeRecord(IndexedRecord record) { private byte[] serializeRecord(IndexedRecord record) {

View File

@@ -18,19 +18,21 @@
package org.apache.hudi.common.table.log.block; package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.avro.AvroSchemaConverter;
@@ -41,11 +43,10 @@ import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile; import org.apache.parquet.io.InputFile;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -121,7 +122,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
return baos.toByteArray(); return baos.toByteArray();
} }
public static Iterator<IndexedRecord> getProjectedParquetRecordsIterator(Configuration conf, public static ClosableIterator<IndexedRecord> getProjectedParquetRecordsIterator(Configuration conf,
Schema readerSchema, Schema readerSchema,
InputFile inputFile) throws IOException { InputFile inputFile) throws IOException {
AvroReadSupport.setAvroReadSchema(conf, readerSchema); AvroReadSupport.setAvroReadSchema(conf, readerSchema);
@@ -138,7 +139,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
* requested by the caller (providing projected Reader's schema) * requested by the caller (providing projected Reader's schema)
*/ */
@Override @Override
protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException { protected ClosableIterator<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get(); HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
// NOTE: It's important to extend Hadoop configuration here to make sure configuration // NOTE: It's important to extend Hadoop configuration here to make sure configuration
@@ -152,20 +153,14 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
blockContentLoc.getContentPositionInLogFile(), blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize()); blockContentLoc.getBlockSize());
ArrayList<IndexedRecord> records = new ArrayList<>(); return getProjectedParquetRecordsIterator(
getProjectedParquetRecordsIterator(
inlineConf, inlineConf,
readerSchema, readerSchema,
HadoopInputFile.fromPath(inlineLogFilePath, inlineConf) HadoopInputFile.fromPath(inlineLogFilePath, inlineConf));
)
.forEachRemaining(records::add);
return records;
} }
@Override @Override
protected List<IndexedRecord> deserializeRecords(byte[] content) throws IOException { protected ClosableIterator<IndexedRecord> deserializeRecords(byte[] content) {
throw new UnsupportedOperationException("Should not be invoked"); throw new UnsupportedOperationException("Should not be invoked");
} }
} }

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
@@ -54,10 +55,12 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.StreamSupport;
/** /**
* Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
@@ -248,15 +251,14 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
// TODO If we can store additional metadata in datablock, we can skip parsing records // TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block) // (such as startTime, endTime of records in the block)
List<IndexedRecord> records = blk.getRecords(); try (ClosableIterator<IndexedRecord> itr = blk.getRecordItr()) {
StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
// Filter blocks in desired time window // Filter blocks in desired time window
instantsInRange.addAll(
records.stream()
.filter(r -> commitsFilter.apply((GenericRecord) r)) .filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails)) .map(r -> readCommit((GenericRecord) r, loadInstantDetails))
.filter(c -> filter == null || filter.isInRange(c)) .filter(c -> filter == null || filter.isInRange(c))
.collect(Collectors.toList()) .forEach(instantsInRange::add);
); }
} }
if (filter != null) { if (filter != null) {

View File

@@ -50,6 +50,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
@@ -123,6 +124,13 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
return schema; return schema;
} }
/**
* Sets up the writer schema explicitly.
*/
public void withSchema(Schema schema) {
this.schema = schema;
}
@Override @Override
public BloomFilter readBloomFilter() { public BloomFilter readBloomFilter() {
Map<byte[], byte[]> fileInfo; Map<byte[], byte[]> fileInfo;
@@ -184,7 +192,14 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
return filteredRecords; return filteredRecords;
} }
public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { /**
* Reads all the records with given schema.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
private List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) {
final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
List<Pair<String, R>> recordList = new LinkedList<>(); List<Pair<String, R>> recordList = new LinkedList<>();
try { try {
@@ -203,17 +218,36 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
} }
} }
public List<Pair<String, R>> readAllRecords() throws IOException { /**
Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes()))); * Reads all the records with current schema.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
public List<Pair<String, R>> readAllRecords() {
Schema schema = getSchema();
return readAllRecords(schema, schema); return readAllRecords(schema, schema);
} }
/**
* Reads all the records with current schema and filtering keys.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
public List<Pair<String, R>> readRecords(List<String> keys) throws IOException { public List<Pair<String, R>> readRecords(List<String> keys) throws IOException {
reader.loadFileInfo(); return readRecords(keys, getSchema());
Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes())));
return readRecords(keys, schema);
} }
/**
* Reads all the records with given schema and filtering keys.
*
* <p>NOTE: This should only be used for testing,
* the records are materialized eagerly into a list and returned,
* use {@code getRecordIterator} where possible.
*/
public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException { public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
this.schema = schema; this.schema = schema;
reader.loadFileInfo(); reader.loadFileInfo();
@@ -227,6 +261,39 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
return records; return records;
} }
public ClosableIterator<R> getRecordIterator(List<String> keys, Schema schema) throws IOException {
this.schema = schema;
reader.loadFileInfo();
Iterator<String> iterator = keys.iterator();
return new ClosableIterator<R>() {
private R next;
@Override
public void close() {
}
@Override
public boolean hasNext() {
try {
while (iterator.hasNext()) {
Option<R> value = getRecordByKey(iterator.next(), schema);
if (value.isPresent()) {
next = value.get();
return true;
}
}
return false;
} catch (IOException e) {
throw new HoodieIOException("unable to read next record from hfile ", e);
}
}
@Override
public R next() {
return next;
}
};
}
@Override @Override
public Iterator getRecordIterator(Schema readerSchema) throws IOException { public Iterator getRecordIterator(Schema readerSchema) throws IOException {
final HFileScanner scanner = reader.getScanner(false, false); final HFileScanner scanner = reader.getScanner(false, false);

View File

@@ -47,6 +47,7 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.CorruptedLogFileException; import org.apache.hudi.exception.CorruptedLogFileException;
@@ -390,9 +391,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogBlock nextBlock = reader.next(); HoodieLogBlock nextBlock = reader.next();
assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block"); assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block");
HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock; HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
assertEquals(copyOfRecords.size(), recordsRead.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords, dataBlockRead.getRecords(), assertEquals(copyOfRecords, recordsRead,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
reader.close(); reader.close();
} }
@@ -430,9 +432,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogBlock nextBlock = reader.next(); HoodieLogBlock nextBlock = reader.next();
assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block"); assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block");
HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock; HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
assertEquals(copyOfRecords.size(), recordsRead.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords, dataBlockRead.getRecords(), assertEquals(copyOfRecords, recordsRead,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
int logBlockReadNum = 1; int logBlockReadNum = 1;
while (reader.hasNext()) { while (reader.hasNext()) {
@@ -514,26 +517,29 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertTrue(reader.hasNext(), "First block should be available"); assertTrue(reader.hasNext(), "First block should be available");
HoodieLogBlock nextBlock = reader.next(); HoodieLogBlock nextBlock = reader.next();
HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock; HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
assertEquals(copyOfRecords1.size(),recordsRead1.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords1, dataBlockRead.getRecords(), assertEquals(copyOfRecords1, recordsRead1,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
assertEquals(dataBlockRead.getSchema(), getSimpleSchema()); assertEquals(dataBlockRead.getSchema(), getSimpleSchema());
reader.hasNext(); reader.hasNext();
nextBlock = reader.next(); nextBlock = reader.next();
dataBlockRead = (HoodieDataBlock) nextBlock; dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead2 = getRecords(dataBlockRead);
assertEquals(copyOfRecords2.size(), recordsRead2.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords2, dataBlockRead.getRecords(), assertEquals(copyOfRecords2, recordsRead2,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
reader.hasNext(); reader.hasNext();
nextBlock = reader.next(); nextBlock = reader.next();
dataBlockRead = (HoodieDataBlock) nextBlock; dataBlockRead = (HoodieDataBlock) nextBlock;
assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead3 = getRecords(dataBlockRead);
assertEquals(copyOfRecords3.size(), recordsRead3.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords3, dataBlockRead.getRecords(), assertEquals(copyOfRecords3, recordsRead3,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
reader.close(); reader.close();
} }
@@ -1634,25 +1640,28 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogBlock prevBlock = reader.prev(); HoodieLogBlock prevBlock = reader.prev();
HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock; HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
assertEquals(copyOfRecords3.size(), recordsRead1.size(),
"Third records size should be equal to the written records size"); "Third records size should be equal to the written records size");
assertEquals(copyOfRecords3, dataBlockRead.getRecords(), assertEquals(copyOfRecords3, recordsRead1,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
assertTrue(reader.hasPrev(), "Second block should be available"); assertTrue(reader.hasPrev(), "Second block should be available");
prevBlock = reader.prev(); prevBlock = reader.prev();
dataBlockRead = (HoodieDataBlock) prevBlock; dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead2 = getRecords(dataBlockRead);
assertEquals(copyOfRecords2.size(), recordsRead2.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords2, dataBlockRead.getRecords(), assertEquals(copyOfRecords2, recordsRead2,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
assertTrue(reader.hasPrev(), "First block should be available"); assertTrue(reader.hasPrev(), "First block should be available");
prevBlock = reader.prev(); prevBlock = reader.prev();
dataBlockRead = (HoodieDataBlock) prevBlock; dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead3 = getRecords(dataBlockRead);
assertEquals(copyOfRecords1.size(), recordsRead3.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords1, dataBlockRead.getRecords(), assertEquals(copyOfRecords1, recordsRead3,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
assertFalse(reader.hasPrev()); assertFalse(reader.hasPrev());
@@ -1770,9 +1779,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertTrue(reader.hasPrev(), "First block should be available"); assertTrue(reader.hasPrev(), "First block should be available");
HoodieLogBlock prevBlock = reader.prev(); HoodieLogBlock prevBlock = reader.prev();
HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock; HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
assertEquals(copyOfRecords1.size(), recordsRead.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(copyOfRecords1, dataBlockRead.getRecords(), assertEquals(copyOfRecords1, recordsRead,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
assertFalse(reader.hasPrev()); assertFalse(reader.hasPrev());
@@ -1795,7 +1805,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
HoodieLogBlock logBlock = HoodieAvroDataBlock.getBlock(content, schema); HoodieLogBlock logBlock = HoodieAvroDataBlock.getBlock(content, schema);
assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType()); assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType());
List<IndexedRecord> readRecords = ((HoodieAvroDataBlock) logBlock).getRecords(); List<IndexedRecord> readRecords = getRecords((HoodieAvroDataBlock) logBlock);
assertEquals(readRecords.size(), recordsCopy.size()); assertEquals(readRecords.size(), recordsCopy.size());
for (int i = 0; i < recordsCopy.size(); ++i) { for (int i = 0; i < recordsCopy.size(); ++i) {
assertEquals(recordsCopy.get(i), readRecords.get(i)); assertEquals(recordsCopy.get(i), readRecords.get(i));
@@ -1804,7 +1814,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
// Reader schema is optional if it is same as write schema // Reader schema is optional if it is same as write schema
logBlock = HoodieAvroDataBlock.getBlock(content, null); logBlock = HoodieAvroDataBlock.getBlock(content, null);
assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType()); assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType());
readRecords = ((HoodieAvroDataBlock) logBlock).getRecords(); readRecords = getRecords((HoodieAvroDataBlock) logBlock);
assertEquals(readRecords.size(), recordsCopy.size()); assertEquals(readRecords.size(), recordsCopy.size());
for (int i = 0; i < recordsCopy.size(); ++i) { for (int i = 0; i < recordsCopy.size(); ++i) {
assertEquals(recordsCopy.get(i), readRecords.get(i)); assertEquals(recordsCopy.get(i), readRecords.get(i));
@@ -1861,9 +1871,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605); put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605);
}}; }};
assertEquals(projectedRecords.size(), dataBlockRead.getRecords().size(), List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
assertEquals(projectedRecords.size(), recordsRead.size(),
"Read records size should be equal to the written records size"); "Read records size should be equal to the written records size");
assertEquals(projectedRecords, dataBlockRead.getRecords(), assertEquals(projectedRecords, recordsRead,
"Both records lists should be the same. (ordering guaranteed)"); "Both records lists should be the same. (ordering guaranteed)");
assertEquals(dataBlockRead.getSchema(), projectedSchema); assertEquals(dataBlockRead.getSchema(), projectedSchema);
@@ -1900,4 +1911,15 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true) arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
); );
} }
/**
* Utility to convert the given iterator to a List.
*/
private static List<IndexedRecord> getRecords(HoodieDataBlock dataBlock) {
ClosableIterator<IndexedRecord> itr = dataBlock.getRecordItr();
List<IndexedRecord> elements = new ArrayList<>();
itr.forEachRemaining(elements::add);
return elements;
}
} }