diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIterator.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIterator.java index e69c09b1a..586164084 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIterator.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIterator.java @@ -18,8 +18,6 @@ package com.uber.hoodie.func; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.uber.hoodie.common.model.HoodieRecord; -import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.exception.HoodieException; import java.util.Iterator; import java.util.Optional; @@ -29,8 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; +import java.util.function.Function; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.util.SizeEstimator; @@ -39,9 +36,11 @@ import org.apache.spark.util.SizeEstimator; * Used for buffering input records. Buffer limit is controlled by {@link #bufferMemoryLimit}. It * internally samples every {@link #RECORD_SAMPLING_RATE}th record and adjusts number of records in * buffer accordingly. This is done to ensure that we don't OOM. + * + * @param input payload data type + * @param output payload data type */ -public class BufferedIterator> implements - Iterator> { +public class BufferedIterator implements Iterator { // interval used for polling records in the queue. public static final int RECORD_POLL_INTERVAL_SEC = 5; @@ -58,20 +57,18 @@ public class BufferedIterator>> buffer = new + private final LinkedBlockingQueue> buffer = new LinkedBlockingQueue<>(); // maximum amount of memory to be used for buffering records. private final long bufferMemoryLimit; // original iterator from where records are read for buffering. - private final Iterator inputIterator; + private final Iterator inputIterator; // it holds the root cause of the exception in case either buffering records (reading from // inputIterator) fails or // thread reading records from buffer fails. private final AtomicReference hasFailed = new AtomicReference(null); // used for indicating that all the records from buffer are read successfully. private final AtomicBoolean isDone = new AtomicBoolean(false); - // schema used for fetching insertValue from HoodieRecord. - private final Schema schema; // indicates rate limit (number of records to cache). it is updated whenever there is a change // in avg record size. @VisibleForTesting @@ -82,13 +79,15 @@ public class BufferedIterator nextRecord; + private O nextRecord; + // Function to transform the input payload to the expected output payload + private Function bufferedIteratorTransform; - public BufferedIterator(final Iterator iterator, final long bufferMemoryLimit, - final Schema schema) { + public BufferedIterator(final Iterator iterator, final long bufferMemoryLimit, + final Function bufferedIteratorTransform) { this.inputIterator = iterator; this.bufferMemoryLimit = bufferMemoryLimit; - this.schema = schema; + this.bufferedIteratorTransform = bufferedIteratorTransform; } @VisibleForTesting @@ -101,7 +100,7 @@ public class BufferedIterator currentRateLimit) { rateLimiter.release(newRateLimit - currentRateLimit); @@ -132,19 +125,19 @@ public class BufferedIterator payload = new BufferedIteratorPayload<>(t, this.schema); + final O payload = bufferedIteratorTransform.apply(t); buffer.put(Optional.of(payload)); } private void readNextRecord() { rateLimiter.release(); - Optional> newRecord; + Optional newRecord; while (true) { try { throwExceptionIfFailed(); @@ -194,9 +187,9 @@ public class BufferedIterator next() { + public O next() { Preconditions.checkState(hasNext() && this.nextRecord != null); - final BufferedIteratorPayload ret = this.nextRecord; + final O ret = this.nextRecord; this.nextRecord = null; return ret; } @@ -213,23 +206,4 @@ public class BufferedIterator { - - public T record; - public Optional insertValue; - // It caches the exception seen while fetching insert value. - public Optional exception = Optional.empty(); - - public BufferedIteratorPayload(T record, Schema schema) { - this.record = record; - try { - this.insertValue = record.getData().getInsertValue(schema); - } catch (Exception e) { - this.exception = Optional.of(e); - } - } - } } \ No newline at end of file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIteratorExecutor.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIteratorExecutor.java new file mode 100644 index 000000000..fea0f28bb --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BufferedIteratorExecutor.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.uber.hoodie.func; + +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieException; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Function; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; + +/** + * Executor for a BufferedIterator operation. This class takes as input the input iterator which + * needs to be buffered, the runnable function that needs to be executed in the reader thread and + * return the transformed output based on the writer function + */ +public class BufferedIteratorExecutor { + + private static Logger logger = LogManager.getLogger(BufferedIteratorExecutor.class); + + // Executor service used for launching writer thread. + final ExecutorService writerService; + // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES. + final BufferedIterator bufferedIterator; + // Need to set current spark thread's TaskContext into newly launched thread so that new + // thread can access + // TaskContext properties. + final TaskContext sparkThreadTaskContext; + + public BufferedIteratorExecutor(final HoodieWriteConfig hoodieConfig, final Iterator inputItr, + final Function bufferedIteratorTransform, + final ExecutorService writerService) { + this.sparkThreadTaskContext = TaskContext.get(); + this.writerService = writerService; + this.bufferedIterator = new BufferedIterator<>(inputItr, hoodieConfig.getWriteBufferLimitBytes(), + bufferedIteratorTransform); + } + + /** + * Starts buffering and executing the writer function + */ + public Future start(Function writerFunction) { + try { + Future future = writerService.submit( + () -> { + logger.info("starting hoodie writer thread"); + // Passing parent thread's TaskContext to newly launched thread for it to access original TaskContext + // properties. + TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext); + try { + E result = writerFunction.apply(bufferedIterator); + logger.info("hoodie write is done; notifying reader thread"); + return result; + } catch (Exception e) { + logger.error("error writing hoodie records", e); + bufferedIterator.markAsFailed(e); + throw e; + } + }); + bufferedIterator.startBuffering(); + return future; + } catch (Exception e) { + throw new HoodieException(e); + } + } + + public boolean isRemaining() { + return bufferedIterator.hasNext(); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java index 6e1ae02c4..36aba5305 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/LazyInsertIterable.java @@ -21,6 +21,8 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.func.payload.AbstractBufferedIteratorPayload; +import com.uber.hoodie.func.payload.HoodieRecordBufferedIteratorPayload; import com.uber.hoodie.io.HoodieCreateHandle; import com.uber.hoodie.io.HoodieIOHandle; import com.uber.hoodie.table.HoodieTable; @@ -29,14 +31,15 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import java.util.function.Function; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; import org.apache.spark.TaskContext; -import org.apache.spark.TaskContext$; /** * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new @@ -45,7 +48,6 @@ import org.apache.spark.TaskContext$; public class LazyInsertIterable extends LazyIterableIterator, List> { - private static Logger logger = LogManager.getLogger(LazyInsertIterable.class); private final HoodieWriteConfig hoodieConfig; private final String commitTime; private final HoodieTable hoodieTable; @@ -65,44 +67,35 @@ public class LazyInsertIterable extends protected void start() { } + /** + * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some + * expensive operations of transformation to the reader thread. + * @param schema + * @param + * @return + */ + public static Function, AbstractBufferedIteratorPayload> + bufferedItrPayloadTransform(Schema schema) { + return (hoodieRecord) -> new HoodieRecordBufferedIteratorPayload(hoodieRecord, schema); + } + @Override protected List computeNext() { - // Need to set current spark thread's TaskContext into newly launched thread so that new - // thread can access - // TaskContext properties. - final TaskContext sparkThreadTaskContext = TaskContext.get(); // Executor service used for launching writer thread. final ExecutorService writerService = Executors.newFixedThreadPool(1); try { - // Used for buffering records which is controlled by - // HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES. - final BufferedIterator> bufferedIterator = new BufferedIterator<>(inputItr, - hoodieConfig.getWriteBufferLimitBytes(), - HoodieIOHandle.createHoodieWriteSchema(hoodieConfig)); - Future> writerResult = writerService.submit(() -> { - logger.info("starting hoodie writer thread"); - // Passing parent thread's TaskContext to newly launched thread for it to access original - // TaskContext - // properties. - TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext); + Function> function = (bufferedIterator) -> { List statuses = new LinkedList<>(); - try { - statuses.addAll(handleWrite(bufferedIterator)); - logger.info("hoodie write is done; notifying reader thread"); - return statuses; - } catch (Exception e) { - logger.error("error writing hoodie records", e); - bufferedIterator.markAsFailed(e); - throw e; - } - }); - // Buffering records into internal buffer. This can throw exception either if reading - // records from spark fails or - // if writing buffered records into parquet file fails. - bufferedIterator.startBuffering(); - logger.info("waiting for hoodie write to finish"); + statuses.addAll(handleWrite(bufferedIterator)); + return statuses; + }; + BufferedIteratorExecutor, AbstractBufferedIteratorPayload, List> + bufferedIteratorExecutor = new BufferedIteratorExecutor(hoodieConfig, inputItr, + bufferedItrPayloadTransform(HoodieIOHandle.createHoodieWriteSchema(hoodieConfig)), + writerService); + Future> writerResult = bufferedIteratorExecutor.start(function); final List result = writerResult.get(); - assert result != null && !result.isEmpty() && !bufferedIterator.hasNext(); + assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; } catch (Exception e) { throw new HoodieException(e); @@ -112,38 +105,38 @@ public class LazyInsertIterable extends } private List handleWrite( - final BufferedIterator> bufferedIterator) { + final BufferedIterator, AbstractBufferedIteratorPayload> bufferedIterator) { List statuses = new ArrayList<>(); while (bufferedIterator.hasNext()) { - final BufferedIterator.BufferedIteratorPayload> payload = bufferedIterator + final HoodieRecordBufferedIteratorPayload payload = (HoodieRecordBufferedIteratorPayload) bufferedIterator .next(); - + final HoodieRecord insertPayload = (HoodieRecord) payload.getInputPayload(); // clean up any partial failures - if (!partitionsCleaned.contains(payload.record.getPartitionPath())) { + if (!partitionsCleaned + .contains(insertPayload.getPartitionPath())) { // This insert task could fail multiple times, but Spark will faithfully retry with // the same data again. Thus, before we open any files under a given partition, we // first delete any files in the same partitionPath written by same Spark partition - HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, - payload.record.getPartitionPath(), TaskContext.getPartitionId(), hoodieTable); - partitionsCleaned.add(payload.record.getPartitionPath()); + HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, insertPayload.getPartitionPath(), + TaskContext.getPartitionId(), hoodieTable); + partitionsCleaned.add(insertPayload.getPartitionPath()); } // lazily initialize the handle, for the first time if (handle == null) { - handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, - payload.record.getPartitionPath()); + handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath()); } - if (handle.canWrite(payload.record)) { + if (handle.canWrite(((HoodieRecord) payload.getInputPayload()))) { // write the payload, if the handle has capacity - handle.write(payload.record, payload.insertValue, payload.exception); + handle.write(insertPayload, (Optional) payload.getOutputPayload(), payload.exception); } else { // handle is full. statuses.add(handle.close()); // Need to handle the rejected payload & open new handle - handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, - payload.record.getPartitionPath()); - handle.write(payload.record, payload.insertValue, + handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath()); + handle.write(insertPayload, + (Optional) payload.getOutputPayload(), payload.exception); // we should be able to write 1 payload. } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/ParquetReaderIterator.java b/hoodie-client/src/main/java/com/uber/hoodie/func/ParquetReaderIterator.java new file mode 100644 index 000000000..ed81da035 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/ParquetReaderIterator.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func; + +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import java.util.Iterator; +import org.apache.parquet.hadoop.ParquetReader; + +/** + * This class wraps a parquet reader and provides an iterator based api to + * read from a parquet file. This is used in {@link BufferedIterator} + */ +public class ParquetReaderIterator implements Iterator { + + // Parquet reader for an existing parquet file + private final ParquetReader parquetReader; + // Holds the next entry returned by the parquet reader + private T next; + + public ParquetReaderIterator(ParquetReader parquetReader) { + this.parquetReader = parquetReader; + } + + @Override + public boolean hasNext() { + try { + // To handle when hasNext() is called multiple times for idempotency and/or the first time + if (this.next == null) { + this.next = parquetReader.read(); + } + return this.next != null; + } catch (IOException io) { + throw new HoodieIOException("unable to read next record from parquet file ", io); + } + } + + @Override + public T next() { + try { + // To handle case when next() is called before hasNext() + if (this.next == null) { + if (!hasNext()) { + throw new HoodieIOException("No more records left to read from parquet file"); + } + } + T retVal = this.next; + this.next = parquetReader.read(); + return retVal; + } catch (IOException io) { + throw new HoodieIOException("unable to read next record from parquet file ", io); + } + } + + public void close() throws IOException { + parquetReader.close(); + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/payload/AbstractBufferedIteratorPayload.java b/hoodie-client/src/main/java/com/uber/hoodie/func/payload/AbstractBufferedIteratorPayload.java new file mode 100644 index 000000000..4a7e32224 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/payload/AbstractBufferedIteratorPayload.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func.payload; + +/** + * @param Input data type for BufferedIterator + * @param Output data type for BufferedIterator + */ +public abstract class AbstractBufferedIteratorPayload { + + // input payload for iterator + protected I inputPayload; + // output payload for iterator, this is used in cases where the output payload is computed + // from the input payload and most of this computation is off-loaded to the reader + protected O outputPayload; + + public AbstractBufferedIteratorPayload(I record) { + this.inputPayload = record; + } + + public I getInputPayload() { + return inputPayload; + } + + public O getOutputPayload() { + return outputPayload; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/payload/GenericRecordBufferedIteratorPayload.java b/hoodie-client/src/main/java/com/uber/hoodie/func/payload/GenericRecordBufferedIteratorPayload.java new file mode 100644 index 000000000..9d934d9ea --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/payload/GenericRecordBufferedIteratorPayload.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func.payload; + +import org.apache.avro.generic.GenericRecord; + +/** + * BufferedIteratorPayload that takes GenericRecord as input and GenericRecord as output + */ +public class GenericRecordBufferedIteratorPayload + extends AbstractBufferedIteratorPayload { + + public GenericRecordBufferedIteratorPayload(GenericRecord record) { + super(record); + this.outputPayload = record; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/payload/HoodieRecordBufferedIteratorPayload.java b/hoodie-client/src/main/java/com/uber/hoodie/func/payload/HoodieRecordBufferedIteratorPayload.java new file mode 100644 index 000000000..a79f0e01a --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/func/payload/HoodieRecordBufferedIteratorPayload.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func.payload; + +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import java.util.Optional; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; + +/** + * BufferedIteratorPayload that takes HoodieRecord as input and transforms to output Optional + * @param + */ +public class HoodieRecordBufferedIteratorPayload + extends AbstractBufferedIteratorPayload, Optional> { + + // It caches the exception seen while fetching insert value. + public Optional exception = Optional.empty(); + + public HoodieRecordBufferedIteratorPayload(HoodieRecord record, Schema schema) { + super(record); + try { + this.outputPayload = record.getData().getInsertValue(schema); + } catch (Exception e) { + this.exception = Optional.of(e); + } + } + + public Optional getException() { + return exception; + } +} diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index dc2930aae..f5592c381 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -135,8 +135,8 @@ public class HoodieMergeHandle extends HoodieIOHa try { // Load the new records in a map logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge()); - this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), - Optional.empty(), new StringConverter(), new HoodieRecordConverter(schema, config.getPayloadClass())); + this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(), Optional.empty(), + new StringConverter(), new HoodieRecordConverter(schema, config.getPayloadClass())); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } @@ -148,7 +148,7 @@ public class HoodieMergeHandle extends HoodieIOHa // update the new location of the record, so we know where to find it next record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); } - logger.debug("Number of entries in MemoryBasedMap => " + logger.info("Number of entries in MemoryBasedMap => " + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() + "Total size in bytes of MemoryBasedMap => " + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() @@ -156,7 +156,6 @@ public class HoodieMergeHandle extends HoodieIOHa + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + "Size of file spilled to disk => " + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); - return partitionPath; } @@ -186,7 +185,8 @@ public class HoodieMergeHandle extends HoodieIOHa } /** - * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file. + * Go through an old record. Here if we detect a newer version shows up, we write the new one to + * the file. */ public void write(GenericRecord oldRecord) { String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index db32a908d..bdf40f79a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -34,10 +34,16 @@ import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieNotSupportedException; import com.uber.hoodie.exception.HoodieUpsertException; +import com.uber.hoodie.func.BufferedIterator; +import com.uber.hoodie.func.BufferedIteratorExecutor; import com.uber.hoodie.func.LazyInsertIterable; +import com.uber.hoodie.func.ParquetReaderIterator; +import com.uber.hoodie.func.payload.AbstractBufferedIteratorPayload; +import com.uber.hoodie.func.payload.GenericRecordBufferedIteratorPayload; import com.uber.hoodie.io.HoodieCleanHelper; import com.uber.hoodie.io.HoodieMergeHandle; import java.io.IOException; @@ -52,6 +58,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -173,8 +182,19 @@ public class HoodieCopyOnWriteTable extends Hoodi return handleUpdateInternal(upsertHandle, commitTime, fileLoc); } + /** + * Transformer function to help transform a GenericRecord. This transformer is used by BufferedIterator to offload + * some expensive operations of transformation to the reader thread. + * + */ + public static java.util.function.Function + bufferedItrPayloadTransform() { + return (genericRecord) -> new GenericRecordBufferedIteratorPayload(genericRecord); + } + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, - String commitTime, String fileLoc) throws IOException { + String commitTime, String fileLoc) + throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException( "Error in finding the old file path at commit " + commitTime + " at fileLoc: " + fileLoc); @@ -182,32 +202,44 @@ public class HoodieCopyOnWriteTable extends Hoodi AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) .withConf(getHadoopConf()).build(); + final ExecutorService writerService = Executors.newFixedThreadPool(1); try { - IndexedRecord record; - while ((record = reader.read()) != null) { - // Two types of writes here (new record, and old record). - // We have already catch the exception during writing new records. - // But for old records, we should fail if any exception happens. - upsertHandle.write((GenericRecord) record); - } - } catch (IOException e) { - throw new HoodieUpsertException( - "Failed to read record from " + upsertHandle.getOldFilePath() + " with new Schema " - + upsertHandle.getSchema(), e); + java.util.function.Function runnableFunction = (bufferedIterator) -> { + handleWrite(bufferedIterator, upsertHandle); + return null; + }; + BufferedIteratorExecutor wrapper = + new BufferedIteratorExecutor(config, new ParquetReaderIterator(reader), bufferedItrPayloadTransform(), + writerService); + Future writerResult = wrapper.start(runnableFunction); + writerResult.get(); + } catch (Exception e) { + throw new HoodieException(e); } finally { reader.close(); upsertHandle.close(); + writerService.shutdownNow(); } } + //TODO(vc): This needs to be revisited if (upsertHandle.getWriteStatus().getPartitionPath() == null) { - logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " - + upsertHandle.getWriteStatus()); + logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + + ", " + upsertHandle.getWriteStatus()); } return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())) .iterator(); } + private void handleWrite(final BufferedIterator bufferedIterator, + final HoodieMergeHandle upsertHandle) { + while (bufferedIterator.hasNext()) { + final GenericRecordBufferedIteratorPayload payload = (GenericRecordBufferedIteratorPayload) bufferedIterator + .next(); + upsertHandle.write(payload.getOutputPayload()); + } + } + protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, Iterator> recordItr) { return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc); @@ -792,4 +824,4 @@ public class HoodieCopyOnWriteTable extends Hoodi } } } -} +} \ No newline at end of file diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIterator.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIterator.java index 6c47f2d4c..e55db1b05 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIterator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIterator.java @@ -23,6 +23,8 @@ import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.func.payload.AbstractBufferedIteratorPayload; +import com.uber.hoodie.func.payload.HoodieRecordBufferedIteratorPayload; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -66,7 +68,7 @@ public class TestBufferedIterator { final int numRecords = 128; final List hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords); final BufferedIterator bufferedIterator = new BufferedIterator(hoodieRecords.iterator(), FileUtils.ONE_KB, - HoodieTestDataGenerator.avroSchema); + LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema)); Future result = recordReader.submit(() -> { bufferedIterator.startBuffering(); return true; @@ -77,11 +79,12 @@ public class TestBufferedIterator { final HoodieRecord originalRecord = originalRecordIterator.next(); final Optional originalInsertValue = originalRecord.getData() .getInsertValue(HoodieTestDataGenerator.avroSchema); - final BufferedIterator.BufferedIteratorPayload payload = bufferedIterator.next(); + final HoodieRecordBufferedIteratorPayload payload = (HoodieRecordBufferedIteratorPayload) bufferedIterator.next(); // Ensure that record ordering is guaranteed. - Assert.assertEquals(originalRecord, payload.record); + Assert.assertEquals(originalRecord, payload.getInputPayload()); // cached insert value matches the expected insert value. - Assert.assertEquals(originalInsertValue, payload.insertValue); + Assert.assertEquals(originalInsertValue, + ((HoodieRecord) payload.getInputPayload()).getData().getInsertValue(HoodieTestDataGenerator.avroSchema)); recordsRead++; } Assert.assertFalse(bufferedIterator.hasNext() || originalRecordIterator.hasNext()); @@ -99,8 +102,9 @@ public class TestBufferedIterator { // maximum number of records to keep in memory. final int recordLimit = 5; final long memoryLimitInBytes = recordLimit * SizeEstimator.estimate(hoodieRecords.get(0)); - final BufferedIterator bufferedIterator = new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, - HoodieTestDataGenerator.avroSchema); + final BufferedIterator bufferedIterator = + new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, + LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema)); Future result = recordReader.submit(() -> { bufferedIterator.startBuffering(); return true; @@ -115,8 +119,8 @@ public class TestBufferedIterator { Assert.assertEquals(recordLimit - 1, bufferedIterator.samplingRecordCounter.get()); // try to read 2 records. - Assert.assertEquals(hoodieRecords.get(0), bufferedIterator.next().record); - Assert.assertEquals(hoodieRecords.get(1), bufferedIterator.next().record); + Assert.assertEquals(hoodieRecords.get(0), bufferedIterator.next().getInputPayload()); + Assert.assertEquals(hoodieRecords.get(1), bufferedIterator.next().getInputPayload()); // waiting for permits to expire. while (!isQueueFull(bufferedIterator.rateLimiter)) { @@ -145,7 +149,7 @@ public class TestBufferedIterator { // stops and throws // correct exception back. BufferedIterator bufferedIterator1 = new BufferedIterator(hoodieRecords.iterator(), memoryLimitInBytes, - HoodieTestDataGenerator.avroSchema); + LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema)); Future result = recordReader.submit(() -> { bufferedIterator1.startBuffering(); return true; @@ -173,7 +177,7 @@ public class TestBufferedIterator { when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); BufferedIterator bufferedIterator2 = new BufferedIterator(mockHoodieRecordsIterator, memoryLimitInBytes, - HoodieTestDataGenerator.avroSchema); + LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema)); Future result2 = recordReader.submit(() -> { bufferedIterator2.startBuffering(); return true; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIteratorExecutor.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIteratorExecutor.java new file mode 100644 index 000000000..5c3695b8b --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestBufferedIteratorExecutor.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; +import com.uber.hoodie.config.HoodieWriteConfig; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Function; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestBufferedIteratorExecutor { + + private final HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator(); + private final String commitTime = HoodieActiveTimeline.createNewCommitTime(); + private ExecutorService executorService = null; + + @Before + public void beforeTest() { + this.executorService = Executors.newFixedThreadPool(1); + } + + @After + public void afterTest() { + if (this.executorService != null) { + this.executorService.shutdownNow(); + this.executorService = null; + } + } + + @Test + public void testExecutor() throws Exception { + + final List hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, 100); + + HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); + when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); + BufferedIteratorExecutor bufferedIteratorExecutor = new BufferedIteratorExecutor(hoodieWriteConfig, + hoodieRecords.iterator(), LazyInsertIterable.bufferedItrPayloadTransform(HoodieTestDataGenerator.avroSchema), + executorService); + Function function = (bufferedIterator) -> { + Integer count = 0; + while (bufferedIterator.hasNext()) { + count++; + bufferedIterator.next(); + } + return count; + }; + Future future = bufferedIteratorExecutor.start(function); + // It should buffer and write 100 records + Assert.assertEquals((int) future.get(), 100); + // There should be no remaining records in the buffer + Assert.assertFalse(bufferedIteratorExecutor.isRemaining()); + } +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestParquetReaderIterator.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestParquetReaderIterator.java new file mode 100644 index 000000000..a082894fe --- /dev/null +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestParquetReaderIterator.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.func; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.uber.hoodie.exception.HoodieIOException; +import java.io.IOException; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.Assert; +import org.junit.Test; + +public class TestParquetReaderIterator { + + @Test + public void testParquetIteratorIdempotency() throws IOException { + ParquetReader reader = mock(ParquetReader.class); + // only 1 record in reader + when(reader.read()).thenReturn(1).thenReturn(null); + ParquetReaderIterator iterator = new ParquetReaderIterator<>(reader); + int idempotencyCheckCounter = 0; + // call hasNext() 3 times + while (idempotencyCheckCounter < 3) { + Assert.assertTrue(iterator.hasNext()); + idempotencyCheckCounter++; + } + } + + @Test + public void testParquetIterator() throws IOException { + + ParquetReader reader = mock(ParquetReader.class); + // only one record to read + when(reader.read()).thenReturn(1).thenReturn(null); + ParquetReaderIterator iterator = new ParquetReaderIterator<>(reader); + // should return value even though hasNext() hasn't been called + Assert.assertTrue(iterator.next() == 1); + // no more entries to iterate on + Assert.assertFalse(iterator.hasNext()); + try { + iterator.next(); + } catch (HoodieIOException e) { + // should throw an exception since there is only 1 record + } + } +} diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java index 0c3ab8626..7ae620ef6 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.net.InetAddress; import java.util.AbstractMap; import java.util.Collection; import java.util.Date; @@ -94,7 +95,9 @@ public final class DiskBasedMap implements Map { } writeOnlyFileHandle.createNewFile(); - log.info("Spilling to file location " + writeOnlyFileHandle.getAbsolutePath()); + log.info( + "Spilling to file location " + writeOnlyFileHandle.getAbsolutePath() + " in host (" + InetAddress.getLocalHost() + .getHostAddress() + ") with hostname (" + InetAddress.getLocalHost().getHostName() + ")"); // Open file in readFromDisk-only mode readOnlyFileHandle = new RandomAccessFile(filePath, "r"); readOnlyFileHandle.seek(0);