diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 683c6a75e..5866276b9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -160,14 +160,17 @@ public class HoodieAppendHandle extends HoodieIOH List keysToDelete = new ArrayList<>(); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, commitTime); - records.stream().forEach(record -> { + Iterator> recordsItr = records.iterator(); + while (recordsItr.hasNext()) { + HoodieRecord record = recordsItr.next(); Optional indexedRecord = getIndexedRecord(record); if (indexedRecord.isPresent()) { recordList.add(indexedRecord.get()); } else { keysToDelete.add(record.getRecordKey()); } - }); + recordsItr.remove(); //remove entries when IndexedRecord added to new list + } try { if (recordList.size() > 0) { writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, schema, metadata)); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/storage/SizeAwareDataInputStream.java b/hoodie-common/src/main/java/com/uber/hoodie/common/storage/SizeAwareDataInputStream.java new file mode 100644 index 000000000..dc9e04043 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/storage/SizeAwareDataInputStream.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.storage; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Keeps track of how many bytes were read from a DataInputStream + */ +public class SizeAwareDataInputStream { + + private final DataInputStream dis; + private final AtomicInteger numberOfBytesRead; + + public SizeAwareDataInputStream(DataInputStream dis) { + this.dis = dis; + this.numberOfBytesRead = new AtomicInteger(0); + } + + public int readInt() throws IOException { + numberOfBytesRead.addAndGet(Integer.BYTES); + return dis.readInt(); + } + + public void readFully(byte b[], int off, int len) throws IOException { + numberOfBytesRead.addAndGet(len); + dis.readFully(b, off, len); + } + + public void readFully(byte b[]) throws IOException { + numberOfBytesRead.addAndGet(b.length); + dis.readFully(b); + } + + public int skipBytes(int n) throws IOException { + numberOfBytesRead.addAndGet(n); + return dis.skipBytes(n); + } + + public void close() throws IOException { + dis.close(); + } + + public Integer getNumberOfBytesRead() { + return numberOfBytesRead.get(); + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java index d2f73ef1b..ebb72bb4b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java @@ -16,16 +16,9 @@ package com.uber.hoodie.common.table.log.block; +import com.uber.hoodie.common.storage.SizeAwareDataInputStream; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.exception.HoodieIOException; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; @@ -35,18 +28,31 @@ import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + /** - * DataBlock contains a list of records serialized using Avro. The Datablock contains 1. Compressed - * Writer Schema length 2. Compressed Writer Schema content 3. Total number of records in the block - * 4. Size of a record 5. Actual avro serialized content of the record + * DataBlock contains a list of records serialized using Avro. + * The Datablock contains + * 1. Compressed Writer Schema length + * 2. Compressed Writer Schema content + * 3. Total number of records in the block + * 4. Size of a record + * 5. Actual avro serialized content of the record */ public class HoodieAvroDataBlock extends HoodieLogBlock { private List records; private Schema schema; - public HoodieAvroDataBlock(List records, Schema schema, - Map metadata) { + public HoodieAvroDataBlock(List records, Schema schema, Map metadata) { super(metadata); this.records = records; this.schema = schema; @@ -56,6 +62,7 @@ public class HoodieAvroDataBlock extends HoodieLogBlock { this(records, schema, null); } + //TODO : (na) lazily create IndexedRecords only when required public List getRecords() { return records; } @@ -85,7 +92,9 @@ public class HoodieAvroDataBlock extends HoodieLogBlock { output.writeInt(records.size()); // 4. Write the records - records.forEach(s -> { + Iterator itr = records.iterator(); + while (itr.hasNext()) { + IndexedRecord s = itr.next(); ByteArrayOutputStream temp = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null); try { @@ -99,10 +108,11 @@ public class HoodieAvroDataBlock extends HoodieLogBlock { output.writeInt(size); // Write the content output.write(temp.toByteArray()); + itr.remove(); } catch (IOException e) { throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e); } - }); + } output.close(); return baos.toByteArray(); @@ -113,10 +123,10 @@ public class HoodieAvroDataBlock extends HoodieLogBlock { return HoodieLogBlockType.AVRO_DATA_BLOCK; } - public static HoodieLogBlock fromBytes(byte[] content, Schema readerSchema, boolean readMetadata) - throws IOException { + //TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used + public static HoodieLogBlock fromBytes(byte[] content, Schema readerSchema, boolean readMetadata) throws IOException { - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content))); Map metadata = null; // 1. Read the metadata written out, if applicable if (readMetadata) { @@ -132,22 +142,20 @@ public class HoodieAvroDataBlock extends HoodieLogBlock { readerSchema = writerSchema; } + //TODO : (na) lazily create IndexedRecords only when required GenericDatumReader reader = new GenericDatumReader<>(writerSchema, readerSchema); // 2. Get the total records int totalRecords = dis.readInt(); List records = new ArrayList<>(totalRecords); // 3. Read the content - for (int i = 0; i < totalRecords; i++) { - // TODO - avoid bytes copy + for (int i=0;i metadata = null; if (readMetadata) { metadata = HoodieLogBlock.getLogMetadata(dis); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java index 3858ae54e..5819c99e1 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java @@ -16,6 +16,8 @@ package com.uber.hoodie.common.table.log.block; +import com.uber.hoodie.common.storage.SizeAwareDataInputStream; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -62,7 +64,7 @@ public class HoodieCorruptBlock extends HoodieLogBlock { public static HoodieLogBlock fromBytes(byte[] content, int blockSize, boolean readMetadata) throws IOException { - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content))); Map metadata = null; int bytesRemaining = blockSize; if (readMetadata) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java index 485bfdcc0..3751124fe 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java @@ -23,6 +23,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.Charset; import java.util.Map; + +import com.uber.hoodie.common.storage.SizeAwareDataInputStream; import org.apache.commons.lang3.StringUtils; /** @@ -64,7 +66,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { } public static HoodieLogBlock fromBytes(byte[] content, boolean readMetadata) throws IOException { - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content))); Map metadata = null; if (readMetadata) { metadata = HoodieLogBlock.getLogMetadata(dis); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java index 39049b25b..d21332f2f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java @@ -17,6 +17,7 @@ package com.uber.hoodie.common.table.log.block; import com.google.common.collect.Maps; +import com.uber.hoodie.common.storage.SizeAwareDataInputStream; import com.uber.hoodie.exception.HoodieException; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -90,7 +91,7 @@ public abstract class HoodieLogBlock { /** * Convert bytes to LogMetadata, follow the same order as {@link HoodieLogBlock#getLogMetadataBytes} */ - public static Map getLogMetadata(DataInputStream dis) + public static Map getLogMetadata(SizeAwareDataInputStream dis) throws IOException { Map metadata = Maps.newHashMap(); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index e9f3a3a87..00fd9ca5e 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -276,7 +276,11 @@ public class HoodieLogFormatTest { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); + Schema schema = getSimpleSchema(); List records = SchemaTestUtil.generateTestRecords(0, 100); + List copyOfRecords = records.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, @@ -292,8 +296,8 @@ public class HoodieLogFormatTest { nextBlock.getBlockType()); HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock; assertEquals("Read records size should be equal to the written records size", - records.size(), dataBlockRead.getRecords().size()); - assertEquals("Both records lists should be the same. (ordering guaranteed)", records, + copyOfRecords.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords, dataBlockRead.getRecords()); } @@ -305,6 +309,10 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records1 = SchemaTestUtil.generateTestRecords(0, 100); + Schema schema = getSimpleSchema(); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, @@ -316,6 +324,9 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records2 = SchemaTestUtil.generateTestRecords(0, 100); + List copyOfRecords2 = records2.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); dataBlock = new HoodieAvroDataBlock(records2, getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); @@ -326,6 +337,9 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records3 = SchemaTestUtil.generateTestRecords(0, 100); + List copyOfRecords3 = records3.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); dataBlock = new HoodieAvroDataBlock(records3, getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); @@ -337,22 +351,22 @@ public class HoodieLogFormatTest { HoodieLogBlock nextBlock = reader.next(); HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock; assertEquals("Read records size should be equal to the written records size", - records1.size(), dataBlockRead.getRecords().size()); - assertEquals("Both records lists should be the same. (ordering guaranteed)", records1, + copyOfRecords1.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords1, dataBlockRead.getRecords()); nextBlock = reader.next(); dataBlockRead = (HoodieAvroDataBlock) nextBlock; assertEquals("Read records size should be equal to the written records size", - records2.size(), dataBlockRead.getRecords().size()); - assertEquals("Both records lists should be the same. (ordering guaranteed)", records2, + copyOfRecords2.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords2, dataBlockRead.getRecords()); nextBlock = reader.next(); dataBlockRead = (HoodieAvroDataBlock) nextBlock; assertEquals("Read records size should be equal to the written records size", - records3.size(), dataBlockRead.getRecords().size()); - assertEquals("Both records lists should be the same. (ordering guaranteed)", records3, + copyOfRecords3.size(), dataBlockRead.getRecords().size()); + assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords3, dataBlockRead.getRecords()); } @@ -450,6 +464,9 @@ public class HoodieLogFormatTest { .overBaseCommit("100").withFs(fs).withSizeThreshold(500).build(); // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); @@ -459,6 +476,9 @@ public class HoodieLogFormatTest { // Write 2 List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords2 = records2.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -475,8 +495,8 @@ public class HoodieLogFormatTest { Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 records", 200, readKeys.size()); - records1.addAll(records2); - Set originalKeys = records1.stream() + copyOfRecords1.addAll(copyOfRecords2); + Set originalKeys = copyOfRecords1.stream() .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect( Collectors.toSet()); @@ -495,6 +515,9 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); @@ -515,6 +538,9 @@ public class HoodieLogFormatTest { // Write 3 List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords3 = records3.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); dataBlock = new HoodieAvroDataBlock(records3, schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -532,8 +558,8 @@ public class HoodieLogFormatTest { Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 records", 200, readKeys.size()); - records1.addAll(records3); - Set originalKeys = records1.stream() + copyOfRecords1.addAll(copyOfRecords3); + Set originalKeys = copyOfRecords1.stream() .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect( Collectors.toSet()); @@ -552,6 +578,9 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); @@ -585,6 +614,10 @@ public class HoodieLogFormatTest { // Write 3 List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords3 = records3.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); + dataBlock = new HoodieAvroDataBlock(records3, schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -602,8 +635,8 @@ public class HoodieLogFormatTest { Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 records", 200, readKeys.size()); - records1.addAll(records3); - Set originalKeys = records1.stream() + copyOfRecords1.addAll(copyOfRecords3); + Set originalKeys = copyOfRecords1.stream() .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect( Collectors.toSet()); @@ -622,6 +655,9 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); @@ -631,11 +667,14 @@ public class HoodieLogFormatTest { // Write 2 List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords2 = records2.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); writer = writer.appendBlock(dataBlock); - records1.addAll(records2); - List originalKeys = records1.stream() + copyOfRecords1.addAll(copyOfRecords2); + List originalKeys = copyOfRecords1.stream() .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect( Collectors.toList()); @@ -691,9 +730,12 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema, metadata); writer = writer.appendBlock(dataBlock); @@ -703,7 +745,7 @@ public class HoodieLogFormatTest { dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); writer = writer.appendBlock(dataBlock); - List originalKeys = records1.stream() + List originalKeys = copyOfRecords1.stream() .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect( Collectors.toList()); @@ -757,6 +799,9 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); @@ -764,7 +809,7 @@ public class HoodieLogFormatTest { schema, metadata); writer = writer.appendBlock(dataBlock); - List originalKeys = records1.stream() + List originalKeys = copyOfRecords1.stream() .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect( Collectors.toList());