From dd09abb56d9e4074f5c2558fcfd001a33570e98c Mon Sep 17 00:00:00 2001 From: Balajee Nagasubramaniam Date: Mon, 13 Jan 2020 12:40:06 -0800 Subject: [PATCH] [HUDI-335] Improvements to DiskBasedMap used by ExternalSpillableMap, for write and random/sequential read paths, by introducing bufferedRandmomAccessFile --- .../org/apache/hudi/io/HoodieMergeHandle.java | 8 +- .../common/util/BufferedRandomAccessFile.java | 411 ++++++++++++++++++ .../hudi/common/util/SpillableMapUtils.java | 4 +- .../common/util/collection/DiskBasedMap.java | 40 +- .../util/collection/LazyFileIterable.java | 8 +- .../io/storage/SizeAwareDataOutputStream.java | 5 +- 6 files changed, 454 insertions(+), 22 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 518b88334..630ba3741 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -311,9 +311,11 @@ public class HoodieMergeHandle extends HoodieWrit public WriteStatus close() { try { // write out any pending records (this can happen when inserts are turned into updates) - for (String key : keyToNewRecords.keySet()) { - if (!writtenRecordKeys.contains(key)) { - HoodieRecord hoodieRecord = keyToNewRecords.get(key); + Iterator> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap) + ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator(); + while (newRecordsItr.hasNext()) { + HoodieRecord hoodieRecord = newRecordsItr.next(); + if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) { if (useWriterSchema) { writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema)); } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java new file mode 100644 index 000000000..c99f96170 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java @@ -0,0 +1,411 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.log4j.Logger; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; + +/** + * Use a private buffer for the read/write/seek operations of the RandomAccessFile + * to reduce the number of disk operations. + * + * S C L/E + * Buffer: |----+---| + * File: +---------------D-------------------+ + * S. startPosition - file position of start of the buffer. + * C. currentPosition - file position of current pointer in the buffer. + * L. validLastPosition - file position of the last byte in the buffer. (This is same as + * endPosition of the buffer. Except for the last file block). + * S-C--L--E + * (A) + * Buffer: |-+--+--| + * File: +-----------------------------------D + * E. endPosition() - end file position of the current buffer. + * D. DiskPosition - Position in the file pointed by underlying RandomAccessFile. + * When reading from the file, diskPosition aligns with startPosition. When writing to + * the file, diskPosition(D) aligns with validLastPosition(L/E). + * + * A. AvailableSpace - Space between validLastPosition(L) and EndPosition(E) + * is the available space in buffer, when writing/appending records + * into the buffer/file. + * + * Note: Based on BufferedRandomAccessFile implementation in Apache/Cassandra. + * - adopted from org.apache.cassandra.io + * Copyright: 2015-2019 The Apache Software Foundation + * Home page: http://cassandra.apache.org/ + * License: http://www.apache.org/licenses/LICENSE-2.0 + */ +public final class BufferedRandomAccessFile extends RandomAccessFile { + private static final Logger LOG = Logger.getLogger(BufferedRandomAccessFile.class); + static final int DEFAULT_BUFFER_SIZE = (1 << 16); // 64K buffer + static final int BUFFER_BOUNDARY_MASK = ~(DEFAULT_BUFFER_SIZE - 1); + + private int capacity; + private ByteBuffer dataBuffer; + private long startPosition = 0L; + private long currentPosition = 0L; + private long validLastPosition = 0L; + private long diskPosition = 0L; + private boolean isDirty = false; + private boolean isClosed = false; + private boolean isEOF = false; + + /** + * + * @param file - file name + * @param mode - "r" for read only; "rw" for read write + * @throws IOException + */ + public BufferedRandomAccessFile(File file, String mode) throws IOException { + super(file, mode); + this.init(0); + } + + /** + * + * @param file - file name + * @param mode - "r" for read only; "rw" for read write + * @param size - size/capacity of the buffer. + * @throws IOException + */ + public BufferedRandomAccessFile(File file, String mode, int size) throws IOException { + super(file, mode); + this.init(size); + } + + /** + * + * @param name - name of the file + * @param mode - "r" for read only; "rw" for read write + * @throws IOException + */ + public BufferedRandomAccessFile(String name, String mode) throws IOException { + super(name, mode); + this.init(0); + } + + /** + * + * @param name - name of the file + * @param mode - "r" for read only; "rw" for read write + * @param size - size/capacity of the buffer + * @throws FileNotFoundException + */ + public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException { + super(name, mode); + this.init(size); + } + + /** + * + * @param size - capacity of the buffer + */ + private void init(int size) { + this.capacity = Math.max(DEFAULT_BUFFER_SIZE, size); + this.dataBuffer = ByteBuffer.wrap(new byte[this.capacity]); + } + + /** + * Close the file, after flushing data in the buffer. + * @throws IOException + */ + public void close() throws IOException { + if (!isClosed) { + this.flush(); + super.close(); + this.isClosed = true; + } + } + + /** + * If the file is writable, flush any bytes in the buffer that have not yet been written to disk. + * @throws IOException + */ + public void flush() throws IOException { + this.flushBuffer(); + } + + /** + * Flush any dirty bytes in the buffer to disk. + * @throws IOException + */ + private void flushBuffer() throws IOException { + if (this.isDirty) { + alignDiskPositionToBufferStartIfNeeded(); + int len = (int) (this.currentPosition - this.startPosition); + super.write(this.dataBuffer.array(), 0, len); + this.diskPosition = this.currentPosition; + this.isDirty = false; + } + } + + /** + * read ahead file contents to buffer. + * @return number of bytes filled + * @throws IOException + */ + private int fillBuffer() throws IOException { + int cnt = 0; + int bytesToRead = this.capacity; + /** blocking read, until buffer is filled or EOF reached */ + while (bytesToRead > 0) { + int n = super.read(this.dataBuffer.array(), cnt, bytesToRead); + if (n < 0) { + break; + } + cnt += n; + bytesToRead -= n; + } + this.isEOF = (cnt < this.dataBuffer.array().length); + this.diskPosition += cnt; + return cnt; + } + + /** + * If the diskPosition differs from the startPosition, flush the data in the buffer + * and realign/fill the buffer at startPosition. + * @throws IOException + */ + private void alignDiskPositionToBufferStartIfNeeded() throws IOException { + if (this.diskPosition != this.startPosition) { + super.seek(this.startPosition); + this.diskPosition = this.startPosition; + } + } + + /** + * If the new seek position is in the buffer, adjust the currentPostion. + * If the new seek position is outside of the buffer, flush the contents to + * the file and reload the buffer corresponding to the position. + * + * We logically view the file as group blocks, where each block will perfectly + * fit into the buffer (except for the last block). Given a position to seek, + * we identify the block to be loaded using BUFFER_BOUNDARY_MASK. + * + * When dealing with the last block, we will have extra space between validLastPosition + * and endPosition of the buffer. + * + * @param pos - position in the file to be loaded to the buffer. + * @throws IOException + */ + public void seek(long pos) throws IOException { + if (pos >= this.validLastPosition || pos < this.startPosition) { + // seeking outside of current buffer -- flush and read + this.flushBuffer(); + this.startPosition = pos & BUFFER_BOUNDARY_MASK; // start at BuffSz boundary + alignDiskPositionToBufferStartIfNeeded(); + int n = this.fillBuffer(); + this.validLastPosition = this.startPosition + (long) n; + } else { + // seeking inside current buffer -- no read required + if (pos < this.currentPosition) { + // if seeking backwards, flush buffer. + this.flushBuffer(); + } + } + this.currentPosition = pos; + } + + /** + * @return current file position + */ + public long getFilePointer() { + return this.currentPosition; + } + + /** + * Returns the length of the file, depending on whether buffer has more data (to be flushed). + * @return - length of the file (including data yet to be flushed to the file). + * @throws IOException + */ + public long length() throws IOException { + return Math.max(this.currentPosition, super.length()); + } + + /** + * @return whether currentPosition has reached the end of valid buffer. + */ + private boolean endOfBufferReached() { + return this.currentPosition >= this.validLastPosition; + } + + /** + * Load a new data block. Returns false, when EOF is reached. + * @return - whether new data block was loaded or not + * @throws IOException + */ + private boolean loadNewBlockToBuffer() throws IOException { + if (this.isEOF) { + return false; + } + + // read next block into buffer + this.seek(this.currentPosition); + + // if currentPosition is at start, EOF has been reached + if (this.currentPosition == this.validLastPosition) { + return false; + } + + return true; + } + + /** + * @return - returns a byte as an integer. + * @throws IOException + */ + public int read() throws IOException { + if (endOfBufferReached()) { + if (!loadNewBlockToBuffer()) { + return -1; + } + } + byte res = this.dataBuffer.array()[(int) (this.currentPosition - this.startPosition)]; + this.currentPosition++; + return ((int) res) & 0xFF; // convert byte -> int + } + + /** + * @param b - byte array into which to read data. + * @return - returns number of bytes read. + * @throws IOException + */ + public int read(byte[] b) throws IOException { + return this.read(b, 0, b.length); + } + + /** + * Read specified number of bytes into given array starting at given offset. + * @param b - byte array + * @param off - start offset + * @param len - length of bytes to be read + * @return - number of bytes read. + * @throws IOException + */ + public int read(byte[] b, int off, int len) throws IOException { + if (endOfBufferReached()) { + if (!loadNewBlockToBuffer()) { + return -1; + } + } + + // copy data from buffer + len = Math.min(len, (int) (this.validLastPosition - this.currentPosition)); + int buffOff = (int) (this.currentPosition - this.startPosition); + System.arraycopy(this.dataBuffer.array(), buffOff, b, off, len); + this.currentPosition += len; + return len; + } + + /** + * @return endPosition of the buffer. For the last file block, this may not be a valid position. + */ + private long endPosition() { + return this.startPosition + this.capacity; + } + + /** + * @return - whether space is available at the end of the buffer. + */ + private boolean spaceAvailableInBuffer() { + return (this.isEOF && (this.validLastPosition < this.endPosition())); + } + + /** + * write a byte to the buffer/file. + * @param v - value to be written + * @throws IOException + */ + public void write(int v) throws IOException { + byte [] b = new byte[1]; + b[0] = (byte) v; + this.write(b, 0, b.length); + } + + /** + * write an array of bytes to the buffer/file. + * @param b - byte array with data to be written + * @throws IOException + */ + public void write(byte[] b) throws IOException { + this.write(b, 0, b.length); + } + + /** + * Write specified number of bytes into buffer/file, with given starting offset and length. + * @param b - byte array with data to be written + * @param off - starting offset. + * @param len - length of bytes to be written + * @throws IOException + */ + public void write(byte[] b, int off, int len) throws IOException { + // As all data may not fit into the buffer, more than one write would be required. + while (len > 0) { + int n = this.writeAtMost(b, off, len); + off += n; + len -= n; + this.isDirty = true; + } + } + + /** + * If space is available at the end of the buffer, start using it. Otherwise, + * flush the unwritten data into the file and load the buffer corresponding to startPosition. + * @throws IOException + */ + private void expandBufferToCapacityIfNeeded() throws IOException { + if (spaceAvailableInBuffer()) { + // space available at end of buffer -- adjust validLastPosition + this.validLastPosition = this.endPosition(); + } else { + loadNewBlockToBuffer(); + // appending to EOF, adjust validLastPosition. + if (this.currentPosition == this.validLastPosition) { + this.validLastPosition = this.endPosition(); + } + } + } + + /** + * Given a byte array, offset in the array and length of bytes to be written, + * update the buffer/file. + * @param b - byte array of data to be written + * @param off - starting offset. + * @param len - length of bytes to be written + * @return - number of bytes written + * @throws IOException + */ + private int writeAtMost(byte[] b, int off, int len) throws IOException { + if (endOfBufferReached()) { + expandBufferToCapacityIfNeeded(); + } + + // copy data to buffer, until all data is copied or to buffer capacity. + len = Math.min(len, (int) (this.validLastPosition - this.currentPosition)); + int buffOff = (int) (this.currentPosition - this.startPosition); + System.arraycopy(b, off, this.dataBuffer.array(), buffOff, len); + this.currentPosition += len; + return len; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 55ecbdca8..06e84a9ec 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -56,12 +56,12 @@ public class SpillableMapUtils { int keySize = file.readInt(); int valueSize = file.readInt(); byte[] key = new byte[keySize]; - file.read(key, 0, keySize); + file.readFully(key, 0, keySize); byte[] value = new byte[valueSize]; if (valueSize != valueLength) { throw new HoodieCorruptedDataException("unequal size of payload written to external file, data may be corrupted"); } - file.read(value, 0, valueSize); + file.readFully(value, 0, valueSize); long crcOfReadValue = generateChecksum(value); if (crc != crcOfReadValue) { throw new HoodieCorruptedDataException( diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java index e764a170d..07ad5f27a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util.collection; +import org.apache.hudi.common.util.BufferedRandomAccessFile; import org.apache.hudi.common.util.SerializationUtils; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.io.storage.SizeAwareDataOutputStream; @@ -54,6 +55,7 @@ import java.util.stream.Stream; */ public final class DiskBasedMap implements Map, Iterable { + public static int BUFFER_SIZE = 128 * 1024; // 128 KB private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class); // Stores the key and corresponding value's latest metadata spilled to disk private final Map valueMetadataMap; @@ -69,8 +71,8 @@ public final class DiskBasedMap // FilePath to store the spilled data private String filePath; // Thread-safe random access file - private ThreadLocal randomAccessFile = new ThreadLocal<>(); - private Queue openedAccessFiles = new ConcurrentLinkedQueue<>(); + private ThreadLocal randomAccessFile = new ThreadLocal<>(); + private Queue openedAccessFiles = new ConcurrentLinkedQueue<>(); public DiskBasedMap(String baseFilePath) throws IOException { this.valueMetadataMap = new ConcurrentHashMap<>(); @@ -78,7 +80,7 @@ public final class DiskBasedMap this.filePath = writeOnlyFile.getPath(); initFile(writeOnlyFile); this.fileOutputStream = new FileOutputStream(writeOnlyFile, true); - this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream); + this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream, BUFFER_SIZE); this.filePosition = new AtomicLong(0L); } @@ -87,11 +89,11 @@ public final class DiskBasedMap * * @return */ - private RandomAccessFile getRandomAccessFile() { + private BufferedRandomAccessFile getRandomAccessFile() { try { - RandomAccessFile readHandle = randomAccessFile.get(); + BufferedRandomAccessFile readHandle = randomAccessFile.get(); if (readHandle == null) { - readHandle = new RandomAccessFile(filePath, "r"); + readHandle = new BufferedRandomAccessFile(filePath, "r"); readHandle.seek(0); randomAccessFile.set(readHandle); openedAccessFiles.offer(readHandle); @@ -135,7 +137,7 @@ public final class DiskBasedMap } while (!openedAccessFiles.isEmpty()) { - RandomAccessFile file = openedAccessFiles.poll(); + BufferedRandomAccessFile file = openedAccessFiles.poll(); if (null != file) { try { file.close(); @@ -153,6 +155,14 @@ public final class DiskBasedMap }); } + private void flushToDisk() { + try { + writeOnlyFileHandle.flush(); + } catch (IOException e) { + throw new HoodieIOException("Failed to flush to DiskBasedMap file", e); + } + } + /** * Custom iterator to iterate over values written to disk. */ @@ -210,8 +220,7 @@ public final class DiskBasedMap } } - @Override - public synchronized R put(T key, R value) { + private synchronized R put(T key, R value, boolean flush) { try { byte[] val = SerializationUtils.serialize(value); Integer valueSize = val.length; @@ -222,12 +231,20 @@ public final class DiskBasedMap filePosition .set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val), serializedKey.length, valueSize, serializedKey, val, timestamp))); + if (flush) { + flushToDisk(); + } } catch (IOException io) { throw new HoodieIOException("Unable to store data in Disk Based map", io); } return value; } + @Override + public R put(T key, R value) { + return put(key, value, true); + } + @Override public R remove(Object key) { R value = get(key); @@ -238,8 +255,9 @@ public final class DiskBasedMap @Override public void putAll(Map m) { for (Map.Entry entry : m.entrySet()) { - put(entry.getKey(), entry.getValue()); + put(entry.getKey(), entry.getValue(), false); } + flushToDisk(); } @Override @@ -260,7 +278,7 @@ public final class DiskBasedMap } public Stream valueStream() { - final RandomAccessFile file = getRandomAccessFile(); + final BufferedRandomAccessFile file = getRandomAccessFile(); return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java index 4b8e71684..927c992a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java @@ -18,17 +18,17 @@ package org.apache.hudi.common.util.collection; +import org.apache.hudi.common.util.BufferedRandomAccessFile; import org.apache.hudi.exception.HoodieException; import java.io.IOException; -import java.io.RandomAccessFile; import java.util.Iterator; import java.util.Map; import java.util.function.Consumer; import java.util.stream.Collectors; /** - * Iterable to lazily fetch values spilled to disk. This class uses RandomAccessFile to randomly access the position of + * Iterable to lazily fetch values spilled to disk. This class uses BufferedRandomAccessFile to randomly access the position of * the latest value for a key spilled to disk and returns the result. */ public class LazyFileIterable implements Iterable { @@ -58,12 +58,12 @@ public class LazyFileIterable implements Iterable { public class LazyFileIterator implements Iterator { private final String filePath; - private RandomAccessFile readOnlyFileHandle; + private BufferedRandomAccessFile readOnlyFileHandle; private final Iterator> metadataIterator; public LazyFileIterator(String filePath, Map map) throws IOException { this.filePath = filePath; - this.readOnlyFileHandle = new RandomAccessFile(filePath, "r"); + this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", DiskBasedMap.BUFFER_SIZE); readOnlyFileHandle.seek(0); // sort the map in increasing order of offset of value so disk seek is only in one(forward) direction diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/io/storage/SizeAwareDataOutputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/io/storage/SizeAwareDataOutputStream.java index cf5240430..ddf67407b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/io/storage/SizeAwareDataOutputStream.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/io/storage/SizeAwareDataOutputStream.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util.collection.io.storage; +import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.FileOutputStream; import java.io.IOException; @@ -33,8 +34,8 @@ public class SizeAwareDataOutputStream { // Counter to keep track of number of bytes written private AtomicLong size; - public SizeAwareDataOutputStream(FileOutputStream fileOutputStream) { - this.outputStream = new DataOutputStream(fileOutputStream); + public SizeAwareDataOutputStream(FileOutputStream fileOutputStream, int cacheSize) { + this.outputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream, cacheSize)); this.size = new AtomicLong(0L); }