1
0

Parallelized read-write operations in Hoodie Merge phase

This commit is contained in:
Nishith Agarwal
2018-04-01 21:43:05 -07:00
committed by vinoth chandar
parent 6c226ca21a
commit 720e42f52a
13 changed files with 553 additions and 126 deletions

View File

@@ -18,8 +18,6 @@ package com.uber.hoodie.func;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
@@ -29,8 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.avro.Schema; import java.util.function.Function;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.util.SizeEstimator; import org.apache.spark.util.SizeEstimator;
@@ -39,9 +36,11 @@ import org.apache.spark.util.SizeEstimator;
* Used for buffering input records. Buffer limit is controlled by {@link #bufferMemoryLimit}. It * Used for buffering input records. Buffer limit is controlled by {@link #bufferMemoryLimit}. It
* internally samples every {@link #RECORD_SAMPLING_RATE}th record and adjusts number of records in * internally samples every {@link #RECORD_SAMPLING_RATE}th record and adjusts number of records in
* buffer accordingly. This is done to ensure that we don't OOM. * buffer accordingly. This is done to ensure that we don't OOM.
*
* @param <I> input payload data type
* @param <O> output payload data type
*/ */
public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRecord<K>> implements public class BufferedIterator<I, O> implements Iterator<O> {
Iterator<BufferedIterator.BufferedIteratorPayload<T>> {
// interval used for polling records in the queue. // interval used for polling records in the queue.
public static final int RECORD_POLL_INTERVAL_SEC = 5; public static final int RECORD_POLL_INTERVAL_SEC = 5;
@@ -58,20 +57,18 @@ public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRec
// used for sampling records with "RECORD_SAMPLING_RATE" frequency. // used for sampling records with "RECORD_SAMPLING_RATE" frequency.
public final AtomicLong samplingRecordCounter = new AtomicLong(-1); public final AtomicLong samplingRecordCounter = new AtomicLong(-1);
// internal buffer to cache buffered records. // internal buffer to cache buffered records.
private final LinkedBlockingQueue<Optional<BufferedIteratorPayload<T>>> buffer = new private final LinkedBlockingQueue<Optional<O>> buffer = new
LinkedBlockingQueue<>(); LinkedBlockingQueue<>();
// maximum amount of memory to be used for buffering records. // maximum amount of memory to be used for buffering records.
private final long bufferMemoryLimit; private final long bufferMemoryLimit;
// original iterator from where records are read for buffering. // original iterator from where records are read for buffering.
private final Iterator<T> inputIterator; private final Iterator<I> inputIterator;
// it holds the root cause of the exception in case either buffering records (reading from // it holds the root cause of the exception in case either buffering records (reading from
// inputIterator) fails or // inputIterator) fails or
// thread reading records from buffer fails. // thread reading records from buffer fails.
private final AtomicReference<Exception> hasFailed = new AtomicReference(null); private final AtomicReference<Exception> hasFailed = new AtomicReference(null);
// used for indicating that all the records from buffer are read successfully. // used for indicating that all the records from buffer are read successfully.
private final AtomicBoolean isDone = new AtomicBoolean(false); private final AtomicBoolean isDone = new AtomicBoolean(false);
// schema used for fetching insertValue from HoodieRecord.
private final Schema schema;
// indicates rate limit (number of records to cache). it is updated whenever there is a change // indicates rate limit (number of records to cache). it is updated whenever there is a change
// in avg record size. // in avg record size.
@VisibleForTesting @VisibleForTesting
@@ -82,13 +79,15 @@ public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRec
// indicates number of samples collected so far. // indicates number of samples collected so far.
private long numSamples = 0; private long numSamples = 0;
// next record to be read from buffer. // next record to be read from buffer.
private BufferedIteratorPayload<T> nextRecord; private O nextRecord;
// Function to transform the input payload to the expected output payload
private Function<I, O> bufferedIteratorTransform;
public BufferedIterator(final Iterator<T> iterator, final long bufferMemoryLimit, public BufferedIterator(final Iterator<I> iterator, final long bufferMemoryLimit,
final Schema schema) { final Function<I, O> bufferedIteratorTransform) {
this.inputIterator = iterator; this.inputIterator = iterator;
this.bufferMemoryLimit = bufferMemoryLimit; this.bufferMemoryLimit = bufferMemoryLimit;
this.schema = schema; this.bufferedIteratorTransform = bufferedIteratorTransform;
} }
@VisibleForTesting @VisibleForTesting
@@ -101,7 +100,7 @@ public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRec
// for determining how many maximum records to buffer. Based on change in avg size it may // for determining how many maximum records to buffer. Based on change in avg size it may
// increase or decrease // increase or decrease
// available permits. // available permits.
private void adjustBufferSizeIfNeeded(final T record) throws InterruptedException { private void adjustBufferSizeIfNeeded(final I record) throws InterruptedException {
if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) { if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
return; return;
} }
@@ -110,14 +109,8 @@ public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRec
.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1)); .max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
final int newRateLimit = (int) Math final int newRateLimit = (int) Math
.min(RECORD_CACHING_LIMIT, Math.max(1, this.bufferMemoryLimit / newAvgRecordSizeInBytes)); .min(RECORD_CACHING_LIMIT, Math.max(1, this.bufferMemoryLimit / newAvgRecordSizeInBytes));
// System.out.println("recordSizeInBytes:" + recordSizeInBytes + ":newAvgRecordSizeInBytes:" +
// newAvgRecordSizeInBytes
// + ":newRateLimit:" + newRateLimit + ":currentRateLimit:" + currentRateLimit +
// ":numSamples:" + numSamples
// + ":avgRecordSizeInBytes:" + avgRecordSizeInBytes);
// If there is any change in number of records to cache then we will either release (if it // If there is any change in number of records to cache then we will either release (if it increased) or acquire
// increased) or acquire
// (if it decreased) to adjust rate limiting to newly computed value. // (if it decreased) to adjust rate limiting to newly computed value.
if (newRateLimit > currentRateLimit) { if (newRateLimit > currentRateLimit) {
rateLimiter.release(newRateLimit - currentRateLimit); rateLimiter.release(newRateLimit - currentRateLimit);
@@ -132,19 +125,19 @@ public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRec
// inserts record into internal buffer. It also fetches insert value from the record to offload // inserts record into internal buffer. It also fetches insert value from the record to offload
// computation work on to // computation work on to
// buffering thread. // buffering thread.
private void insertRecord(T t) throws Exception { private void insertRecord(I t) throws Exception {
rateLimiter.acquire(); rateLimiter.acquire();
adjustBufferSizeIfNeeded(t); adjustBufferSizeIfNeeded(t);
// We are retrieving insert value in the record buffering thread to offload computation // We are retrieving insert value in the record buffering thread to offload computation
// around schema validation // around schema validation
// and record creation to it. // and record creation to it.
final BufferedIteratorPayload<T> payload = new BufferedIteratorPayload<>(t, this.schema); final O payload = bufferedIteratorTransform.apply(t);
buffer.put(Optional.of(payload)); buffer.put(Optional.of(payload));
} }
private void readNextRecord() { private void readNextRecord() {
rateLimiter.release(); rateLimiter.release();
Optional<BufferedIteratorPayload<T>> newRecord; Optional<O> newRecord;
while (true) { while (true) {
try { try {
throwExceptionIfFailed(); throwExceptionIfFailed();
@@ -194,9 +187,9 @@ public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRec
} }
@Override @Override
public BufferedIteratorPayload<T> next() { public O next() {
Preconditions.checkState(hasNext() && this.nextRecord != null); Preconditions.checkState(hasNext() && this.nextRecord != null);
final BufferedIteratorPayload<T> ret = this.nextRecord; final O ret = this.nextRecord;
this.nextRecord = null; this.nextRecord = null;
return ret; return ret;
} }
@@ -213,23 +206,4 @@ public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRec
// get it. // get it.
this.rateLimiter.release(RECORD_CACHING_LIMIT + 1); this.rateLimiter.release(RECORD_CACHING_LIMIT + 1);
} }
// Used for caching HoodieRecord along with insertValue. We need this to offload computation
// work to buffering thread.
static class BufferedIteratorPayload<T extends HoodieRecord> {
public T record;
public Optional<IndexedRecord> insertValue;
// It caches the exception seen while fetching insert value.
public Optional<Exception> exception = Optional.empty();
public BufferedIteratorPayload(T record, Schema schema) {
this.record = record;
try {
this.insertValue = record.getData().getInsertValue(schema);
} catch (Exception e) {
this.exception = Optional.of(e);
}
}
}
} }

View File

@@ -0,0 +1,89 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.func;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
/**
* Executor for a BufferedIterator operation. This class takes as input the input iterator which
* needs to be buffered, the runnable function that needs to be executed in the reader thread and
* return the transformed output based on the writer function
*/
/**
 * Executor for a BufferedIterator operation. This class takes as input the input iterator which
 * needs to be buffered, the runnable function that needs to be executed in the reader thread and
 * return the transformed output based on the writer function.
 *
 * @param <I> input record type read from the source iterator
 * @param <O> buffered/transformed record type consumed by the writer
 * @param <E> result type produced by the writer function
 */
public class BufferedIteratorExecutor<I, O, E> {

  private static Logger logger = LogManager.getLogger(BufferedIteratorExecutor.class);

  // Executor service used for launching the writer thread; owned by the caller.
  final ExecutorService writerService;
  // Buffers records between reader and writer; capacity is governed by
  // HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
  final BufferedIterator<I, O> bufferedIterator;
  // Spark TaskContext captured on the constructing (reader) thread, so the newly
  // launched writer thread can access the original task's properties.
  final TaskContext sparkThreadTaskContext;

  public BufferedIteratorExecutor(final HoodieWriteConfig hoodieConfig, final Iterator<I> inputItr,
      final Function<I, O> bufferedIteratorTransform, final ExecutorService writerService) {
    this.sparkThreadTaskContext = TaskContext.get();
    this.writerService = writerService;
    this.bufferedIterator =
        new BufferedIterator<>(inputItr, hoodieConfig.getWriteBufferLimitBytes(), bufferedIteratorTransform);
  }

  /**
   * Starts buffering and executing the writer function.
   *
   * @param writerFunction consumes the buffered iterator and produces the final result
   * @return future holding the writer's result
   */
  public Future<E> start(Function<BufferedIterator, E> writerFunction) {
    try {
      final Future<E> writerFuture = writerService.submit(() -> {
        logger.info("starting hoodie writer thread");
        // Install the parent thread's TaskContext on this freshly launched thread so
        // the writer can read the original TaskContext properties.
        TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
        try {
          final E result = writerFunction.apply(bufferedIterator);
          logger.info("hoodie write is done; notifying reader thread");
          return result;
        } catch (Exception e) {
          logger.error("error writing hoodie records", e);
          // Surface the failure to the buffering side so it stops promptly instead of
          // filling the queue for a dead consumer.
          bufferedIterator.markAsFailed(e);
          throw e;
        }
      });
      // Buffering runs on the current thread once the writer task is scheduled. This can
      // throw if either reading input records or the writer itself fails.
      bufferedIterator.startBuffering();
      return writerFuture;
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

  /**
   * @return true while buffered records remain unread by the writer.
   */
  public boolean isRemaining() {
    return bufferedIterator.hasNext();
  }
}

View File

@@ -21,6 +21,8 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.func.payload.AbstractBufferedIteratorPayload;
import com.uber.hoodie.func.payload.HoodieRecordBufferedIteratorPayload;
import com.uber.hoodie.io.HoodieCreateHandle; import com.uber.hoodie.io.HoodieCreateHandle;
import com.uber.hoodie.io.HoodieIOHandle; import com.uber.hoodie.io.HoodieIOHandle;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
@@ -29,14 +31,15 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.log4j.LogManager; import java.util.function.Function;
import org.apache.log4j.Logger; import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext; import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
/** /**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
@@ -45,7 +48,6 @@ import org.apache.spark.TaskContext$;
public class LazyInsertIterable<T extends HoodieRecordPayload> extends public class LazyInsertIterable<T extends HoodieRecordPayload> extends
LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> { LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
private static Logger logger = LogManager.getLogger(LazyInsertIterable.class);
private final HoodieWriteConfig hoodieConfig; private final HoodieWriteConfig hoodieConfig;
private final String commitTime; private final String commitTime;
private final HoodieTable<T> hoodieTable; private final HoodieTable<T> hoodieTable;
@@ -65,44 +67,35 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> extends
protected void start() { protected void start() {
} }
/**
* Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
* expensive operations of transformation to the reader thread.
* @param schema
* @param <T>
* @return
*/
public static <T extends HoodieRecordPayload> Function<HoodieRecord<T>, AbstractBufferedIteratorPayload>
bufferedItrPayloadTransform(Schema schema) {
return (hoodieRecord) -> new HoodieRecordBufferedIteratorPayload(hoodieRecord, schema);
}
@Override @Override
protected List<WriteStatus> computeNext() { protected List<WriteStatus> computeNext() {
// Need to set current spark thread's TaskContext into newly launched thread so that new
// thread can access
// TaskContext properties.
final TaskContext sparkThreadTaskContext = TaskContext.get();
// Executor service used for launching writer thread. // Executor service used for launching writer thread.
final ExecutorService writerService = Executors.newFixedThreadPool(1); final ExecutorService writerService = Executors.newFixedThreadPool(1);
try { try {
// Used for buffering records which is controlled by Function<BufferedIterator, List<WriteStatus>> function = (bufferedIterator) -> {
// HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
final BufferedIterator<T, HoodieRecord<T>> bufferedIterator = new BufferedIterator<>(inputItr,
hoodieConfig.getWriteBufferLimitBytes(),
HoodieIOHandle.createHoodieWriteSchema(hoodieConfig));
Future<List<WriteStatus>> writerResult = writerService.submit(() -> {
logger.info("starting hoodie writer thread");
// Passing parent thread's TaskContext to newly launched thread for it to access original
// TaskContext
// properties.
TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
List<WriteStatus> statuses = new LinkedList<>(); List<WriteStatus> statuses = new LinkedList<>();
try { statuses.addAll(handleWrite(bufferedIterator));
statuses.addAll(handleWrite(bufferedIterator)); return statuses;
logger.info("hoodie write is done; notifying reader thread"); };
return statuses; BufferedIteratorExecutor<HoodieRecord<T>, AbstractBufferedIteratorPayload, List<WriteStatus>>
} catch (Exception e) { bufferedIteratorExecutor = new BufferedIteratorExecutor(hoodieConfig, inputItr,
logger.error("error writing hoodie records", e); bufferedItrPayloadTransform(HoodieIOHandle.createHoodieWriteSchema(hoodieConfig)),
bufferedIterator.markAsFailed(e); writerService);
throw e; Future<List<WriteStatus>> writerResult = bufferedIteratorExecutor.start(function);
}
});
// Buffering records into internal buffer. This can throw exception either if reading
// records from spark fails or
// if writing buffered records into parquet file fails.
bufferedIterator.startBuffering();
logger.info("waiting for hoodie write to finish");
final List<WriteStatus> result = writerResult.get(); final List<WriteStatus> result = writerResult.get();
assert result != null && !result.isEmpty() && !bufferedIterator.hasNext(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result; return result;
} catch (Exception e) { } catch (Exception e) {
throw new HoodieException(e); throw new HoodieException(e);
@@ -112,38 +105,38 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> extends
} }
private List<WriteStatus> handleWrite( private List<WriteStatus> handleWrite(
final BufferedIterator<T, HoodieRecord<T>> bufferedIterator) { final BufferedIterator<HoodieRecord<T>, AbstractBufferedIteratorPayload> bufferedIterator) {
List<WriteStatus> statuses = new ArrayList<>(); List<WriteStatus> statuses = new ArrayList<>();
while (bufferedIterator.hasNext()) { while (bufferedIterator.hasNext()) {
final BufferedIterator.BufferedIteratorPayload<HoodieRecord<T>> payload = bufferedIterator final HoodieRecordBufferedIteratorPayload payload = (HoodieRecordBufferedIteratorPayload) bufferedIterator
.next(); .next();
final HoodieRecord insertPayload = (HoodieRecord) payload.getInputPayload();
// clean up any partial failures // clean up any partial failures
if (!partitionsCleaned.contains(payload.record.getPartitionPath())) { if (!partitionsCleaned
.contains(insertPayload.getPartitionPath())) {
// This insert task could fail multiple times, but Spark will faithfully retry with // This insert task could fail multiple times, but Spark will faithfully retry with
// the same data again. Thus, before we open any files under a given partition, we // the same data again. Thus, before we open any files under a given partition, we
// first delete any files in the same partitionPath written by same Spark partition // first delete any files in the same partitionPath written by same Spark partition
HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, insertPayload.getPartitionPath(),
payload.record.getPartitionPath(), TaskContext.getPartitionId(), hoodieTable); TaskContext.getPartitionId(), hoodieTable);
partitionsCleaned.add(payload.record.getPartitionPath()); partitionsCleaned.add(insertPayload.getPartitionPath());
} }
// lazily initialize the handle, for the first time // lazily initialize the handle, for the first time
if (handle == null) { if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath());
payload.record.getPartitionPath());
} }
if (handle.canWrite(payload.record)) { if (handle.canWrite(((HoodieRecord) payload.getInputPayload()))) {
// write the payload, if the handle has capacity // write the payload, if the handle has capacity
handle.write(payload.record, payload.insertValue, payload.exception); handle.write(insertPayload, (Optional<IndexedRecord>) payload.getOutputPayload(), payload.exception);
} else { } else {
// handle is full. // handle is full.
statuses.add(handle.close()); statuses.add(handle.close());
// Need to handle the rejected payload & open new handle // Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath());
payload.record.getPartitionPath()); handle.write(insertPayload,
handle.write(payload.record, payload.insertValue, (Optional<IndexedRecord>) payload.getOutputPayload(),
payload.exception); // we should be able to write 1 payload. payload.exception); // we should be able to write 1 payload.
} }
} }

View File

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.func;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Iterator;
import org.apache.parquet.hadoop.ParquetReader;
/**
* This class wraps a parquet reader and provides an iterator based api to
* read from a parquet file. This is used in {@link BufferedIterator}
*/
/**
 * This class wraps a parquet reader and provides an iterator based api to
 * read from a parquet file. This is used in {@link BufferedIterator}.
 *
 * <p>Implements {@link AutoCloseable} so the underlying reader can be managed with
 * try-with-resources; callers that already invoke {@link #close()} are unaffected.
 *
 * @param <T> record type produced by the underlying parquet reader
 */
public class ParquetReaderIterator<T> implements Iterator<T>, AutoCloseable {

  // Parquet reader for an existing parquet file
  private final ParquetReader<T> parquetReader;
  // Holds the read-ahead entry; null when nothing is buffered yet or the reader is exhausted.
  private T next;

  public ParquetReaderIterator(ParquetReader<T> parquetReader) {
    this.parquetReader = parquetReader;
  }

  /**
   * Buffers the next record if one is not already buffered and reports availability.
   * Idempotent: calling it repeatedly performs at most one read per record.
   *
   * @throws HoodieIOException if the underlying reader fails
   */
  @Override
  public boolean hasNext() {
    try {
      // To handle when hasNext() is called multiple times for idempotency and/or the first time
      if (this.next == null) {
        this.next = parquetReader.read();
      }
      return this.next != null;
    } catch (IOException io) {
      throw new HoodieIOException("unable to read next record from parquet file ", io);
    }
  }

  /**
   * Returns the buffered record. The subsequent record is fetched lazily by the next
   * {@link #hasNext()} call, so no read happens past the records actually consumed.
   *
   * @throws HoodieIOException if no records remain or the underlying reader fails
   */
  @Override
  public T next() {
    // Delegate the read-ahead (and its IOException handling) to hasNext() so the
    // reader interaction lives in exactly one place; also covers next()-before-hasNext().
    if (!hasNext()) {
      throw new HoodieIOException("No more records left to read from parquet file");
    }
    final T retVal = this.next;
    this.next = null;
    return retVal;
  }

  /**
   * Closes the wrapped parquet reader.
   */
  @Override
  public void close() throws IOException {
    parquetReader.close();
  }
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.func.payload;
/**
* @param <I> Input data type for BufferedIterator
* @param <O> Output data type for BufferedIterator
*/
/**
 * Base payload type flowing through a BufferedIterator: pairs the record read from the
 * input iterator with an output value derived from it.
 *
 * @param <I> Input data type for BufferedIterator
 * @param <O> Output data type for BufferedIterator
 */
public abstract class AbstractBufferedIteratorPayload<I, O> {

  // Record exactly as read from the input iterator.
  protected I inputPayload;
  // Value derived from the input; subclasses compute this so the expensive
  // transformation work is off-loaded to the reader (buffering) thread.
  protected O outputPayload;

  public AbstractBufferedIteratorPayload(I record) {
    this.inputPayload = record;
  }

  public I getInputPayload() {
    return this.inputPayload;
  }

  public O getOutputPayload() {
    return this.outputPayload;
  }
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.func.payload;
import org.apache.avro.generic.GenericRecord;
/**
* BufferedIteratorPayload that takes GenericRecord as input and GenericRecord as output
*/
/**
 * BufferedIteratorPayload whose input and output are the same Avro {@link GenericRecord}:
 * an identity transform, used when no per-record computation needs off-loading.
 */
public class GenericRecordBufferedIteratorPayload
    extends AbstractBufferedIteratorPayload<GenericRecord, GenericRecord> {

  public GenericRecordBufferedIteratorPayload(GenericRecord record) {
    super(record);
    // Identity transform: the output is simply the input record, untouched.
    this.outputPayload = record;
  }
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.func.payload;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
/**
* BufferedIteratorPayload that takes HoodieRecord as input and transforms to output Optional<IndexedRecord>
* @param <T>
*/
/**
 * BufferedIteratorPayload that takes HoodieRecord as input and transforms it to the output
 * {@code Optional<IndexedRecord>} (the record's Avro insert value for the given schema).
 *
 * <p>The conversion runs in this constructor, i.e. on the buffering (reader) thread, so the
 * writer thread is spared the Avro serialization work.
 *
 * @param <T> the HoodieRecordPayload type carried by the record
 */
public class HoodieRecordBufferedIteratorPayload<T extends HoodieRecordPayload>
    extends AbstractBufferedIteratorPayload<HoodieRecord<T>, Optional<IndexedRecord>> {

  // It caches the exception seen while fetching insert value.
  // NOTE: kept public (in addition to the getter) because callers such as
  // LazyInsertIterable read this field directly.
  public Optional<Exception> exception = Optional.empty();

  // Typed as HoodieRecord<T> (rather than the raw HoodieRecord) to match the superclass
  // type argument; existing raw-typed callers remain source-compatible.
  public HoodieRecordBufferedIteratorPayload(HoodieRecord<T> record, Schema schema) {
    super(record);
    try {
      this.outputPayload = record.getData().getInsertValue(schema);
    } catch (Exception e) {
      // Defer the failure: cache it so the consumer decides how to handle the bad record
      // instead of killing the buffering thread.
      this.exception = Optional.of(e);
    }
  }

  public Optional<Exception> getException() {
    return exception;
  }
}

View File

@@ -135,8 +135,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
try { try {
// Load the new records in a map // Load the new records in a map
logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge()); logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), Optional.empty(),
Optional.empty(), new StringConverter(), new HoodieRecordConverter(schema, config.getPayloadClass())); new StringConverter(), new HoodieRecordConverter(schema, config.getPayloadClass()));
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
} }
@@ -148,7 +148,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
// update the new location of the record, so we know where to find it next // update the new location of the record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
} }
logger.debug("Number of entries in MemoryBasedMap => " logger.info("Number of entries in MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
+ "Total size in bytes of MemoryBasedMap => " + "Total size in bytes of MemoryBasedMap => "
+ ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize()
@@ -156,7 +156,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
+ ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries()
+ "Size of file spilled to disk => " + "Size of file spilled to disk => "
+ ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
return partitionPath; return partitionPath;
} }
@@ -186,7 +185,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
} }
/** /**
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file. * Go through an old record. Here if we detect a newer version shows up, we write the new one to
* the file.
*/ */
public void write(GenericRecord oldRecord) { public void write(GenericRecord oldRecord) {
String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();

View File

@@ -34,10 +34,16 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.BufferedIterator;
import com.uber.hoodie.func.BufferedIteratorExecutor;
import com.uber.hoodie.func.LazyInsertIterable; import com.uber.hoodie.func.LazyInsertIterable;
import com.uber.hoodie.func.ParquetReaderIterator;
import com.uber.hoodie.func.payload.AbstractBufferedIteratorPayload;
import com.uber.hoodie.func.payload.GenericRecordBufferedIteratorPayload;
import com.uber.hoodie.io.HoodieCleanHelper; import com.uber.hoodie.io.HoodieCleanHelper;
import com.uber.hoodie.io.HoodieMergeHandle; import com.uber.hoodie.io.HoodieMergeHandle;
import java.io.IOException; import java.io.IOException;
@@ -52,6 +58,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
@@ -173,8 +182,19 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return handleUpdateInternal(upsertHandle, commitTime, fileLoc); return handleUpdateInternal(upsertHandle, commitTime, fileLoc);
} }
/**
* Transformer function to help transform a GenericRecord. This transformer is used by BufferedIterator to offload
* some expensive operations of transformation to the reader thread.
*
*/
public static java.util.function.Function<GenericRecord, AbstractBufferedIteratorPayload>
bufferedItrPayloadTransform() {
return (genericRecord) -> new GenericRecordBufferedIteratorPayload(genericRecord);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle,
String commitTime, String fileLoc) throws IOException { String commitTime, String fileLoc)
throws IOException {
if (upsertHandle.getOldFilePath() == null) { if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException( throw new HoodieUpsertException(
"Error in finding the old file path at commit " + commitTime + " at fileLoc: " + fileLoc); "Error in finding the old file path at commit " + commitTime + " at fileLoc: " + fileLoc);
@@ -182,32 +202,44 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
ParquetReader<IndexedRecord> reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) ParquetReader<IndexedRecord> reader = AvroParquetReader.builder(upsertHandle.getOldFilePath())
.withConf(getHadoopConf()).build(); .withConf(getHadoopConf()).build();
final ExecutorService writerService = Executors.newFixedThreadPool(1);
try { try {
IndexedRecord record; java.util.function.Function<BufferedIterator, Void> runnableFunction = (bufferedIterator) -> {
while ((record = reader.read()) != null) { handleWrite(bufferedIterator, upsertHandle);
// Two types of writes here (new record, and old record). return null;
// We have already catch the exception during writing new records. };
// But for old records, we should fail if any exception happens. BufferedIteratorExecutor<GenericRecord, AbstractBufferedIteratorPayload, Void> wrapper =
upsertHandle.write((GenericRecord) record); new BufferedIteratorExecutor(config, new ParquetReaderIterator(reader), bufferedItrPayloadTransform(),
} writerService);
} catch (IOException e) { Future writerResult = wrapper.start(runnableFunction);
throw new HoodieUpsertException( writerResult.get();
"Failed to read record from " + upsertHandle.getOldFilePath() + " with new Schema " } catch (Exception e) {
+ upsertHandle.getSchema(), e); throw new HoodieException(e);
} finally { } finally {
reader.close(); reader.close();
upsertHandle.close(); upsertHandle.close();
writerService.shutdownNow();
} }
} }
//TODO(vc): This needs to be revisited //TODO(vc): This needs to be revisited
if (upsertHandle.getWriteStatus().getPartitionPath() == null) { if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath()
+ upsertHandle.getWriteStatus()); + ", " + upsertHandle.getWriteStatus());
} }
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())) return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus()))
.iterator(); .iterator();
} }
private void handleWrite(final BufferedIterator<GenericRecord, GenericRecord> bufferedIterator,
final HoodieMergeHandle upsertHandle) {
while (bufferedIterator.hasNext()) {
final GenericRecordBufferedIteratorPayload payload = (GenericRecordBufferedIteratorPayload) bufferedIterator
.next();
upsertHandle.write(payload.getOutputPayload());
}
}
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc,
Iterator<HoodieRecord<T>> recordItr) { Iterator<HoodieRecord<T>> recordItr) {
return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc); return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc);

View File

@@ -23,6 +23,8 @@ import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.func.payload.AbstractBufferedIteratorPayload;
import com.uber.hoodie.func.payload.HoodieRecordBufferedIteratorPayload;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@@ -66,7 +68,7 @@ public class TestBufferedIterator {
final int numRecords = 128; final int numRecords = 128;
final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords); final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
final BufferedIterator bufferedIterator = new BufferedIterator(hoodieRecords.iterator(), FileUtils.ONE_KB, final BufferedIterator bufferedIterator = new BufferedIterator(hoodieRecords.iterator(), FileUtils.ONE_KB,
HoodieTestDataGenerator.avroSchema); LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema));
Future<Boolean> result = recordReader.submit(() -> { Future<Boolean> result = recordReader.submit(() -> {
bufferedIterator.startBuffering(); bufferedIterator.startBuffering();
return true; return true;
@@ -77,11 +79,12 @@ public class TestBufferedIterator {
final HoodieRecord originalRecord = originalRecordIterator.next(); final HoodieRecord originalRecord = originalRecordIterator.next();
final Optional<IndexedRecord> originalInsertValue = originalRecord.getData() final Optional<IndexedRecord> originalInsertValue = originalRecord.getData()
.getInsertValue(HoodieTestDataGenerator.avroSchema); .getInsertValue(HoodieTestDataGenerator.avroSchema);
final BufferedIterator.BufferedIteratorPayload payload = bufferedIterator.next(); final HoodieRecordBufferedIteratorPayload payload = (HoodieRecordBufferedIteratorPayload) bufferedIterator.next();
// Ensure that record ordering is guaranteed. // Ensure that record ordering is guaranteed.
Assert.assertEquals(originalRecord, payload.record); Assert.assertEquals(originalRecord, payload.getInputPayload());
// cached insert value matches the expected insert value. // cached insert value matches the expected insert value.
Assert.assertEquals(originalInsertValue, payload.insertValue); Assert.assertEquals(originalInsertValue,
((HoodieRecord) payload.getInputPayload()).getData().getInsertValue(HoodieTestDataGenerator.avroSchema));
recordsRead++; recordsRead++;
} }
Assert.assertFalse(bufferedIterator.hasNext() || originalRecordIterator.hasNext()); Assert.assertFalse(bufferedIterator.hasNext() || originalRecordIterator.hasNext());
@@ -99,8 +102,9 @@ public class TestBufferedIterator {
// maximum number of records to keep in memory. // maximum number of records to keep in memory.
final int recordLimit = 5; final int recordLimit = 5;
final long memoryLimitInBytes = recordLimit * SizeEstimator.estimate(hoodieRecords.get(0)); final long memoryLimitInBytes = recordLimit * SizeEstimator.estimate(hoodieRecords.get(0));
final BufferedIterator bufferedIterator = new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, final BufferedIterator<HoodieRecord, AbstractBufferedIteratorPayload> bufferedIterator =
HoodieTestDataGenerator.avroSchema); new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes,
LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema));
Future<Boolean> result = recordReader.submit(() -> { Future<Boolean> result = recordReader.submit(() -> {
bufferedIterator.startBuffering(); bufferedIterator.startBuffering();
return true; return true;
@@ -115,8 +119,8 @@ public class TestBufferedIterator {
Assert.assertEquals(recordLimit - 1, bufferedIterator.samplingRecordCounter.get()); Assert.assertEquals(recordLimit - 1, bufferedIterator.samplingRecordCounter.get());
// try to read 2 records. // try to read 2 records.
Assert.assertEquals(hoodieRecords.get(0), bufferedIterator.next().record); Assert.assertEquals(hoodieRecords.get(0), bufferedIterator.next().getInputPayload());
Assert.assertEquals(hoodieRecords.get(1), bufferedIterator.next().record); Assert.assertEquals(hoodieRecords.get(1), bufferedIterator.next().getInputPayload());
// waiting for permits to expire. // waiting for permits to expire.
while (!isQueueFull(bufferedIterator.rateLimiter)) { while (!isQueueFull(bufferedIterator.rateLimiter)) {
@@ -145,7 +149,7 @@ public class TestBufferedIterator {
// stops and throws // stops and throws
// correct exception back. // correct exception back.
BufferedIterator bufferedIterator1 = new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, BufferedIterator bufferedIterator1 = new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes,
HoodieTestDataGenerator.avroSchema); LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema));
Future<Boolean> result = recordReader.submit(() -> { Future<Boolean> result = recordReader.submit(() -> {
bufferedIterator1.startBuffering(); bufferedIterator1.startBuffering();
return true; return true;
@@ -173,7 +177,7 @@ public class TestBufferedIterator {
when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
BufferedIterator bufferedIterator2 = new BufferedIterator(mockHoodieRecordsIterator, memoryLimitInBytes, BufferedIterator bufferedIterator2 = new BufferedIterator(mockHoodieRecordsIterator, memoryLimitInBytes,
HoodieTestDataGenerator.avroSchema); LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema));
Future<Boolean> result2 = recordReader.submit(() -> { Future<Boolean> result2 = recordReader.submit(() -> {
bufferedIterator2.startBuffering(); bufferedIterator2.startBuffering();
return true; return true;

View File

@@ -0,0 +1,79 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.func;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestBufferedIteratorExecutor {

  private final HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
  private final String commitTime = HoodieActiveTimeline.createNewCommitTime();
  private ExecutorService executorService = null;

  @Before
  public void beforeTest() {
    this.executorService = Executors.newFixedThreadPool(1);
  }

  @After
  public void afterTest() {
    if (this.executorService != null) {
      this.executorService.shutdownNow();
      this.executorService = null;
    }
  }

  /**
   * Verifies that {@code BufferedIteratorExecutor} passes every input record through the buffer
   * exactly once and leaves the buffer empty once the consumer function has drained it.
   */
  @Test
  public void testExecutor() throws Exception {
    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, 100);
    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
    when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
    BufferedIteratorExecutor bufferedIteratorExecutor = new BufferedIteratorExecutor(hoodieWriteConfig,
        hoodieRecords.iterator(), LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema),
        executorService);
    // Consumer function: count how many payloads come through the buffer.
    Function<BufferedIterator, Integer> function = (bufferedIterator) -> {
      Integer count = 0;
      while (bufferedIterator.hasNext()) {
        count++;
        bufferedIterator.next();
      }
      return count;
    };
    Future<Integer> future = bufferedIteratorExecutor.start(function);
    // It should buffer and write 100 records.
    // NOTE: JUnit's assertEquals takes the expected value FIRST; the original
    // call had the arguments swapped, which produces a misleading failure message.
    Assert.assertEquals(100, (int) future.get());
    // There should be no remaining records in the buffer
    Assert.assertFalse(bufferedIteratorExecutor.isRemaining());
  }
}

View File

@@ -0,0 +1,61 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.func;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import org.apache.parquet.hadoop.ParquetReader;
import org.junit.Assert;
import org.junit.Test;
public class TestParquetReaderIterator {

  /**
   * hasNext() must be idempotent: calling it repeatedly without calling next() should keep
   * reporting the same single buffered record rather than consuming it on each call.
   */
  @Test
  public void testParquetIteratorIdempotency() throws IOException {
    ParquetReader reader = mock(ParquetReader.class);
    // only 1 record in reader
    when(reader.read()).thenReturn(1).thenReturn(null);
    ParquetReaderIterator<Integer> iterator = new ParquetReaderIterator<>(reader);
    int idempotencyCheckCounter = 0;
    // call hasNext() 3 times
    while (idempotencyCheckCounter < 3) {
      Assert.assertTrue(iterator.hasNext());
      idempotencyCheckCounter++;
    }
  }

  /**
   * next() should work without a prior hasNext() call, and an exhausted iterator must throw
   * {@code HoodieIOException} on a further next() call.
   */
  @Test
  public void testParquetIterator() throws IOException {
    ParquetReader reader = mock(ParquetReader.class);
    // only one record to read
    when(reader.read()).thenReturn(1).thenReturn(null);
    ParquetReaderIterator<Integer> iterator = new ParquetReaderIterator<>(reader);
    // should return value even though hasNext() hasn't been called
    Assert.assertTrue(iterator.next() == 1);
    // no more entries to iterate on
    Assert.assertFalse(iterator.hasNext());
    try {
      iterator.next();
      // The original test silently passed when no exception was thrown; fail
      // explicitly so a missing exception is actually detected.
      Assert.fail("next() on an exhausted iterator should throw HoodieIOException");
    } catch (HoodieIOException e) {
      // expected: there was only 1 record in the reader
    }
  }
}

View File

@@ -26,6 +26,7 @@ import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
@@ -94,7 +95,9 @@ public final class DiskBasedMap<T, R> implements Map<T, R> {
} }
writeOnlyFileHandle.createNewFile(); writeOnlyFileHandle.createNewFile();
log.info("Spilling to file location " + writeOnlyFileHandle.getAbsolutePath()); log.info(
"Spilling to file location " + writeOnlyFileHandle.getAbsolutePath() + " in host (" + InetAddress.getLocalHost()
.getHostAddress() + ") with hostname (" + InetAddress.getLocalHost().getHostName() + ")");
// Open file in readFromDisk-only mode // Open file in readFromDisk-only mode
readOnlyFileHandle = new RandomAccessFile(filePath, "r"); readOnlyFileHandle = new RandomAccessFile(filePath, "r");
readOnlyFileHandle.seek(0); readOnlyFileHandle.seek(0);