diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 30badf2a0..aa0dc101d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -25,6 +25,7 @@ import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.strategy.CompactionStrategy; import com.uber.hoodie.metrics.MetricsReporterType; import org.apache.spark.storage.StorageLevel; + import javax.annotation.concurrent.Immutable; import java.io.File; import java.io.FileReader; @@ -46,6 +47,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism"; private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"; private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"; + private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes"; + private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4*1024*1024); private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert"; private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false"; private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert"; @@ -104,6 +107,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM)); } + public int getWriteBufferLimitBytes() { + return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES)); + } + public boolean shouldCombineBeforeInsert() { return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_INSERT_PROP)); } @@ -391,6 +398,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withWriteBufferLimitBytes(int writeBufferLimit) { + props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit)); + return this; + } + public Builder combineInput(boolean onInsert, boolean onUpsert) { props.setProperty(COMBINE_BEFORE_INSERT_PROP, String.valueOf(onInsert)); props.setProperty(COMBINE_BEFORE_UPSERT_PROP, String.valueOf(onUpsert)); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIterator.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIterator.java new file mode 100644 index 000000000..08cea4dc7 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIterator.java @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.exception.HoodieException; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.util.SizeEstimator; + +import java.util.Iterator; +import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Used for buffering input records. Buffer limit is controlled by {@link #bufferMemoryLimit}. It internally samples + * every {@link #RECORD_SAMPLING_RATE}th record and adjusts number of records in buffer accordingly. This is done to + * ensure that we don't OOM. + */ +public class BufferedIterator> + implements Iterator> { + + private static Logger logger = LogManager.getLogger(BufferedIterator.class); + // interval used for polling records in the queue. + public static final int RECORD_POLL_INTERVAL_SEC = 5; + // rate used for sampling records to determine avg record size in bytes. + public static final int RECORD_SAMPLING_RATE = 64; + // maximum records that will be cached + private static final int RECORD_CACHING_LIMIT = 128 * 1024; + // It indicates number of records to cache. We will be using sampled record's average size to determine how many + // records we should cache and will change (increase/decrease) permits accordingly. + @VisibleForTesting + public final Semaphore rateLimiter = new Semaphore(1); + // used for sampling records with "RECORD_SAMPLING_RATE" frequency. + public final AtomicLong samplingRecordCounter = new AtomicLong(-1); + // indicates rate limit (number of records to cache). it is updated whenever there is a change in avg record size. + @VisibleForTesting + public int currentRateLimit = 1; + // internal buffer to cache buffered records. + private final LinkedBlockingQueue>> buffer = new LinkedBlockingQueue<>(); + // maximum amount of memory to be used for buffering records. + private final long bufferMemoryLimit; + // indicates avg record size in bytes. It is updated whenever a new record is sampled. + @VisibleForTesting + public long avgRecordSizeInBytes = 0; + // indicates number of samples collected so far. + private long numSamples = 0; + // original iterator from where records are read for buffering. + private final Iterator inputIterator; + // it holds the root cause of the exception in case either buffering records (reading from inputIterator) fails or + // thread reading records from buffer fails. + private final AtomicReference hasFailed = new AtomicReference(null); + // used for indicating that all the records from buffer are read successfully. + private final AtomicBoolean isDone = new AtomicBoolean(false); + // next record to be read from buffer. + private BufferedIteratorPayload nextRecord; + // schema used for fetching insertValue from HoodieRecord. + private final Schema schema; + + public BufferedIterator(final Iterator iterator, final long bufferMemoryLimit, final Schema schema) { + this.inputIterator = iterator; + this.bufferMemoryLimit = bufferMemoryLimit; + this.schema = schema; + } + + @VisibleForTesting + public int size() { + return this.buffer.size(); + } + + // It samples records with "RECORD_SAMPLING_RATE" frequency and computes average record size in bytes. It is used + // for determining how many maximum records to buffer. Based on change in avg size it may increase or decrease + // available permits. + private void adjustBufferSizeIfNeeded(final T record) throws InterruptedException { + if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) { + return; + } + final long recordSizeInBytes = SizeEstimator.estimate(record); + final long newAvgRecordSizeInBytes = + Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1)); + final int newRateLimit = + (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.bufferMemoryLimit / newAvgRecordSizeInBytes)); +// System.out.println("recordSizeInBytes:" + recordSizeInBytes + ":newAvgRecordSizeInBytes:" + newAvgRecordSizeInBytes +// + ":newRateLimit:" + newRateLimit + ":currentRateLimit:" + currentRateLimit + ":numSamples:" + numSamples +// + ":avgRecordSizeInBytes:" + avgRecordSizeInBytes); + + // If there is any change in number of records to cache then we will either release (if it increased) or acquire + // (if it decreased) to adjust rate limiting to newly computed value. + if (newRateLimit > currentRateLimit) { + rateLimiter.release(newRateLimit - currentRateLimit); + } else if (newRateLimit < currentRateLimit) { + rateLimiter.acquire(currentRateLimit - newRateLimit); + } + currentRateLimit = newRateLimit; + avgRecordSizeInBytes = newAvgRecordSizeInBytes; + numSamples++; + } + + // inserts record into internal buffer. It also fetches insert value from the record to offload computation work on to + // buffering thread. + private void insertRecord(T t) throws Exception { + rateLimiter.acquire(); + adjustBufferSizeIfNeeded(t); + // We are retrieving insert value in the record buffering thread to offload computation around schema validation + // and record creation to it. + final BufferedIteratorPayload payload = new BufferedIteratorPayload<>(t, this.schema); + buffer.put(Optional.of(payload)); + } + + private void readNextRecord() { + rateLimiter.release(); + Optional> newRecord; + while (true) { + try { + throwExceptionIfFailed(); + newRecord = buffer.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS); + if (newRecord != null) { + break; + } + } catch (InterruptedException e) { + logger.error("error reading records from BufferedIterator", e); + throw new HoodieException(e); + } + } + if (newRecord.isPresent()) { + this.nextRecord = newRecord.get(); + } else { + // We are done reading all the records from internal iterator. + this.isDone.set(true); + this.nextRecord = null; + } + } + + public void startBuffering() throws Exception { + logger.info("starting to buffer records"); + try { + while (inputIterator.hasNext()) { + // We need to stop buffering if buffer-reader has failed and exited. + throwExceptionIfFailed(); + insertRecord(inputIterator.next()); + } + // done buffering records notifying buffer-reader. + buffer.put(Optional.empty()); + } catch (Exception e) { + logger.error("error buffering records", e); + // Used for notifying buffer-reader thread of the failed operation. + markAsFailed(e); + throw e; + } + logger.info("finished buffering records"); + } + + @Override + public boolean hasNext() { + if (this.nextRecord == null && !this.isDone.get()) { + readNextRecord(); + } + return !this.isDone.get(); + } + + @Override + public BufferedIteratorPayload next() { + Preconditions.checkState(hasNext() && this.nextRecord != null); + final BufferedIteratorPayload ret = this.nextRecord; + this.nextRecord = null; + return ret; + } + + private void throwExceptionIfFailed() { + if (this.hasFailed.get() != null) { + throw new HoodieException("operation has failed", this.hasFailed.get()); + } + } + + public void markAsFailed(Exception e) { + this.hasFailed.set(e); + // release the permits so that if the buffering thread is waiting for permits then it will get it. + this.rateLimiter.release(RECORD_CACHING_LIMIT + 1); + } + + // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. + static class BufferedIteratorPayload { + public T record; + public Optional insertValue; + // It caches the exception seen while fetching insert value. + public Optional exception = Optional.empty(); + + public BufferedIteratorPayload(T record, Schema schema) { + this.record = record; + try { + this.insertValue = record.getData().getInsertValue(schema); + } catch (Exception e) { + this.exception = Optional.of(e); + } + } + } +} \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java index 8b49897e7..2406762a9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java @@ -20,15 +20,24 @@ import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.io.HoodieCreateHandle; import com.uber.hoodie.io.HoodieIOHandle; import com.uber.hoodie.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; + import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Set; -import org.apache.spark.TaskContext; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new @@ -37,6 +46,7 @@ import org.apache.spark.TaskContext; public class LazyInsertIterable extends LazyIterableIterator, List> { + private static Logger logger = LogManager.getLogger(LazyInsertIterable.class); private final HoodieWriteConfig hoodieConfig; private final String commitTime; private final HoodieTable hoodieTable; @@ -56,57 +66,95 @@ public class LazyInsertIterable extends protected void start() { } - @Override protected List computeNext() { - List statuses = new ArrayList<>(); + // Need to set current spark thread's TaskContext into newly launched thread so that new thread can access + // TaskContext properties. + final TaskContext sparkThreadTaskContext = TaskContext.get(); + // Executor service used for launching writer thread. + final ExecutorService writerService = Executors.newFixedThreadPool(1); + try { + // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES. + final BufferedIterator> bufferedIterator = + new BufferedIterator<>(inputItr, hoodieConfig.getWriteBufferLimitBytes(), + HoodieIOHandle.createHoodieWriteSchema(hoodieConfig)); + Future> writerResult = + writerService.submit( + () -> { + logger.info("starting hoodie writer thread"); + // Passing parent thread's TaskContext to newly launched thread for it to access original TaskContext + // properties. + TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext); + List statuses = new LinkedList<>(); + try { + statuses.addAll(handleWrite(bufferedIterator)); + logger.info("hoodie write is done; notifying reader thread"); + return statuses; + } catch (Exception e) { + logger.error("error writing hoodie records", e); + bufferedIterator.markAsFailed(e); + throw e; + } + }); + // Buffering records into internal buffer. This can throw exception either if reading records from spark fails or + // if writing buffered records into parquet file fails. + bufferedIterator.startBuffering(); + logger.info("waiting for hoodie write to finish"); + final List result = writerResult.get(); + assert result != null && !result.isEmpty() && !bufferedIterator.hasNext(); + return result; + } catch (Exception e) { + throw new HoodieException(e); + } finally { + writerService.shutdownNow(); + } + } - while (inputItr.hasNext()) { - HoodieRecord record = inputItr.next(); + private List handleWrite(final BufferedIterator> bufferedIterator) { + List statuses = new ArrayList<>(); + while (bufferedIterator.hasNext()) { + final BufferedIterator.BufferedIteratorPayload> payload = bufferedIterator.next(); // clean up any partial failures - if (!partitionsCleaned.contains(record.getPartitionPath())) { + if (!partitionsCleaned.contains(payload.record.getPartitionPath())) { // This insert task could fail multiple times, but Spark will faithfully retry with // the same data again. Thus, before we open any files under a given partition, we // first delete any files in the same partitionPath written by same Spark partition HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, - record.getPartitionPath(), + payload.record.getPartitionPath(), TaskContext.getPartitionId(), hoodieTable); - partitionsCleaned.add(record.getPartitionPath()); + partitionsCleaned.add(payload.record.getPartitionPath()); } // lazily initialize the handle, for the first time if (handle == null) { handle = - new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, - record.getPartitionPath()); + new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, payload.record.getPartitionPath()); } - if (handle.canWrite(record)) { - // write the record, if the handle has capacity - handle.write(record); + if (handle.canWrite(payload.record)) { + // write the payload, if the handle has capacity + handle.write(payload.record, payload.insertValue, payload.exception); } else { // handle is full. statuses.add(handle.close()); - // Need to handle the rejected record & open new handle + // Need to handle the rejected payload & open new handle handle = - new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, - record.getPartitionPath()); - handle.write(record); // we should be able to write 1 record. - break; + new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, payload.record.getPartitionPath()); + handle.write(payload.record, payload.insertValue, payload.exception); // we should be able to write 1 payload. } } // If we exited out, because we ran out of records, just close the pending handle. - if (!inputItr.hasNext()) { + if (!bufferedIterator.hasNext()) { if (handle != null) { statuses.add(handle.close()); } } - assert statuses.size() > 0; // should never return empty statuses + assert statuses.size() > 0 && !bufferedIterator.hasNext(); // should never return empty statuses return statuses; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 7f00961f2..4e62818b1 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -149,8 +149,10 @@ public class HoodieAppendHandle extends HoodieIOH recordsDeleted++; } - hoodieRecord.deflate(); writeStatus.markSuccess(hoodieRecord, recordMetadata); + // deflate record payload after recording success. This will help users access payload as a part of marking + // record successful. + hoodieRecord.deflate(); return avroRecord; } catch (Exception e) { logger.error("Error writing record " + hoodieRecord, e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 1fa52de25..9c7fac34c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -93,11 +93,15 @@ public class HoodieCreateHandle extends HoodieIOH /** * Perform the actual writing of the given record into the backing file. */ - public void write(HoodieRecord record) { + public void write(HoodieRecord record, Optional insertValue, + Optional getInsertValueException) { Optional recordMetadata = record.getData().getMetadata(); try { - Optional avroRecord = record.getData().getInsertValue(schema); - + // throws exception if there was any exception while fetching insert value + if (getInsertValueException.isPresent()) { + throw getInsertValueException.get(); + } + Optional avroRecord = insertValue; if (avroRecord.isPresent()) { storageWriter.writeAvroWithMetadata(avroRecord.get(), record); // update the new location of record, so we know where to find it next @@ -106,8 +110,10 @@ public class HoodieCreateHandle extends HoodieIOH } else { recordsDeleted++; } - record.deflate(); status.markSuccess(record, recordMetadata); + // deflate record payload after recording success. This will help users access payload as a part of marking + // record successful. + record.deflate(); } catch (Throwable t) { // Not throwing exception from here, since we don't want to fail the entire job // for a single record diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index f207ea41a..41cf3644c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -50,8 +50,7 @@ public abstract class HoodieIOHandle { this.fs = hoodieTable.getMetaClient().getFs(); this.hoodieTable = hoodieTable; this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); - this.schema = - HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + this.schema = createHoodieWriteSchema(config); } public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) { @@ -101,4 +100,8 @@ public abstract class HoodieIOHandle { public Schema getSchema() { return schema; } + + public static Schema createHoodieWriteSchema(HoodieWriteConfig config) { + return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 5dbe39d1f..df79cb83d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -167,8 +167,10 @@ public class HoodieMergeHandle extends HoodieIOHa recordsDeleted++; } - hoodieRecord.deflate(); writeStatus.markSuccess(hoodieRecord, recordMetadata); + // deflate record payload after recording success. This will help users access payload as a part of marking + // record successful. + hoodieRecord.deflate(); return true; } catch (Exception e) { logger.error("Error writing record " + hoodieRecord, e); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIterator.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIterator.java new file mode 100644 index 000000000..35e36ef1d --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIterator.java @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func; + +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.exception.HoodieException; +import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.io.FileUtils; +import org.apache.spark.util.SizeEstimator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestBufferedIterator { + + private final HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator(); + private final String commitTime = HoodieActiveTimeline.createNewCommitTime(); + private ExecutorService recordReader = null; + + @Before + public void beforeTest() { + this.recordReader = Executors.newFixedThreadPool(1); + } + + @After + public void afterTest() { + if (this.recordReader != null) { + this.recordReader.shutdownNow(); + this.recordReader = null; + } + } + + // Test to ensure that we are reading all records from buffered iterator in the same order without any exceptions. + @Test(timeout = 60000) + public void testRecordReading() throws IOException, ExecutionException, InterruptedException { + final int numRecords = 128; + final List hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords); + final BufferedIterator bufferedIterator = + new BufferedIterator(hoodieRecords.iterator(), FileUtils.ONE_KB, HoodieTestDataGenerator.avroSchema); + Future result = + recordReader.submit( + () -> { + bufferedIterator.startBuffering(); + return true; + } + ); + final Iterator originalRecordIterator = hoodieRecords.iterator(); + int recordsRead = 0; + while (bufferedIterator.hasNext()) { + final HoodieRecord originalRecord = originalRecordIterator.next(); + final Optional originalInsertValue = + originalRecord.getData().getInsertValue(HoodieTestDataGenerator.avroSchema); + final BufferedIterator.BufferedIteratorPayload payload = bufferedIterator.next(); + // Ensure that record ordering is guaranteed. + Assert.assertEquals(originalRecord, payload.record); + // cached insert value matches the expected insert value. + Assert.assertEquals(originalInsertValue, payload.insertValue); + recordsRead++; + } + Assert.assertFalse(bufferedIterator.hasNext() || originalRecordIterator.hasNext()); + // all the records should be read successfully. + Assert.assertEquals(numRecords, recordsRead); + // should not throw any exceptions. + Assert.assertTrue(result.get()); + } + + // Test to ensure that record buffering is throttled when we hit memory limit. + @Test(timeout = 60000) + public void testMemoryLimitForBuffering() throws IOException, InterruptedException { + final int numRecords = 128; + final List hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords); + // maximum number of records to keep in memory. + final int recordLimit = 5; + final long memoryLimitInBytes = recordLimit * SizeEstimator.estimate(hoodieRecords.get(0)); + final BufferedIterator bufferedIterator = + new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, HoodieTestDataGenerator.avroSchema); + Future result = + recordReader.submit( + () -> { + bufferedIterator.startBuffering(); + return true; + } + ); + // waiting for permits to expire. + while (!isQueueFull(bufferedIterator.rateLimiter)) { + Thread.sleep(10); + } + Assert.assertEquals(0, bufferedIterator.rateLimiter.availablePermits()); + Assert.assertEquals(recordLimit, bufferedIterator.currentRateLimit); + Assert.assertEquals(recordLimit, bufferedIterator.size()); + Assert.assertEquals(recordLimit - 1, bufferedIterator.samplingRecordCounter.get()); + + // try to read 2 records. + Assert.assertEquals(hoodieRecords.get(0), bufferedIterator.next().record); + Assert.assertEquals(hoodieRecords.get(1), bufferedIterator.next().record); + + // waiting for permits to expire. + while (!isQueueFull(bufferedIterator.rateLimiter)) { + Thread.sleep(10); + } + // No change is expected in rate limit or number of buffered records. We only expect buffering thread to read + // 2 more records into the buffer. + Assert.assertEquals(0, bufferedIterator.rateLimiter.availablePermits()); + Assert.assertEquals(recordLimit, bufferedIterator.currentRateLimit); + Assert.assertEquals(recordLimit, bufferedIterator.size()); + Assert.assertEquals(recordLimit - 1 + 2, bufferedIterator.samplingRecordCounter.get()); + } + + // Test to ensure that exception in either buffering thread or BufferedIterator-reader thread is propagated to + // another thread. + @Test(timeout = 60000) + public void testException() throws IOException, InterruptedException { + final int numRecords = 256; + final List hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords); + // buffer memory limit + final long memoryLimitInBytes = 4 * SizeEstimator.estimate(hoodieRecords.get(0)); + + // first let us throw exception from bufferIterator reader and test that buffering thread stops and throws + // correct exception back. + BufferedIterator bufferedIterator1 = + new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, HoodieTestDataGenerator.avroSchema); + Future result = + recordReader.submit( + () -> { + bufferedIterator1.startBuffering(); + return true; + } + ); + // waiting for permits to expire. + while (!isQueueFull(bufferedIterator1.rateLimiter)) { + Thread.sleep(10); + } + // notify buffering thread of an exception and ensure that it exits. + final Exception e = new Exception("Failing it :)"); + bufferedIterator1.markAsFailed(e); + try { + result.get(); + Assert.fail("exception is expected"); + } catch (ExecutionException e1) { + Assert.assertEquals(HoodieException.class, e1.getCause().getClass()); + Assert.assertEquals(e, e1.getCause().getCause()); + } + + // second let us raise an exception while doing record buffering. this exception should get propagated to + // buffered iterator reader. + final RuntimeException expectedException = new RuntimeException("failing record reading"); + final Iterator mockHoodieRecordsIterator = mock(Iterator.class); + when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); + when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); + BufferedIterator bufferedIterator2 = + new BufferedIterator(mockHoodieRecordsIterator, memoryLimitInBytes, HoodieTestDataGenerator.avroSchema); + Future result2 = + recordReader.submit( + () -> { + bufferedIterator2.startBuffering(); + return true; + } + ); + try { + bufferedIterator2.hasNext(); + Assert.fail("exception is expected"); + } catch (Exception e1) { + Assert.assertEquals(expectedException, e1.getCause()); + } + // buffering thread should also have exited. make sure that it is not running. + try { + result2.get(); + Assert.fail("exception is expected"); + } catch (ExecutionException e2) { + Assert.assertEquals(expectedException, e2.getCause()); + } + } + + private boolean isQueueFull(Semaphore rateLimiter) { + return (rateLimiter.availablePermits() == 0 && rateLimiter.hasQueuedThreads()); + } +}