diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java index 5311f07a2..657c7869d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java @@ -26,7 +26,6 @@ import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.io.HoodieCreateHandle; import com.uber.hoodie.io.HoodieIOHandle; import com.uber.hoodie.table.HoodieTable; -import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -38,7 +37,6 @@ import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.spark.TaskContext; -import scala.Tuple2; /** * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new @@ -61,20 +59,30 @@ public class CopyOnWriteLazyInsertIterable extend this.hoodieTable = hoodieTable; } + // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. + static class HoodieInsertValueGenResult { + public T record; + public Optional insertValue; + // It caches the exception seen while fetching insert value. + public Optional exception = Optional.empty(); + + public HoodieInsertValueGenResult(T record, Schema schema) { + this.record = record; + try { + this.insertValue = record.getData().getInsertValue(schema); + } catch (Exception e) { + this.exception = Optional.of(e); + } + } + } + /** * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some * expensive operations of transformation to the reader thread. */ static Function, - Tuple2, Optional>> getTransformFunction(Schema schema) { - return hoodieRecord -> { - try { - return new Tuple2, Optional>(hoodieRecord, - hoodieRecord.getData().getInsertValue(schema)); - } catch (IOException e) { - throw new HoodieException(e); - } - }; + HoodieInsertValueGenResult> getTransformFunction(Schema schema) { + return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, schema); } @Override @@ -85,7 +93,7 @@ public class CopyOnWriteLazyInsertIterable extend protected List computeNext() { // Executor service used for launching writer thread. BoundedInMemoryExecutor, - Tuple2, Optional>, List> bufferedIteratorExecutor = null; + HoodieInsertValueGenResult, List> bufferedIteratorExecutor = null; try { final Schema schema = HoodieIOHandle.createHoodieWriteSchema(hoodieConfig); bufferedIteratorExecutor = @@ -117,14 +125,14 @@ public class CopyOnWriteLazyInsertIterable extend * writes to one or more create-handles */ protected class CopyOnWriteInsertHandler extends - BoundedInMemoryQueueConsumer, Optional>, List> { + BoundedInMemoryQueueConsumer, List> { protected final List statuses = new ArrayList<>(); protected HoodieIOHandle handle; @Override - protected void consumeOneRecord(Tuple2, Optional> payload) { - final HoodieRecord insertPayload = payload._1(); + protected void consumeOneRecord(HoodieInsertValueGenResult payload) { + final HoodieRecord insertPayload = payload.record; // clean up any partial failures if (!partitionsCleaned.contains(insertPayload.getPartitionPath())) { // This insert task could fail multiple times, but Spark will faithfully retry with @@ -141,16 +149,16 @@ public class CopyOnWriteLazyInsertIterable extend .randomUUID().toString()); } - if (handle.canWrite(payload._1())) { + if (handle.canWrite(payload.record)) { // write the payload, if the handle has capacity - handle.write(insertPayload, payload._2()); + handle.write(insertPayload, payload.insertValue, payload.exception); } else { // handle is full. statuses.add(handle.close()); // Need to handle the rejected payload & open new handle handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID .randomUUID().toString()); - handle.write(insertPayload, payload._2()); // we should be able to write 1 payload. + handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java index a6b588976..b4eea0f1b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java @@ -25,9 +25,6 @@ import com.uber.hoodie.table.HoodieTable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Optional; -import org.apache.avro.generic.IndexedRecord; -import scala.Tuple2; /** * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new @@ -49,8 +46,8 @@ public class MergeOnReadLazyInsertIterable extend protected class MergeOnReadInsertHandler extends CopyOnWriteInsertHandler { @Override - protected void consumeOneRecord(Tuple2, Optional> payload) { - final HoodieRecord insertPayload = payload._1(); + protected void consumeOneRecord(HoodieInsertValueGenResult payload) { + final HoodieRecord insertPayload = payload.record; List statuses = new ArrayList<>(); // lazily initialize the handle, for the first time if (handle == null) { @@ -58,14 +55,14 @@ public class MergeOnReadLazyInsertIterable extend } if (handle.canWrite(insertPayload)) { // write the payload, if the handle has capacity - handle.write(insertPayload, payload._2); + handle.write(insertPayload, payload.insertValue, payload.exception); } else { // handle is full. handle.close(); statuses.add(handle.getWriteStatus()); // Need to handle the rejected payload & open new handle handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable); - handle.write(insertPayload, payload._2); // we should be able to write 1 payload. + handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. } } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 04f715bab..516ab5b78 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -33,7 +33,6 @@ import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.util.HoodieAvroUtils; -import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieAppendException; import com.uber.hoodie.exception.HoodieUpsertException; @@ -63,7 +62,6 @@ public class HoodieAppendHandle extends HoodieIOH private static Logger logger = LogManager.getLogger(HoodieAppendHandle.class); // This acts as the sequenceID for records written private static AtomicLong recordIndex = new AtomicLong(1); - private final WriteStatus writeStatus; private final String fileId; // Buffer for holding records in memory before they are flushed to disk private List recordList = new ArrayList<>(); @@ -98,9 +96,7 @@ public class HoodieAppendHandle extends HoodieIOH public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String fileId, Iterator> recordItr) { super(config, commitTime, hoodieTable); - WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); writeStatus.setStat(new HoodieDeltaWriteStat()); - this.writeStatus = writeStatus; this.fileId = fileId; this.fileSystemView = hoodieTable.getRTFileSystemView(); this.recordItr = recordItr; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index c82f8a2f2..c56d30c7e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -24,7 +24,6 @@ import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.util.FSUtils; -import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieInsertException; import com.uber.hoodie.io.storage.HoodieStorageWriter; @@ -43,7 +42,6 @@ public class HoodieCreateHandle extends HoodieIOH private static Logger logger = LogManager.getLogger(HoodieCreateHandle.class); - private final WriteStatus status; private final HoodieStorageWriter storageWriter; private final Path path; private Path tempPath = null; @@ -55,14 +53,13 @@ public class HoodieCreateHandle extends HoodieIOH public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String partitionPath, String fileId) { super(config, commitTime, hoodieTable); - this.status = ReflectionUtils.loadClass(config.getWriteStatusClassName()); - status.setFileId(fileId); - status.setPartitionPath(partitionPath); + writeStatus.setFileId(fileId); + writeStatus.setPartitionPath(partitionPath); final int sparkPartitionId = TaskContext.getPartitionId(); - this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId()); + this.path = makeNewPath(partitionPath, sparkPartitionId, writeStatus.getFileId()); if (config.shouldUseTempFolderForCopyOnWriteForCreate()) { - this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), + this.tempPath = makeTempPath(partitionPath, sparkPartitionId, writeStatus.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); } @@ -87,7 +84,7 @@ public class HoodieCreateHandle extends HoodieIOH @Override public boolean canWrite(HoodieRecord record) { - return storageWriter.canWrite() && record.getPartitionPath().equals(status.getPartitionPath()); + return storageWriter.canWrite() && record.getPartitionPath().equals(writeStatus.getPartitionPath()); } /** @@ -99,13 +96,13 @@ public class HoodieCreateHandle extends HoodieIOH if (avroRecord.isPresent()) { storageWriter.writeAvroWithMetadata(avroRecord.get(), record); // update the new location of record, so we know where to find it next - record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId())); + record.setNewLocation(new HoodieRecordLocation(commitTime, writeStatus.getFileId())); recordsWritten++; insertRecordsWritten++; } else { recordsDeleted++; } - status.markSuccess(record, recordMetadata); + writeStatus.markSuccess(record, recordMetadata); // deflate record payload after recording success. This will help users access payload as a // part of marking // record successful. @@ -113,7 +110,7 @@ public class HoodieCreateHandle extends HoodieIOH } catch (Throwable t) { // Not throwing exception from here, since we don't want to fail the entire job // for a single record - status.markFailure(record, t, recordMetadata); + writeStatus.markFailure(record, t, recordMetadata); logger.error("Error writing record " + record, t); } } @@ -135,7 +132,7 @@ public class HoodieCreateHandle extends HoodieIOH @Override public WriteStatus getWriteStatus() { - return status; + return writeStatus; } /** @@ -143,27 +140,27 @@ public class HoodieCreateHandle extends HoodieIOH */ @Override public WriteStatus close() { - logger.info("Closing the file " + status.getFileId() + " as we are done with all the records " + logger.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try { storageWriter.close(); HoodieWriteStat stat = new HoodieWriteStat(); - stat.setPartitionPath(status.getPartitionPath()); + stat.setPartitionPath(writeStatus.getPartitionPath()); stat.setNumWrites(recordsWritten); stat.setNumDeletes(recordsDeleted); stat.setNumInserts(insertRecordsWritten); stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); - stat.setFileId(status.getFileId()); + stat.setFileId(writeStatus.getFileId()); stat.setPaths(new Path(config.getBasePath()), path, tempPath); stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath())); - stat.setTotalWriteErrors(status.getFailedRecords().size()); + stat.setTotalWriteErrors(writeStatus.getFailedRecords().size()); RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalCreateTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); - status.setStat(stat); + writeStatus.setStat(stat); - return status; + return writeStatus; } catch (IOException e) { throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index 67aa6b2ef..af8574fd2 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -24,6 +24,7 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.HoodieTimer; +import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; @@ -47,6 +48,7 @@ public abstract class HoodieIOHandle { protected final Schema schema; protected HoodieTimeline hoodieTimeline; protected HoodieTimer timer; + protected final WriteStatus writeStatus; public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable) { this.commitTime = commitTime; @@ -56,6 +58,7 @@ public abstract class HoodieIOHandle { this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline(); this.schema = createHoodieWriteSchema(config); this.timer = new HoodieTimer().startTimer(); + this.writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); } /** @@ -125,6 +128,20 @@ public abstract class HoodieIOHandle { // NO_OP } + /** + * Perform the actual writing of the given record into the backing file. + */ + public void write(HoodieRecord record, Optional avroRecord, Optional exception) { + Optional recordMetadata = record.getData().getMetadata(); + if (exception.isPresent() && exception.get() instanceof Throwable) { + // Not throwing exception from here, since we don't want to fail the entire job for a single record + writeStatus.markFailure(record, exception.get(), recordMetadata); + logger.error("Error writing record " + record, exception.get()); + } else { + write(record, avroRecord); + } + } + public abstract WriteStatus close(); public abstract WriteStatus getWriteStatus(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 210f311bc..0700b4cbb 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -28,7 +28,6 @@ import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.util.DefaultSizeEstimator; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieRecordSizeEstimator; -import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.common.util.collection.ExternalSpillableMap; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; @@ -54,7 +53,6 @@ public class HoodieMergeHandle extends HoodieIOHa private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class); - private WriteStatus writeStatus; private Map> keyToNewRecords; private Set writtenRecordKeys; private HoodieStorageWriter storageWriter; @@ -91,10 +89,7 @@ public class HoodieMergeHandle extends HoodieIOHa */ private void init(String fileId, String partitionPath, Optional dataFileToBeMerged) { this.writtenRecordKeys = new HashSet<>(); - - WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); writeStatus.setStat(new HoodieWriteStat()); - this.writeStatus = writeStatus; try { //TODO: dataFileToBeMerged must be optional. Will be fixed by Nishith's changes to support insert to log-files String latestValidFilePath = dataFileToBeMerged.get().getFileName(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryExecutor.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryExecutor.java index b629b837f..886156437 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryExecutor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryExecutor.java @@ -25,6 +25,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult; import java.util.List; import java.util.Optional; import org.apache.avro.generic.IndexedRecord; @@ -55,13 +56,13 @@ public class TestBoundedInMemoryExecutor { HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); - BoundedInMemoryQueueConsumer>, Integer> consumer = - new BoundedInMemoryQueueConsumer>, Integer>() { + BoundedInMemoryQueueConsumer, Integer> consumer = + new BoundedInMemoryQueueConsumer, Integer>() { private int count = 0; @Override - protected void consumeOneRecord(Tuple2> record) { + protected void consumeOneRecord(HoodieInsertValueGenResult record) { count++; } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryQueue.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryQueue.java index 4092f10a4..1add47af4 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryQueue.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBoundedInMemoryQueue.java @@ -30,6 +30,7 @@ import com.uber.hoodie.common.util.queue.BoundedInMemoryQueueProducer; import com.uber.hoodie.common.util.queue.FunctionBasedQueueProducer; import com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer; import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -78,9 +79,8 @@ public class TestBoundedInMemoryQueue { public void testRecordReading() throws Exception { final int numRecords = 128; final List hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords); - final BoundedInMemoryQueue>> queue = new BoundedInMemoryQueue(FileUtils.ONE_KB, - getTransformFunction(HoodieTestDataGenerator.avroSchema)); + final BoundedInMemoryQueue> queue = + new BoundedInMemoryQueue(FileUtils.ONE_KB, getTransformFunction(HoodieTestDataGenerator.avroSchema)); // Produce Future resFuture = executorService.submit(() -> { @@ -94,12 +94,12 @@ public class TestBoundedInMemoryQueue { final HoodieRecord originalRecord = originalRecordIterator.next(); final Optional originalInsertValue = originalRecord.getData() .getInsertValue(HoodieTestDataGenerator.avroSchema); - final Tuple2> payload = queue.iterator().next(); + final HoodieInsertValueGenResult payload = queue.iterator().next(); // Ensure that record ordering is guaranteed. - Assert.assertEquals(originalRecord, payload._1()); + Assert.assertEquals(originalRecord, payload.record); // cached insert value matches the expected insert value. Assert.assertEquals(originalInsertValue, - payload._1().getData().getInsertValue(HoodieTestDataGenerator.avroSchema)); + payload.record.getData().getInsertValue(HoodieTestDataGenerator.avroSchema)); recordsRead++; } Assert.assertFalse(queue.iterator().hasNext() || originalRecordIterator.hasNext()); @@ -119,7 +119,7 @@ public class TestBoundedInMemoryQueue { final int numProducers = 40; final List> recs = new ArrayList<>(); - final BoundedInMemoryQueue>> queue = + final BoundedInMemoryQueue> queue = new BoundedInMemoryQueue(FileUtils.ONE_KB, getTransformFunction(HoodieTestDataGenerator.avroSchema)); // Record Key to @@ -143,7 +143,7 @@ public class TestBoundedInMemoryQueue { if (i % 2 == 0) { producers.add(new IteratorBasedQueueProducer<>(r.iterator())); } else { - producers.add(new FunctionBasedQueueProducer((buf) -> { + producers.add(new FunctionBasedQueueProducer<>((buf) -> { Iterator itr = r.iterator(); while (itr.hasNext()) { try { @@ -185,8 +185,8 @@ public class TestBoundedInMemoryQueue { // Read recs and ensure we have covered all producer recs. while (queue.iterator().hasNext()) { - final Tuple2> payload = queue.iterator().next(); - final HoodieRecord rec = payload._1(); + final HoodieInsertValueGenResult payload = queue.iterator().next(); + final HoodieRecord rec = payload.record; Tuple2 producerPos = keyToProducerAndIndexMap.get(rec.getRecordKey()); Integer lastSeenPos = lastSeenMap.get(producerPos._1()); countMap.put(producerPos._1(), countMap.get(producerPos._1()) + 1); @@ -212,12 +212,13 @@ public class TestBoundedInMemoryQueue { final List hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords); // maximum number of records to keep in memory. final int recordLimit = 5; - final SizeEstimator>> sizeEstimator = + final SizeEstimator> sizeEstimator = new DefaultSizeEstimator<>(); - final long objSize = sizeEstimator.sizeEstimate( - getTransformFunction(HoodieTestDataGenerator.avroSchema).apply(hoodieRecords.get(0))); + HoodieInsertValueGenResult payload = getTransformFunction(HoodieTestDataGenerator.avroSchema) + .apply(hoodieRecords.get(0)); + final long objSize = sizeEstimator.sizeEstimate(payload); final long memoryLimitInBytes = recordLimit * objSize; - final BoundedInMemoryQueue>> queue = + final BoundedInMemoryQueue> queue = new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.avroSchema)); @@ -236,8 +237,8 @@ public class TestBoundedInMemoryQueue { Assert.assertEquals(recordLimit - 1, queue.samplingRecordCounter.get()); // try to read 2 records. - Assert.assertEquals(hoodieRecords.get(0), queue.iterator().next()._1()); - Assert.assertEquals(hoodieRecords.get(1), queue.iterator().next()._1()); + Assert.assertEquals(hoodieRecords.get(0), queue.iterator().next().record); + Assert.assertEquals(hoodieRecords.get(1), queue.iterator().next().record); // waiting for permits to expire. while (!isQueueFull(queue.rateLimiter)) { @@ -263,8 +264,9 @@ public class TestBoundedInMemoryQueue { final SizeEstimator>> sizeEstimator = new DefaultSizeEstimator<>(); // queue memory limit - final long objSize = sizeEstimator.sizeEstimate( - getTransformFunction(HoodieTestDataGenerator.avroSchema).apply(hoodieRecords.get(0))); + HoodieInsertValueGenResult payload = getTransformFunction(HoodieTestDataGenerator.avroSchema) + .apply(hoodieRecords.get(0)); + final long objSize = sizeEstimator.sizeEstimate(new Tuple2<>(payload.record, payload.insertValue)); final long memoryLimitInBytes = 4 * objSize; // first let us throw exception from queueIterator reader and test that queueing thread