From 3fdc9332e58930aa8e4fef0d0f85ffbec78c6e7b Mon Sep 17 00:00:00 2001 From: Bo Cui Date: Wed, 2 Mar 2022 10:19:36 +0800 Subject: [PATCH] [HUDI-3516] Implement record iterator for HoodieDataBlock (#4909) * Use iterator to void eager materialization to be memory friendly --- .../cli/commands/ArchivedCommitsCommand.java | 9 +- .../hudi/cli/commands/ExportCommand.java | 75 ++++++----- .../cli/commands/HoodieLogFileCommand.java | 23 ++-- .../hudi/client/HoodieTimelineArchiver.java | 3 +- .../functional/TestHoodieBackedMetadata.java | 35 ++--- .../TestHoodieBackedTableMetadata.java | 17 +-- .../log/AbstractHoodieLogRecordReader.java | 21 ++- .../table/log/block/HoodieAvroDataBlock.java | 101 +++++++++----- .../table/log/block/HoodieDataBlock.java | 124 ++++++++++++++---- .../table/log/block/HoodieHFileDataBlock.java | 55 ++++++-- .../log/block/HoodieParquetDataBlock.java | 39 +++--- .../timeline/HoodieArchivedTimeline.java | 22 ++-- .../hudi/io/storage/HoodieHFileReader.java | 79 ++++++++++- .../functional/TestHoodieLogFormat.java | 66 ++++++---- 14 files changed, 452 insertions(+), 217 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index 102fcc2ae..1747a59f4 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.avro.generic.GenericRecord; @@ -80,8 +81,7 @@ public class ArchivedCommitsCommand implements CommandMarker { // read the avro blocks while (reader.hasNext()) { HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); - List records = blk.getRecords(); - readRecords.addAll(records); + blk.getRecordItr().forEachRemaining(readRecords::add); } List readCommits = readRecords.stream().map(r -> (GenericRecord) r) .filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION) @@ -155,8 +155,9 @@ public class ArchivedCommitsCommand implements CommandMarker { // read the avro blocks while (reader.hasNext()) { HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); - List records = blk.getRecords(); - readRecords.addAll(records); + try (ClosableIterator recordItr = blk.getRecordItr()) { + recordItr.forEachRemaining(readRecords::add); + } } List readCommits = readRecords.stream().map(r -> (GenericRecord) r) .map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList()); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java index 3e5fb8fe0..1d8d6dcd6 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.exception.HoodieException; import org.apache.avro.generic.GenericRecord; @@ -123,44 +124,46 @@ public class ExportCommand implements CommandMarker { // read the avro blocks while (reader.hasNext() && copyCount < limit) { HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); - for (IndexedRecord ir : blk.getRecords()) { - // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the - // metadata record from the entry and convert it to json. - HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get() - .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir); + try (ClosableIterator recordItr = blk.getRecordItr()) { + while (recordItr.hasNext()) { + IndexedRecord ir = recordItr.next(); + // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the + // metadata record from the entry and convert it to json. + HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get() + .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir); + final String action = archiveEntryRecord.get("actionType").toString(); + if (!actionSet.contains(action)) { + continue; + } - final String action = archiveEntryRecord.get("actionType").toString(); - if (!actionSet.contains(action)) { - continue; - } - - GenericRecord metadata = null; - switch (action) { - case HoodieTimeline.CLEAN_ACTION: - metadata = archiveEntryRecord.getHoodieCleanMetadata(); + GenericRecord metadata = null; + switch (action) { + case HoodieTimeline.CLEAN_ACTION: + metadata = archiveEntryRecord.getHoodieCleanMetadata(); + break; + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.DELTA_COMMIT_ACTION: + metadata = archiveEntryRecord.getHoodieCommitMetadata(); + break; + case HoodieTimeline.ROLLBACK_ACTION: + metadata = archiveEntryRecord.getHoodieRollbackMetadata(); + break; + case HoodieTimeline.SAVEPOINT_ACTION: + metadata = archiveEntryRecord.getHoodieSavePointMetadata(); + break; + case HoodieTimeline.COMPACTION_ACTION: + metadata = archiveEntryRecord.getHoodieCompactionMetadata(); + break; + default: + throw new HoodieException("Unknown type of action " + action); + } + + final String instantTime = archiveEntryRecord.get("commitTime").toString(); + final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action; + writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true)); + if (++copyCount == limit) { break; - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.DELTA_COMMIT_ACTION: - metadata = archiveEntryRecord.getHoodieCommitMetadata(); - break; - case HoodieTimeline.ROLLBACK_ACTION: - metadata = archiveEntryRecord.getHoodieRollbackMetadata(); - break; - case HoodieTimeline.SAVEPOINT_ACTION: - metadata = archiveEntryRecord.getHoodieSavePointMetadata(); - break; - case HoodieTimeline.COMPACTION_ACTION: - metadata = archiveEntryRecord.getHoodieCompactionMetadata(); - break; - default: - throw new HoodieException("Unknown type of action " + action); - } - - final String instantTime = archiveEntryRecord.get("commitTime").toString(); - final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action; - writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true)); - if (++copyCount == limit) { - break; + } } } } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 27bcd81fa..4a56858f3 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -37,6 +37,7 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMemoryConfig; @@ -60,6 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import scala.Tuple2; @@ -100,7 +102,7 @@ public class HoodieLogFileCommand implements CommandMarker { while (reader.hasNext()) { HoodieLogBlock n = reader.next(); String instantTime; - int recordCount = 0; + AtomicInteger recordCount = new AtomicInteger(0); if (n instanceof HoodieCorruptBlock) { try { instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME); @@ -120,17 +122,19 @@ public class HoodieLogFileCommand implements CommandMarker { instantTime = "dummy_instant_time_" + dummyInstantTimeCount; } if (n instanceof HoodieDataBlock) { - recordCount = ((HoodieDataBlock) n).getRecords().size(); + try (ClosableIterator recordItr = ((HoodieDataBlock) n).getRecordItr()) { + recordItr.forEachRemaining(r -> recordCount.incrementAndGet()); + } } } if (commitCountAndMetadata.containsKey(instantTime)) { commitCountAndMetadata.get(instantTime).add( - new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount)); + new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get())); } else { List, Map>, Integer>> list = new ArrayList<>(); list.add( - new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount)); + new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get())); commitCountAndMetadata.put(instantTime, list); } } @@ -232,11 +236,12 @@ public class HoodieLogFileCommand implements CommandMarker { HoodieLogBlock n = reader.next(); if (n instanceof HoodieDataBlock) { HoodieDataBlock blk = (HoodieDataBlock) n; - List records = blk.getRecords(); - for (IndexedRecord record : records) { - if (allRecords.size() < limit) { - allRecords.add(record); - } + try (ClosableIterator recordItr = blk.getRecordItr()) { + recordItr.forEachRemaining(record -> { + if (allRecords.size() < limit) { + allRecords.add(record); + } + }); } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index a826cfa08..15401c029 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -321,8 +321,7 @@ public class HoodieTimelineArchiver { // Read the avro blocks while (reader.hasNext()) { HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); - List recordsPerFile = blk.getRecords(); - records.addAll(recordsPerFile); + blk.getRecordItr().forEachRemaining(records::add); if (records.size() >= this.config.getCommitArchivalBatchSize()) { writeToFile(wrapperSchema, records); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 37595f6f7..223625fe7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -62,6 +62,7 @@ import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; @@ -694,23 +695,25 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { while (logFileReader.hasNext()) { HoodieLogBlock logBlock = logFileReader.next(); if (logBlock instanceof HoodieDataBlock) { - for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { - final GenericRecord record = (GenericRecord) indexRecord; - if (enableMetaFields) { - // Metadata table records should have meta fields! - assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); - assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); - } else { - // Metadata table records should not have meta fields! - assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); - assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); - } + try (ClosableIterator recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) { + recordItr.forEachRemaining(indexRecord -> { + final GenericRecord record = (GenericRecord) indexRecord; + if (enableMetaFields) { + // Metadata table records should have meta fields! + assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } else { + // Metadata table records should not have meta fields! + assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } - final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); - assertFalse(key.isEmpty()); - if (enableMetaFields) { - assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); - } + final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); + assertFalse(key.isEmpty()); + if (enableMetaFields) { + assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); + } + }); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 48c2e19b4..70f54b111 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieHFileReader; @@ -292,14 +293,14 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { while (logFileReader.hasNext()) { HoodieLogBlock logBlock = logFileReader.next(); if (logBlock instanceof HoodieDataBlock) { - for (IndexedRecord indexRecord : ((HoodieDataBlock) logBlock).getRecords()) { - final GenericRecord record = (GenericRecord) indexRecord; - // Metadata table records should not have meta fields! - assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); - assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); - - final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); - assertFalse(key.isEmpty()); + try (ClosableIterator recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) { + recordItr.forEachRemaining(indexRecord -> { + final GenericRecord record = (GenericRecord) indexRecord; + assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD)); + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME)); + assertFalse(key.isEmpty()); + }); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 2b9761176..fa5117e41 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -50,8 +51,8 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Deque; import java.util.HashSet; import java.util.List; @@ -359,17 +360,13 @@ public abstract class AbstractHoodieLogRecordReader { * handle it. */ private void processDataBlock(HoodieDataBlock dataBlock, Option> keys) throws Exception { - // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here - List recs = new ArrayList<>(); - if (!keys.isPresent()) { - recs = dataBlock.getRecords(); - } else { - recs = dataBlock.getRecords(keys.get()); - } - totalLogRecords.addAndGet(recs.size()); - for (IndexedRecord rec : recs) { - processNextRecord(createHoodieRecord(rec, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN, - this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName)); + try (ClosableIterator recordItr = dataBlock.getRecordItr(keys.orElse(Collections.emptyList()))) { + while (recordItr.hasNext()) { + IndexedRecord record = recordItr.next(); + processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN, + this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName)); + totalLogRecords.incrementAndGet(); + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index a79410ec8..e7f183faf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -31,6 +31,7 @@ import org.apache.avro.io.EncoderFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hudi.common.fs.SizeAwareDataInputStream; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; @@ -60,7 +61,6 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState; public class HoodieAvroDataBlock extends HoodieDataBlock { private final ThreadLocal encoderCache = new ThreadLocal<>(); - private final ThreadLocal decoderCache = new ThreadLocal<>(); public HoodieAvroDataBlock(FSDataInputStream inputStream, Option content, @@ -75,8 +75,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { public HoodieAvroDataBlock(@Nonnull List records, @Nonnull Map header, - @Nonnull String keyField - ) { + @Nonnull String keyField) { super(records, header, new HashMap<>(), keyField); } @@ -99,9 +98,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { output.writeInt(records.size()); // 3. Write the records - Iterator itr = records.iterator(); - while (itr.hasNext()) { - IndexedRecord s = itr.next(); + for (IndexedRecord s : records) { ByteArrayOutputStream temp = new ByteArrayOutputStream(); BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get()); encoderCache.set(encoder); @@ -120,49 +117,80 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e); } } + encoderCache.remove(); output.close(); return baos.toByteArray(); } // TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used - // TODO (na) - Implement a recordItr instead of recordList @Override - protected List deserializeRecords(byte[] content) throws IOException { - checkState(readerSchema != null, "Reader's schema has to be non-null"); + protected ClosableIterator deserializeRecords(byte[] content) throws IOException { + checkState(this.readerSchema != null, "Reader's schema has to be non-null"); + return RecordIterator.getInstance(this, content); + } - SizeAwareDataInputStream dis = - new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content))); + private static class RecordIterator implements ClosableIterator { + private byte[] content; + private final SizeAwareDataInputStream dis; + private final GenericDatumReader reader; + private final ThreadLocal decoderCache = new ThreadLocal<>(); - // 1. Read version for this data block - int version = dis.readInt(); - HoodieAvroDataBlockVersion logBlockVersion = new HoodieAvroDataBlockVersion(version); + private int totalRecords = 0; + private int readRecords = 0; - // Get schema from the header - Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content) throws IOException { + this.content = content; - GenericDatumReader reader = new GenericDatumReader<>(writerSchema, readerSchema); + this.dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(this.content))); - // 2. Get the total records - int totalRecords = 0; - if (logBlockVersion.hasRecordCount()) { - totalRecords = dis.readInt(); - } - List records = new ArrayList<>(totalRecords); + // 1. Read version for this data block + int version = this.dis.readInt(); + HoodieAvroDataBlockVersion logBlockVersion = new HoodieAvroDataBlockVersion(version); - // 3. Read the content - for (int i = 0; i < totalRecords; i++) { - int recordLength = dis.readInt(); - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(content, dis.getNumberOfBytesRead(), - recordLength, decoderCache.get()); - decoderCache.set(decoder); - IndexedRecord record = reader.read(null, decoder); - records.add(record); - dis.skipBytes(recordLength); + this.reader = new GenericDatumReader<>(writerSchema, readerSchema); + + if (logBlockVersion.hasRecordCount()) { + this.totalRecords = this.dis.readInt(); + } } - dis.close(); + public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, byte[] content) throws IOException { + // Get schema from the header + Schema writerSchema = new Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + return new RecordIterator(dataBlock.readerSchema, writerSchema, content); + } - return records; + @Override + public void close() { + try { + this.dis.close(); + this.decoderCache.remove(); + this.content = null; + } catch (IOException e) { + // ignore + } + } + + @Override + public boolean hasNext() { + return readRecords < totalRecords; + } + + @Override + public IndexedRecord next() { + try { + int recordLength = this.dis.readInt(); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(this.content, this.dis.getNumberOfBytesRead(), + recordLength, this.decoderCache.get()); + this.decoderCache.set(decoder); + IndexedRecord record = this.reader.read(null, decoder); + this.dis.skipBytes(recordLength); + this.readRecords++; + return record; + } catch (IOException e) { + throw new HoodieIOException("Unable to convert bytes to record.", e); + } + } } //---------------------------------------------------------------------------------------- @@ -256,7 +284,10 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { output.writeInt(schemaContent.length); output.write(schemaContent); - List records = getRecords(); + List records = new ArrayList<>(); + try (ClosableIterator recordItr = getRecordItr()) { + recordItr.forEachRemaining(records::add); + } // 3. Write total number of records output.writeInt(records.size()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index afae31b77..846b8d36a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -18,17 +18,21 @@ package org.apache.hudi.common.table.log.block; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FSDataInputStream; + import java.io.IOException; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Set; +import java.util.function.Function; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -45,7 +49,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { // TODO rebase records/content to leverage Either to warrant // that they are mutex (used by read/write flows respectively) - private Option> records; + private final Option> records; /** * Key field's name w/in the record's schema @@ -110,18 +114,18 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { } /** - * Returns all the records contained w/in this block + * Returns all the records iterator contained w/in this block. */ - public final List getRecords() { - if (!records.isPresent()) { - try { - // in case records are absent, read content lazily and then convert to IndexedRecords - records = Option.of(readRecordsFromBlockPayload()); - } catch (IOException io) { - throw new HoodieIOException("Unable to convert content bytes to records", io); - } + public final ClosableIterator getRecordItr() { + if (records.isPresent()) { + return list2Iterator(records.get()); + } + try { + // in case records are absent, read content lazily and then convert to IndexedRecords + return readRecordsFromBlockPayload(); + } catch (IOException io) { + throw new HoodieIOException("Unable to convert content bytes to records", io); } - return records.get(); } public Schema getSchema() { @@ -136,7 +140,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { * @return List of IndexedRecords for the keys of interest. * @throws IOException in case of failures encountered when reading/parsing records */ - public final List getRecords(List keys) throws IOException { + public final ClosableIterator getRecordItr(List keys) throws IOException { boolean fullScan = keys.isEmpty(); if (enablePointLookups && !fullScan) { return lookupRecords(keys); @@ -144,18 +148,16 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { // Otherwise, we fetch all the records and filter out all the records, but the // ones requested - List allRecords = getRecords(); + ClosableIterator allRecords = getRecordItr(); if (fullScan) { return allRecords; } HashSet keySet = new HashSet<>(keys); - return allRecords.stream() - .filter(record -> keySet.contains(getRecordKey(record).orElse(null))) - .collect(Collectors.toList()); + return FilteringIterator.getInstance(allRecords, keySet, this::getRecordKey); } - protected List readRecordsFromBlockPayload() throws IOException { + protected ClosableIterator readRecordsFromBlockPayload() throws IOException { if (readBlockLazily && !getContent().isPresent()) { // read log block contents from disk inflate(); @@ -169,7 +171,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { } } - protected List lookupRecords(List keys) throws IOException { + protected ClosableIterator lookupRecords(List keys) throws IOException { throw new UnsupportedOperationException( String.format("Point lookups are not supported by this Data block type (%s)", getBlockType()) ); @@ -177,7 +179,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { protected abstract byte[] serializeRecords(List records) throws IOException; - protected abstract List deserializeRecords(byte[] content) throws IOException; + protected abstract ClosableIterator deserializeRecords(byte[] content) throws IOException; public abstract HoodieLogBlockType getBlockType(); @@ -190,4 +192,80 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { .map(keyField -> record.get(keyField.pos())) .map(Object::toString); } + + /** + * Converts the given list to closable iterator. + */ + static ClosableIterator list2Iterator(List list) { + Iterator iterator = list.iterator(); + return new ClosableIterator() { + @Override + public void close() { + // ignored + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + }; + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * A {@link ClosableIterator} that supports filtering strategy with given keys. + * User should supply the key extraction function for fetching string format keys. + * + * @param the element type + */ + private static class FilteringIterator implements ClosableIterator { + private final ClosableIterator nested; // nested iterator + + private final Set keys; // the filtering keys + private final Function> keyExtract; // function to extract the key + + private T next; + + private FilteringIterator(ClosableIterator nested, Set keys, Function> keyExtract) { + this.nested = nested; + this.keys = keys; + this.keyExtract = keyExtract; + } + + public static FilteringIterator getInstance( + ClosableIterator nested, + Set keys, + Function> keyExtract) { + return new FilteringIterator<>(nested, keys, keyExtract); + } + + @Override + public void close() { + this.nested.close(); + } + + @Override + public boolean hasNext() { + while (this.nested.hasNext()) { + this.next = this.nested.next(); + if (keys.contains(keyExtract.apply(this.next).orElse(null))) { + return true; + } + } + return false; + } + + @Override + public T next() { + return this.next; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 897713474..557a0db7c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -30,16 +30,17 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.inline.InLineFSUtils; import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieHBaseKVComparator; import org.apache.hudi.io.storage.HoodieHFileReader; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -51,7 +52,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.stream.Collectors; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -148,7 +148,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { } @Override - protected List deserializeRecords(byte[] content) throws IOException { + protected ClosableIterator deserializeRecords(byte[] content) throws IOException { checkState(readerSchema != null, "Reader's schema has to be non-null"); // Get schema from the header @@ -156,14 +156,30 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { // Read the content HoodieHFileReader reader = new HoodieHFileReader<>(content); - List> records = reader.readAllRecords(writerSchema, readerSchema); + // Sets up the writer schema + reader.withSchema(writerSchema); + Iterator recordIterator = reader.getRecordIterator(readerSchema); + return new ClosableIterator() { + @Override + public void close() { + reader.close(); + } - return records.stream().map(Pair::getSecond).collect(Collectors.toList()); + @Override + public boolean hasNext() { + return recordIterator.hasNext(); + } + + @Override + public IndexedRecord next() { + return recordIterator.next(); + } + }; } // TODO abstract this w/in HoodieDataBlock @Override - protected List lookupRecords(List keys) throws IOException { + protected ClosableIterator lookupRecords(List keys) throws IOException { HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get(); // NOTE: It's important to extend Hadoop configuration here to make sure configuration @@ -180,12 +196,27 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks. Collections.sort(keys); - try (HoodieHFileReader reader = - new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) { - // Get writer's schema from the header - List> logRecords = reader.readRecords(keys, readerSchema); - return logRecords.stream().map(Pair::getSecond).collect(Collectors.toList()); - } + final HoodieHFileReader reader = + new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf)); + // Get writer's schema from the header + final ClosableIterator recordIterator = reader.getRecordIterator(keys, readerSchema); + return new ClosableIterator() { + @Override + public boolean hasNext() { + return recordIterator.hasNext(); + } + + @Override + public IndexedRecord next() { + return recordIterator.next(); + } + + @Override + public void close() { + recordIterator.close(); + reader.close(); + } + }; } private byte[] serializeRecord(IndexedRecord record) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index d5956863f..5e7bef90a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -18,19 +18,21 @@ package org.apache.hudi.common.table.log.block; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetReaderIterator; +import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetStreamWriter; + import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.fs.inline.InLineFSUtils; -import org.apache.hudi.common.fs.inline.InLineFileSystem; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetReaderIterator; -import org.apache.hudi.io.storage.HoodieAvroParquetConfig; -import org.apache.hudi.io.storage.HoodieParquetStreamWriter; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.avro.AvroSchemaConverter; @@ -41,11 +43,10 @@ import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.InputFile; import javax.annotation.Nonnull; + import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -121,9 +122,9 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { return baos.toByteArray(); } - public static Iterator getProjectedParquetRecordsIterator(Configuration conf, - Schema readerSchema, - InputFile inputFile) throws IOException { + public static ClosableIterator getProjectedParquetRecordsIterator(Configuration conf, + Schema readerSchema, + InputFile inputFile) throws IOException { AvroReadSupport.setAvroReadSchema(conf, readerSchema); AvroReadSupport.setRequestedProjection(conf, readerSchema); @@ -138,7 +139,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { * requested by the caller (providing projected Reader's schema) */ @Override - protected List readRecordsFromBlockPayload() throws IOException { + protected ClosableIterator readRecordsFromBlockPayload() throws IOException { HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get(); // NOTE: It's important to extend Hadoop configuration here to make sure configuration @@ -152,20 +153,14 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { blockContentLoc.getContentPositionInLogFile(), blockContentLoc.getBlockSize()); - ArrayList records = new ArrayList<>(); - - getProjectedParquetRecordsIterator( + return getProjectedParquetRecordsIterator( inlineConf, readerSchema, - HadoopInputFile.fromPath(inlineLogFilePath, inlineConf) - ) - .forEachRemaining(records::add); - - return records; + HadoopInputFile.fromPath(inlineLogFilePath, inlineConf)); } @Override - protected List deserializeRecords(byte[] content) throws IOException { + protected ClosableIterator deserializeRecords(byte[] content) { throw new UnsupportedOperationException("Should not be invoked"); } } \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 29f166530..ddfe22ac9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; @@ -54,10 +55,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the @@ -248,15 +251,14 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); // TODO If we can store additional metadata in datablock, we can skip parsing records // (such as startTime, endTime of records in the block) - List records = blk.getRecords(); - // Filter blocks in desired time window - instantsInRange.addAll( - records.stream() - .filter(r -> commitsFilter.apply((GenericRecord) r)) - .map(r -> readCommit((GenericRecord) r, loadInstantDetails)) - .filter(c -> filter == null || filter.isInRange(c)) - .collect(Collectors.toList()) - ); + try (ClosableIterator itr = blk.getRecordItr()) { + StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true) + // Filter blocks in desired time window + .filter(r -> commitsFilter.apply((GenericRecord) r)) + .map(r -> readCommit((GenericRecord) r, loadInstantDetails)) + .filter(c -> filter == null || filter.isInRange(c)) + .forEach(instantsInRange::add); + } } if (filter != null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index 3404d2bd5..371da7675 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -50,6 +50,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; @@ -123,6 +124,13 @@ public class HoodieHFileReader implements HoodieFileRea return schema; } + /** + * Sets up the writer schema explicitly. + */ + public void withSchema(Schema schema) { + this.schema = schema; + } + @Override public BloomFilter readBloomFilter() { Map fileInfo; @@ -184,7 +192,14 @@ public class HoodieHFileReader implements HoodieFileRea return filteredRecords; } - public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { + /** + * Reads all the records with given schema. + * + *

NOTE: This should only be used for testing, + * the records are materialized eagerly into a list and returned, + * use {@code getRecordIterator} where possible. + */ + private List> readAllRecords(Schema writerSchema, Schema readerSchema) { final Option keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME)); List> recordList = new LinkedList<>(); try { @@ -203,17 +218,36 @@ public class HoodieHFileReader implements HoodieFileRea } } - public List> readAllRecords() throws IOException { - Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes()))); + /** + * Reads all the records with current schema. + * + *

NOTE: This should only be used for testing, + * the records are materialized eagerly into a list and returned, + * use {@code getRecordIterator} where possible. + */ + public List> readAllRecords() { + Schema schema = getSchema(); return readAllRecords(schema, schema); } + /** + * Reads all the records with current schema and filtering keys. + * + *

NOTE: This should only be used for testing, + * the records are materialized eagerly into a list and returned, + * use {@code getRecordIterator} where possible. + */ public List> readRecords(List keys) throws IOException { - reader.loadFileInfo(); - Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes()))); - return readRecords(keys, schema); + return readRecords(keys, getSchema()); } + /** + * Reads all the records with given schema and filtering keys. + * + *

NOTE: This should only be used for testing, + * the records are materialized eagerly into a list and returned, + * use {@code getRecordIterator} where possible. + */ public List> readRecords(List keys, Schema schema) throws IOException { this.schema = schema; reader.loadFileInfo(); @@ -227,6 +261,39 @@ public class HoodieHFileReader implements HoodieFileRea return records; } + public ClosableIterator getRecordIterator(List keys, Schema schema) throws IOException { + this.schema = schema; + reader.loadFileInfo(); + Iterator iterator = keys.iterator(); + return new ClosableIterator() { + private R next; + @Override + public void close() { + } + + @Override + public boolean hasNext() { + try { + while (iterator.hasNext()) { + Option value = getRecordByKey(iterator.next(), schema); + if (value.isPresent()) { + next = value.get(); + return true; + } + } + return false; + } catch (IOException e) { + throw new HoodieIOException("unable to read next record from hfile ", e); + } + } + + @Override + public R next() { + return next; + } + }; + } + @Override public Iterator getRecordIterator(Schema readerSchema) throws IOException { final HFileScanner scanner = reader.getScanner(false, false); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 0a5ecc098..e9b06e6d6 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -47,6 +47,7 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.exception.CorruptedLogFileException; @@ -390,9 +391,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { HoodieLogBlock nextBlock = reader.next(); assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block"); HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock; - assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(), + List recordsRead = getRecords(dataBlockRead); + assertEquals(copyOfRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords, dataBlockRead.getRecords(), + assertEquals(copyOfRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)"); reader.close(); } @@ -430,9 +432,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { HoodieLogBlock nextBlock = reader.next(); assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block"); HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock; - assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(), + List recordsRead = getRecords(dataBlockRead); + assertEquals(copyOfRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords, dataBlockRead.getRecords(), + assertEquals(copyOfRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)"); int logBlockReadNum = 1; while (reader.hasNext()) { @@ -514,26 +517,29 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { assertTrue(reader.hasNext(), "First block should be available"); HoodieLogBlock nextBlock = reader.next(); HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock; - assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(), + List recordsRead1 = getRecords(dataBlockRead); + assertEquals(copyOfRecords1.size(),recordsRead1.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords1, dataBlockRead.getRecords(), + assertEquals(copyOfRecords1, recordsRead1, "Both records lists should be the same. (ordering guaranteed)"); assertEquals(dataBlockRead.getSchema(), getSimpleSchema()); reader.hasNext(); nextBlock = reader.next(); dataBlockRead = (HoodieDataBlock) nextBlock; - assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(), + List recordsRead2 = getRecords(dataBlockRead); + assertEquals(copyOfRecords2.size(), recordsRead2.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords2, dataBlockRead.getRecords(), + assertEquals(copyOfRecords2, recordsRead2, "Both records lists should be the same. (ordering guaranteed)"); reader.hasNext(); nextBlock = reader.next(); dataBlockRead = (HoodieDataBlock) nextBlock; - assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(), + List recordsRead3 = getRecords(dataBlockRead); + assertEquals(copyOfRecords3.size(), recordsRead3.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords3, dataBlockRead.getRecords(), + assertEquals(copyOfRecords3, recordsRead3, "Both records lists should be the same. (ordering guaranteed)"); reader.close(); } @@ -1634,25 +1640,28 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { HoodieLogBlock prevBlock = reader.prev(); HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock; - assertEquals(copyOfRecords3.size(), dataBlockRead.getRecords().size(), + List recordsRead1 = getRecords(dataBlockRead); + assertEquals(copyOfRecords3.size(), recordsRead1.size(), "Third records size should be equal to the written records size"); - assertEquals(copyOfRecords3, dataBlockRead.getRecords(), + assertEquals(copyOfRecords3, recordsRead1, "Both records lists should be the same. (ordering guaranteed)"); assertTrue(reader.hasPrev(), "Second block should be available"); prevBlock = reader.prev(); dataBlockRead = (HoodieDataBlock) prevBlock; - assertEquals(copyOfRecords2.size(), dataBlockRead.getRecords().size(), + List recordsRead2 = getRecords(dataBlockRead); + assertEquals(copyOfRecords2.size(), recordsRead2.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords2, dataBlockRead.getRecords(), + assertEquals(copyOfRecords2, recordsRead2, "Both records lists should be the same. (ordering guaranteed)"); assertTrue(reader.hasPrev(), "First block should be available"); prevBlock = reader.prev(); dataBlockRead = (HoodieDataBlock) prevBlock; - assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(), + List recordsRead3 = getRecords(dataBlockRead); + assertEquals(copyOfRecords1.size(), recordsRead3.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords1, dataBlockRead.getRecords(), + assertEquals(copyOfRecords1, recordsRead3, "Both records lists should be the same. (ordering guaranteed)"); assertFalse(reader.hasPrev()); @@ -1770,9 +1779,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { assertTrue(reader.hasPrev(), "First block should be available"); HoodieLogBlock prevBlock = reader.prev(); HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock; - assertEquals(copyOfRecords1.size(), dataBlockRead.getRecords().size(), + List recordsRead = getRecords(dataBlockRead); + assertEquals(copyOfRecords1.size(), recordsRead.size(), "Read records size should be equal to the written records size"); - assertEquals(copyOfRecords1, dataBlockRead.getRecords(), + assertEquals(copyOfRecords1, recordsRead, "Both records lists should be the same. (ordering guaranteed)"); assertFalse(reader.hasPrev()); @@ -1795,7 +1805,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { HoodieLogBlock logBlock = HoodieAvroDataBlock.getBlock(content, schema); assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType()); - List readRecords = ((HoodieAvroDataBlock) logBlock).getRecords(); + List readRecords = getRecords((HoodieAvroDataBlock) logBlock); assertEquals(readRecords.size(), recordsCopy.size()); for (int i = 0; i < recordsCopy.size(); ++i) { assertEquals(recordsCopy.get(i), readRecords.get(i)); @@ -1804,7 +1814,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { // Reader schema is optional if it is same as write schema logBlock = HoodieAvroDataBlock.getBlock(content, null); assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType()); - readRecords = ((HoodieAvroDataBlock) logBlock).getRecords(); + readRecords = getRecords((HoodieAvroDataBlock) logBlock); assertEquals(readRecords.size(), recordsCopy.size()); for (int i = 0; i < recordsCopy.size(); ++i) { assertEquals(recordsCopy.get(i), readRecords.get(i)); @@ -1861,9 +1871,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605); }}; - assertEquals(projectedRecords.size(), dataBlockRead.getRecords().size(), + List recordsRead = getRecords(dataBlockRead); + assertEquals(projectedRecords.size(), recordsRead.size(), "Read records size should be equal to the written records size"); - assertEquals(projectedRecords, dataBlockRead.getRecords(), + assertEquals(projectedRecords, recordsRead, "Both records lists should be the same. (ordering guaranteed)"); assertEquals(dataBlockRead.getSchema(), projectedSchema); @@ -1900,4 +1911,15 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true) ); } + + /** + * Utility to convert the given iterator to a List. + */ + private static List getRecords(HoodieDataBlock dataBlock) { + ClosableIterator itr = dataBlock.getRecordItr(); + + List elements = new ArrayList<>(); + itr.forEachRemaining(elements::add); + return elements; + } }