Spawning parallel writer thread to separate reading records from spark and writing records to parquet file
This commit is contained in:
committed by
vinoth chandar
parent
9dff8c2326
commit
c5b4cb1b75
@@ -25,6 +25,7 @@ import com.uber.hoodie.index.HoodieIndex;
|
|||||||
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
|
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
|
||||||
import com.uber.hoodie.metrics.MetricsReporterType;
|
import com.uber.hoodie.metrics.MetricsReporterType;
|
||||||
import org.apache.spark.storage.StorageLevel;
|
import org.apache.spark.storage.StorageLevel;
|
||||||
|
|
||||||
import javax.annotation.concurrent.Immutable;
|
import javax.annotation.concurrent.Immutable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
@@ -46,6 +47,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
|
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
|
||||||
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
|
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
|
||||||
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
|
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
|
||||||
|
private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
|
||||||
|
private static final String DEFAULT_WRITE_BUFFER_LIMIT_BYTES = String.valueOf(4*1024*1024);
|
||||||
private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
|
private static final String COMBINE_BEFORE_INSERT_PROP = "hoodie.combine.before.insert";
|
||||||
private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
|
private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
|
||||||
private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
|
private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
|
||||||
@@ -104,6 +107,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
|
return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getWriteBufferLimitBytes() {
|
||||||
|
return Integer.parseInt(props.getProperty(WRITE_BUFFER_LIMIT_BYTES, DEFAULT_WRITE_BUFFER_LIMIT_BYTES));
|
||||||
|
}
|
||||||
|
|
||||||
public boolean shouldCombineBeforeInsert() {
|
public boolean shouldCombineBeforeInsert() {
|
||||||
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_INSERT_PROP));
|
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_INSERT_PROP));
|
||||||
}
|
}
|
||||||
@@ -391,6 +398,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
|
||||||
|
props.setProperty(WRITE_BUFFER_LIMIT_BYTES, String.valueOf(writeBufferLimit));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder combineInput(boolean onInsert, boolean onUpsert) {
|
public Builder combineInput(boolean onInsert, boolean onUpsert) {
|
||||||
props.setProperty(COMBINE_BEFORE_INSERT_PROP, String.valueOf(onInsert));
|
props.setProperty(COMBINE_BEFORE_INSERT_PROP, String.valueOf(onInsert));
|
||||||
props.setProperty(COMBINE_BEFORE_UPSERT_PROP, String.valueOf(onUpsert));
|
props.setProperty(COMBINE_BEFORE_UPSERT_PROP, String.valueOf(onUpsert));
|
||||||
|
|||||||
@@ -0,0 +1,221 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.func;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||||
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.spark.util.SizeEstimator;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used for buffering input records. Buffer limit is controlled by {@link #bufferMemoryLimit}. It internally samples
|
||||||
|
* every {@link #RECORD_SAMPLING_RATE}th record and adjusts number of records in buffer accordingly. This is done to
|
||||||
|
* ensure that we don't OOM.
|
||||||
|
*/
|
||||||
|
public class BufferedIterator<K extends HoodieRecordPayload, T extends HoodieRecord<K>>
|
||||||
|
implements Iterator<BufferedIterator.BufferedIteratorPayload<T>> {
|
||||||
|
|
||||||
|
private static Logger logger = LogManager.getLogger(BufferedIterator.class);
|
||||||
|
// interval used for polling records in the queue.
|
||||||
|
public static final int RECORD_POLL_INTERVAL_SEC = 5;
|
||||||
|
// rate used for sampling records to determine avg record size in bytes.
|
||||||
|
public static final int RECORD_SAMPLING_RATE = 64;
|
||||||
|
// maximum records that will be cached
|
||||||
|
private static final int RECORD_CACHING_LIMIT = 128 * 1024;
|
||||||
|
// It indicates number of records to cache. We will be using sampled record's average size to determine how many
|
||||||
|
// records we should cache and will change (increase/decrease) permits accordingly.
|
||||||
|
@VisibleForTesting
|
||||||
|
public final Semaphore rateLimiter = new Semaphore(1);
|
||||||
|
// used for sampling records with "RECORD_SAMPLING_RATE" frequency.
|
||||||
|
public final AtomicLong samplingRecordCounter = new AtomicLong(-1);
|
||||||
|
// indicates rate limit (number of records to cache). it is updated whenever there is a change in avg record size.
|
||||||
|
@VisibleForTesting
|
||||||
|
public int currentRateLimit = 1;
|
||||||
|
// internal buffer to cache buffered records.
|
||||||
|
private final LinkedBlockingQueue<Optional<BufferedIteratorPayload<T>>> buffer = new LinkedBlockingQueue<>();
|
||||||
|
// maximum amount of memory to be used for buffering records.
|
||||||
|
private final long bufferMemoryLimit;
|
||||||
|
// indicates avg record size in bytes. It is updated whenever a new record is sampled.
|
||||||
|
@VisibleForTesting
|
||||||
|
public long avgRecordSizeInBytes = 0;
|
||||||
|
// indicates number of samples collected so far.
|
||||||
|
private long numSamples = 0;
|
||||||
|
// original iterator from where records are read for buffering.
|
||||||
|
private final Iterator<T> inputIterator;
|
||||||
|
// it holds the root cause of the exception in case either buffering records (reading from inputIterator) fails or
|
||||||
|
// thread reading records from buffer fails.
|
||||||
|
private final AtomicReference<Exception> hasFailed = new AtomicReference(null);
|
||||||
|
// used for indicating that all the records from buffer are read successfully.
|
||||||
|
private final AtomicBoolean isDone = new AtomicBoolean(false);
|
||||||
|
// next record to be read from buffer.
|
||||||
|
private BufferedIteratorPayload<T> nextRecord;
|
||||||
|
// schema used for fetching insertValue from HoodieRecord.
|
||||||
|
private final Schema schema;
|
||||||
|
|
||||||
|
public BufferedIterator(final Iterator<T> iterator, final long bufferMemoryLimit, final Schema schema) {
|
||||||
|
this.inputIterator = iterator;
|
||||||
|
this.bufferMemoryLimit = bufferMemoryLimit;
|
||||||
|
this.schema = schema;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int size() {
|
||||||
|
return this.buffer.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
// It samples records with "RECORD_SAMPLING_RATE" frequency and computes average record size in bytes. It is used
|
||||||
|
// for determining how many maximum records to buffer. Based on change in avg size it may increase or decrease
|
||||||
|
// available permits.
|
||||||
|
private void adjustBufferSizeIfNeeded(final T record) throws InterruptedException {
|
||||||
|
if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final long recordSizeInBytes = SizeEstimator.estimate(record);
|
||||||
|
final long newAvgRecordSizeInBytes =
|
||||||
|
Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
|
||||||
|
final int newRateLimit =
|
||||||
|
(int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.bufferMemoryLimit / newAvgRecordSizeInBytes));
|
||||||
|
// System.out.println("recordSizeInBytes:" + recordSizeInBytes + ":newAvgRecordSizeInBytes:" + newAvgRecordSizeInBytes
|
||||||
|
// + ":newRateLimit:" + newRateLimit + ":currentRateLimit:" + currentRateLimit + ":numSamples:" + numSamples
|
||||||
|
// + ":avgRecordSizeInBytes:" + avgRecordSizeInBytes);
|
||||||
|
|
||||||
|
// If there is any change in number of records to cache then we will either release (if it increased) or acquire
|
||||||
|
// (if it decreased) to adjust rate limiting to newly computed value.
|
||||||
|
if (newRateLimit > currentRateLimit) {
|
||||||
|
rateLimiter.release(newRateLimit - currentRateLimit);
|
||||||
|
} else if (newRateLimit < currentRateLimit) {
|
||||||
|
rateLimiter.acquire(currentRateLimit - newRateLimit);
|
||||||
|
}
|
||||||
|
currentRateLimit = newRateLimit;
|
||||||
|
avgRecordSizeInBytes = newAvgRecordSizeInBytes;
|
||||||
|
numSamples++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// inserts record into internal buffer. It also fetches insert value from the record to offload computation work on to
|
||||||
|
// buffering thread.
|
||||||
|
private void insertRecord(T t) throws Exception {
|
||||||
|
rateLimiter.acquire();
|
||||||
|
adjustBufferSizeIfNeeded(t);
|
||||||
|
// We are retrieving insert value in the record buffering thread to offload computation around schema validation
|
||||||
|
// and record creation to it.
|
||||||
|
final BufferedIteratorPayload<T> payload = new BufferedIteratorPayload<>(t, this.schema);
|
||||||
|
buffer.put(Optional.of(payload));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void readNextRecord() {
|
||||||
|
rateLimiter.release();
|
||||||
|
Optional<BufferedIteratorPayload<T>> newRecord;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
throwExceptionIfFailed();
|
||||||
|
newRecord = buffer.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
|
||||||
|
if (newRecord != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.error("error reading records from BufferedIterator", e);
|
||||||
|
throw new HoodieException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (newRecord.isPresent()) {
|
||||||
|
this.nextRecord = newRecord.get();
|
||||||
|
} else {
|
||||||
|
// We are done reading all the records from internal iterator.
|
||||||
|
this.isDone.set(true);
|
||||||
|
this.nextRecord = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startBuffering() throws Exception {
|
||||||
|
logger.info("starting to buffer records");
|
||||||
|
try {
|
||||||
|
while (inputIterator.hasNext()) {
|
||||||
|
// We need to stop buffering if buffer-reader has failed and exited.
|
||||||
|
throwExceptionIfFailed();
|
||||||
|
insertRecord(inputIterator.next());
|
||||||
|
}
|
||||||
|
// done buffering records notifying buffer-reader.
|
||||||
|
buffer.put(Optional.empty());
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("error buffering records", e);
|
||||||
|
// Used for notifying buffer-reader thread of the failed operation.
|
||||||
|
markAsFailed(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
logger.info("finished buffering records");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
if (this.nextRecord == null && !this.isDone.get()) {
|
||||||
|
readNextRecord();
|
||||||
|
}
|
||||||
|
return !this.isDone.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BufferedIteratorPayload<T> next() {
|
||||||
|
Preconditions.checkState(hasNext() && this.nextRecord != null);
|
||||||
|
final BufferedIteratorPayload<T> ret = this.nextRecord;
|
||||||
|
this.nextRecord = null;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void throwExceptionIfFailed() {
|
||||||
|
if (this.hasFailed.get() != null) {
|
||||||
|
throw new HoodieException("operation has failed", this.hasFailed.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void markAsFailed(Exception e) {
|
||||||
|
this.hasFailed.set(e);
|
||||||
|
// release the permits so that if the buffering thread is waiting for permits then it will get it.
|
||||||
|
this.rateLimiter.release(RECORD_CACHING_LIMIT + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
|
||||||
|
static class BufferedIteratorPayload<T extends HoodieRecord> {
|
||||||
|
public T record;
|
||||||
|
public Optional<IndexedRecord> insertValue;
|
||||||
|
// It caches the exception seen while fetching insert value.
|
||||||
|
public Optional<Exception> exception = Optional.empty();
|
||||||
|
|
||||||
|
public BufferedIteratorPayload(T record, Schema schema) {
|
||||||
|
this.record = record;
|
||||||
|
try {
|
||||||
|
this.insertValue = record.getData().getInsertValue(schema);
|
||||||
|
} catch (Exception e) {
|
||||||
|
this.exception = Optional.of(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -20,15 +20,24 @@ import com.uber.hoodie.WriteStatus;
|
|||||||
import com.uber.hoodie.common.model.HoodieRecord;
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
import com.uber.hoodie.io.HoodieCreateHandle;
|
import com.uber.hoodie.io.HoodieCreateHandle;
|
||||||
import com.uber.hoodie.io.HoodieIOHandle;
|
import com.uber.hoodie.io.HoodieIOHandle;
|
||||||
import com.uber.hoodie.table.HoodieTable;
|
import com.uber.hoodie.table.HoodieTable;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.spark.TaskContext;
|
||||||
|
import org.apache.spark.TaskContext$;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.spark.TaskContext;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
|
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
|
||||||
@@ -37,6 +46,7 @@ import org.apache.spark.TaskContext;
|
|||||||
public class LazyInsertIterable<T extends HoodieRecordPayload> extends
|
public class LazyInsertIterable<T extends HoodieRecordPayload> extends
|
||||||
LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
|
LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
|
||||||
|
|
||||||
|
private static Logger logger = LogManager.getLogger(LazyInsertIterable.class);
|
||||||
private final HoodieWriteConfig hoodieConfig;
|
private final HoodieWriteConfig hoodieConfig;
|
||||||
private final String commitTime;
|
private final String commitTime;
|
||||||
private final HoodieTable<T> hoodieTable;
|
private final HoodieTable<T> hoodieTable;
|
||||||
@@ -56,57 +66,95 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> extends
|
|||||||
protected void start() {
|
protected void start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<WriteStatus> computeNext() {
|
protected List<WriteStatus> computeNext() {
|
||||||
List<WriteStatus> statuses = new ArrayList<>();
|
// Need to set current spark thread's TaskContext into newly launched thread so that new thread can access
|
||||||
|
// TaskContext properties.
|
||||||
|
final TaskContext sparkThreadTaskContext = TaskContext.get();
|
||||||
|
// Executor service used for launching writer thread.
|
||||||
|
final ExecutorService writerService = Executors.newFixedThreadPool(1);
|
||||||
|
try {
|
||||||
|
// Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
|
||||||
|
final BufferedIterator<T, HoodieRecord<T>> bufferedIterator =
|
||||||
|
new BufferedIterator<>(inputItr, hoodieConfig.getWriteBufferLimitBytes(),
|
||||||
|
HoodieIOHandle.createHoodieWriteSchema(hoodieConfig));
|
||||||
|
Future<List<WriteStatus>> writerResult =
|
||||||
|
writerService.submit(
|
||||||
|
() -> {
|
||||||
|
logger.info("starting hoodie writer thread");
|
||||||
|
// Passing parent thread's TaskContext to newly launched thread for it to access original TaskContext
|
||||||
|
// properties.
|
||||||
|
TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
|
||||||
|
List<WriteStatus> statuses = new LinkedList<>();
|
||||||
|
try {
|
||||||
|
statuses.addAll(handleWrite(bufferedIterator));
|
||||||
|
logger.info("hoodie write is done; notifying reader thread");
|
||||||
|
return statuses;
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("error writing hoodie records", e);
|
||||||
|
bufferedIterator.markAsFailed(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Buffering records into internal buffer. This can throw exception either if reading records from spark fails or
|
||||||
|
// if writing buffered records into parquet file fails.
|
||||||
|
bufferedIterator.startBuffering();
|
||||||
|
logger.info("waiting for hoodie write to finish");
|
||||||
|
final List<WriteStatus> result = writerResult.get();
|
||||||
|
assert result != null && !result.isEmpty() && !bufferedIterator.hasNext();
|
||||||
|
return result;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new HoodieException(e);
|
||||||
|
} finally {
|
||||||
|
writerService.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while (inputItr.hasNext()) {
|
private List<WriteStatus> handleWrite(final BufferedIterator<T, HoodieRecord<T>> bufferedIterator) {
|
||||||
HoodieRecord record = inputItr.next();
|
List<WriteStatus> statuses = new ArrayList<>();
|
||||||
|
while (bufferedIterator.hasNext()) {
|
||||||
|
final BufferedIterator.BufferedIteratorPayload<HoodieRecord<T>> payload = bufferedIterator.next();
|
||||||
|
|
||||||
// clean up any partial failures
|
// clean up any partial failures
|
||||||
if (!partitionsCleaned.contains(record.getPartitionPath())) {
|
if (!partitionsCleaned.contains(payload.record.getPartitionPath())) {
|
||||||
// This insert task could fail multiple times, but Spark will faithfully retry with
|
// This insert task could fail multiple times, but Spark will faithfully retry with
|
||||||
// the same data again. Thus, before we open any files under a given partition, we
|
// the same data again. Thus, before we open any files under a given partition, we
|
||||||
// first delete any files in the same partitionPath written by same Spark partition
|
// first delete any files in the same partitionPath written by same Spark partition
|
||||||
HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig,
|
HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig,
|
||||||
commitTime,
|
commitTime,
|
||||||
record.getPartitionPath(),
|
payload.record.getPartitionPath(),
|
||||||
TaskContext.getPartitionId(),
|
TaskContext.getPartitionId(),
|
||||||
hoodieTable);
|
hoodieTable);
|
||||||
partitionsCleaned.add(record.getPartitionPath());
|
partitionsCleaned.add(payload.record.getPartitionPath());
|
||||||
}
|
}
|
||||||
|
|
||||||
// lazily initialize the handle, for the first time
|
// lazily initialize the handle, for the first time
|
||||||
if (handle == null) {
|
if (handle == null) {
|
||||||
handle =
|
handle =
|
||||||
new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable,
|
new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, payload.record.getPartitionPath());
|
||||||
record.getPartitionPath());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle.canWrite(record)) {
|
if (handle.canWrite(payload.record)) {
|
||||||
// write the record, if the handle has capacity
|
// write the payload, if the handle has capacity
|
||||||
handle.write(record);
|
handle.write(payload.record, payload.insertValue, payload.exception);
|
||||||
} else {
|
} else {
|
||||||
// handle is full.
|
// handle is full.
|
||||||
statuses.add(handle.close());
|
statuses.add(handle.close());
|
||||||
// Need to handle the rejected record & open new handle
|
// Need to handle the rejected payload & open new handle
|
||||||
handle =
|
handle =
|
||||||
new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable,
|
new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, payload.record.getPartitionPath());
|
||||||
record.getPartitionPath());
|
handle.write(payload.record, payload.insertValue, payload.exception); // we should be able to write 1 payload.
|
||||||
handle.write(record); // we should be able to write 1 record.
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we exited out, because we ran out of records, just close the pending handle.
|
// If we exited out, because we ran out of records, just close the pending handle.
|
||||||
if (!inputItr.hasNext()) {
|
if (!bufferedIterator.hasNext()) {
|
||||||
if (handle != null) {
|
if (handle != null) {
|
||||||
statuses.add(handle.close());
|
statuses.add(handle.close());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert statuses.size() > 0; // should never return empty statuses
|
assert statuses.size() > 0 && !bufferedIterator.hasNext(); // should never return empty statuses
|
||||||
return statuses;
|
return statuses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -149,8 +149,10 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
recordsDeleted++;
|
recordsDeleted++;
|
||||||
}
|
}
|
||||||
|
|
||||||
hoodieRecord.deflate();
|
|
||||||
writeStatus.markSuccess(hoodieRecord, recordMetadata);
|
writeStatus.markSuccess(hoodieRecord, recordMetadata);
|
||||||
|
// deflate record payload after recording success. This will help users access payload as a part of marking
|
||||||
|
// record successful.
|
||||||
|
hoodieRecord.deflate();
|
||||||
return avroRecord;
|
return avroRecord;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Error writing record " + hoodieRecord, e);
|
logger.error("Error writing record " + hoodieRecord, e);
|
||||||
|
|||||||
@@ -93,11 +93,15 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
/**
|
/**
|
||||||
* Perform the actual writing of the given record into the backing file.
|
* Perform the actual writing of the given record into the backing file.
|
||||||
*/
|
*/
|
||||||
public void write(HoodieRecord record) {
|
public void write(HoodieRecord record, Optional<IndexedRecord> insertValue,
|
||||||
|
Optional<Exception> getInsertValueException) {
|
||||||
Optional recordMetadata = record.getData().getMetadata();
|
Optional recordMetadata = record.getData().getMetadata();
|
||||||
try {
|
try {
|
||||||
Optional<IndexedRecord> avroRecord = record.getData().getInsertValue(schema);
|
// throws exception if there was any exception while fetching insert value
|
||||||
|
if (getInsertValueException.isPresent()) {
|
||||||
|
throw getInsertValueException.get();
|
||||||
|
}
|
||||||
|
Optional<IndexedRecord> avroRecord = insertValue;
|
||||||
if (avroRecord.isPresent()) {
|
if (avroRecord.isPresent()) {
|
||||||
storageWriter.writeAvroWithMetadata(avroRecord.get(), record);
|
storageWriter.writeAvroWithMetadata(avroRecord.get(), record);
|
||||||
// update the new location of record, so we know where to find it next
|
// update the new location of record, so we know where to find it next
|
||||||
@@ -106,8 +110,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
} else {
|
} else {
|
||||||
recordsDeleted++;
|
recordsDeleted++;
|
||||||
}
|
}
|
||||||
record.deflate();
|
|
||||||
status.markSuccess(record, recordMetadata);
|
status.markSuccess(record, recordMetadata);
|
||||||
|
// deflate record payload after recording success. This will help users access payload as a part of marking
|
||||||
|
// record successful.
|
||||||
|
record.deflate();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
// Not throwing exception from here, since we don't want to fail the entire job
|
// Not throwing exception from here, since we don't want to fail the entire job
|
||||||
// for a single record
|
// for a single record
|
||||||
|
|||||||
@@ -50,8 +50,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
|||||||
this.fs = hoodieTable.getMetaClient().getFs();
|
this.fs = hoodieTable.getMetaClient().getFs();
|
||||||
this.hoodieTable = hoodieTable;
|
this.hoodieTable = hoodieTable;
|
||||||
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
|
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
|
||||||
this.schema =
|
this.schema = createHoodieWriteSchema(config);
|
||||||
HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) {
|
public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) {
|
||||||
@@ -101,4 +100,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
|||||||
public Schema getSchema() {
|
public Schema getSchema() {
|
||||||
return schema;
|
return schema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Schema createHoodieWriteSchema(HoodieWriteConfig config) {
|
||||||
|
return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -167,8 +167,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
recordsDeleted++;
|
recordsDeleted++;
|
||||||
}
|
}
|
||||||
|
|
||||||
hoodieRecord.deflate();
|
|
||||||
writeStatus.markSuccess(hoodieRecord, recordMetadata);
|
writeStatus.markSuccess(hoodieRecord, recordMetadata);
|
||||||
|
// deflate record payload after recording success. This will help users access payload as a part of marking
|
||||||
|
// record successful.
|
||||||
|
hoodieRecord.deflate();
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Error writing record " + hoodieRecord, e);
|
logger.error("Error writing record " + hoodieRecord, e);
|
||||||
|
|||||||
@@ -0,0 +1,207 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.func;
|
||||||
|
|
||||||
|
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
|
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||||
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.spark.util.SizeEstimator;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
 * Tests for {@code BufferedIterator}: a dedicated producer thread fills the buffer via
 * {@code startBuffering()} while the test (consumer) thread reads records through the
 * standard {@code hasNext()}/{@code next()} iterator contract.
 *
 * <p>Covers three behaviors: ordered read-through of all records, throttling of the
 * producer once the in-memory buffer hits its byte limit, and exception propagation in
 * both directions between the producer and consumer threads.
 */
public class TestBufferedIterator {

  // Source of synthetic insert records for all tests.
  private final HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
  // Commit time stamped on the generated records.
  private final String commitTime = HoodieActiveTimeline.createNewCommitTime();
  // Single-threaded executor that plays the role of the record-buffering (producer) thread.
  private ExecutorService recordReader = null;

  @Before
  public void beforeTest() {
    this.recordReader = Executors.newFixedThreadPool(1);
  }

  @After
  public void afterTest() {
    if (this.recordReader != null) {
      // shutdownNow() interrupts the producer, which may still be parked on the
      // iterator's rate-limiting semaphore when a test finishes.
      this.recordReader.shutdownNow();
      this.recordReader = null;
    }
  }

  // Test to ensure that we are reading all records from buffered iterator in the same order without any exceptions.
  @Test(timeout = 60000)
  public void testRecordReading() throws IOException, ExecutionException, InterruptedException {
    final int numRecords = 128;
    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
    final BufferedIterator bufferedIterator =
        new BufferedIterator(hoodieRecords.iterator(), FileUtils.ONE_KB, HoodieTestDataGenerator.avroSchema);
    // Start buffering on the producer thread; the Future completes (with true) once
    // startBuffering() returns.
    Future<Boolean> result =
        recordReader.submit(
            () -> {
              bufferedIterator.startBuffering();
              return true;
            }
        );
    final Iterator<HoodieRecord> originalRecordIterator = hoodieRecords.iterator();
    int recordsRead = 0;
    while (bufferedIterator.hasNext()) {
      final HoodieRecord originalRecord = originalRecordIterator.next();
      final Optional<IndexedRecord> originalInsertValue =
          originalRecord.getData().getInsertValue(HoodieTestDataGenerator.avroSchema);
      final BufferedIterator.BufferedIteratorPayload payload = bufferedIterator.next();
      // Ensure that record ordering is guaranteed.
      Assert.assertEquals(originalRecord, payload.record);
      // cached insert value matches the expected insert value.
      Assert.assertEquals(originalInsertValue, payload.insertValue);
      recordsRead++;
    }
    // Both the buffered view and the source must be exhausted at the same point.
    Assert.assertFalse(bufferedIterator.hasNext() || originalRecordIterator.hasNext());
    // all the records should be read successfully.
    Assert.assertEquals(numRecords, recordsRead);
    // should not throw any exceptions.
    Assert.assertTrue(result.get());
  }

  // Test to ensure that record buffering is throttled when we hit memory limit.
  @Test(timeout = 60000)
  public void testMemoryLimitForBuffering() throws IOException, InterruptedException {
    final int numRecords = 128;
    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
    // maximum number of records to keep in memory.
    final int recordLimit = 5;
    // Size the byte limit off the first record so the buffer should hold ~recordLimit records.
    final long memoryLimitInBytes = recordLimit * SizeEstimator.estimate(hoodieRecords.get(0));
    final BufferedIterator bufferedIterator =
        new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, HoodieTestDataGenerator.avroSchema);
    // NOTE(review): 'result' is never awaited or checked in this test; a failure inside the
    // buffering thread would only surface indirectly via the assertions below — consider
    // asserting on it at the end. TODO confirm this is intentional.
    Future<Boolean> result =
        recordReader.submit(
            () -> {
              bufferedIterator.startBuffering();
              return true;
            }
        );
    // waiting for permits to expire.
    while (!isQueueFull(bufferedIterator.rateLimiter)) {
      Thread.sleep(10);
    }
    Assert.assertEquals(0, bufferedIterator.rateLimiter.availablePermits());
    Assert.assertEquals(recordLimit, bufferedIterator.currentRateLimit);
    Assert.assertEquals(recordLimit, bufferedIterator.size());
    // NOTE(review): presumably one of the buffered records is consumed for size sampling,
    // hence limit - 1 — confirm against BufferedIterator's sampling logic.
    Assert.assertEquals(recordLimit - 1, bufferedIterator.samplingRecordCounter.get());

    // try to read 2 records.
    Assert.assertEquals(hoodieRecords.get(0), bufferedIterator.next().record);
    Assert.assertEquals(hoodieRecords.get(1), bufferedIterator.next().record);

    // waiting for permits to expire.
    while (!isQueueFull(bufferedIterator.rateLimiter)) {
      Thread.sleep(10);
    }
    // No change is expected in rate limit or number of buffered records. We only expect buffering thread to read
    // 2 more records into the buffer.
    Assert.assertEquals(0, bufferedIterator.rateLimiter.availablePermits());
    Assert.assertEquals(recordLimit, bufferedIterator.currentRateLimit);
    Assert.assertEquals(recordLimit, bufferedIterator.size());
    Assert.assertEquals(recordLimit - 1 + 2, bufferedIterator.samplingRecordCounter.get());
  }

  // Test to ensure that exception in either buffering thread or BufferedIterator-reader thread is propagated to
  // another thread.
  @Test(timeout = 60000)
  public void testException() throws IOException, InterruptedException {
    final int numRecords = 256;
    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
    // buffer memory limit
    final long memoryLimitInBytes = 4 * SizeEstimator.estimate(hoodieRecords.get(0));

    // first let us throw exception from bufferIterator reader and test that buffering thread stops and throws
    // correct exception back.
    BufferedIterator bufferedIterator1 =
        new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, HoodieTestDataGenerator.avroSchema);
    Future<Boolean> result =
        recordReader.submit(
            () -> {
              bufferedIterator1.startBuffering();
              return true;
            }
        );
    // waiting for permits to expire.
    while (!isQueueFull(bufferedIterator1.rateLimiter)) {
      Thread.sleep(10);
    }
    // notify buffering thread of an exception and ensure that it exits.
    final Exception e = new Exception("Failing it :)");
    bufferedIterator1.markAsFailed(e);
    try {
      result.get();
      Assert.fail("exception is expected");
    } catch (ExecutionException e1) {
      // The producer surfaces the reader-side failure as a HoodieException whose cause is 'e'.
      Assert.assertEquals(HoodieException.class, e1.getCause().getClass());
      Assert.assertEquals(e, e1.getCause().getCause());
    }

    // second let us raise an exception while doing record buffering. this exception should get propagated to
    // buffered iterator reader.
    final RuntimeException expectedException = new RuntimeException("failing record reading");
    // Mocked source iterator that claims to have records but fails on read.
    final Iterator<HoodieRecord> mockHoodieRecordsIterator = mock(Iterator.class);
    when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
    when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
    BufferedIterator bufferedIterator2 =
        new BufferedIterator(mockHoodieRecordsIterator, memoryLimitInBytes, HoodieTestDataGenerator.avroSchema);
    Future<Boolean> result2 =
        recordReader.submit(
            () -> {
              bufferedIterator2.startBuffering();
              return true;
            }
        );
    try {
      bufferedIterator2.hasNext();
      Assert.fail("exception is expected");
    } catch (Exception e1) {
      // Consumer sees the producer's failure with the original exception as the cause.
      Assert.assertEquals(expectedException, e1.getCause());
    }
    // buffering thread should also have exited. make sure that it is not running.
    try {
      result2.get();
      Assert.fail("exception is expected");
    } catch (ExecutionException e2) {
      Assert.assertEquals(expectedException, e2.getCause());
    }
  }

  /**
   * Returns true when the buffer is full and the producer is throttled: all semaphore
   * permits are taken AND at least one thread is queued waiting on the semaphore.
   */
  private boolean isQueueFull(Semaphore rateLimiter) {
    return (rateLimiter.availablePermits() == 0 && rateLimiter.hasQueuedThreads());
  }
}
|
||||||
Reference in New Issue
Block a user