1
0

[HUDI-335] Improvements to DiskBasedMap used by ExternalSpillableMap, for write and random/sequential read paths, by introducing bufferedRandmomAccessFile

This commit is contained in:
Balajee Nagasubramaniam
2020-01-13 12:40:06 -08:00
committed by n3nash
parent 7aa3ce31e6
commit dd09abb56d
6 changed files with 454 additions and 22 deletions

View File

@@ -0,0 +1,411 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
/**
* Use a private buffer for the read/write/seek operations of the RandomAccessFile
* to reduce the number of disk operations.
*
* S C L/E
* Buffer: |----+---|
* File: +---------------D-------------------+
* S. startPosition - file position of start of the buffer.
* C. currentPosition - file position of current pointer in the buffer.
* L. validLastPosition - file position of the last byte in the buffer. (This is same as
* endPosition of the buffer. Except for the last file block).
* S-C--L--E
* (A)
* Buffer: |-+--+--|
* File: +-----------------------------------D
* E. endPosition() - end file position of the current buffer.
* D. DiskPosition - Position in the file pointed by underlying RandomAccessFile.
* When reading from the file, diskPosition aligns with startPosition. When writing to
* the file, diskPosition(D) aligns with validLastPosition(L/E).
*
* A. AvailableSpace - Space between validLastPosition(L) and EndPosition(E)
* is the available space in buffer, when writing/appending records
* into the buffer/file.
*
* Note: Based on BufferedRandomAccessFile implementation in Apache/Cassandra.
* - adopted from org.apache.cassandra.io
* Copyright: 2015-2019 The Apache Software Foundation
* Home page: http://cassandra.apache.org/
* License: http://www.apache.org/licenses/LICENSE-2.0
*/
public final class BufferedRandomAccessFile extends RandomAccessFile {
private static final Logger LOG = Logger.getLogger(BufferedRandomAccessFile.class);
static final int DEFAULT_BUFFER_SIZE = (1 << 16); // 64K buffer
static final int BUFFER_BOUNDARY_MASK = ~(DEFAULT_BUFFER_SIZE - 1);
private int capacity;
private ByteBuffer dataBuffer;
private long startPosition = 0L;
private long currentPosition = 0L;
private long validLastPosition = 0L;
private long diskPosition = 0L;
private boolean isDirty = false;
private boolean isClosed = false;
private boolean isEOF = false;
/**
*
* @param file - file name
* @param mode - "r" for read only; "rw" for read write
* @throws IOException
*/
public BufferedRandomAccessFile(File file, String mode) throws IOException {
super(file, mode);
this.init(0);
}
/**
*
* @param file - file name
* @param mode - "r" for read only; "rw" for read write
* @param size - size/capacity of the buffer.
* @throws IOException
*/
public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
super(file, mode);
this.init(size);
}
/**
*
* @param name - name of the file
* @param mode - "r" for read only; "rw" for read write
* @throws IOException
*/
public BufferedRandomAccessFile(String name, String mode) throws IOException {
super(name, mode);
this.init(0);
}
/**
*
* @param name - name of the file
* @param mode - "r" for read only; "rw" for read write
* @param size - size/capacity of the buffer
* @throws FileNotFoundException
*/
public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
super(name, mode);
this.init(size);
}
/**
*
* @param size - capacity of the buffer
*/
private void init(int size) {
this.capacity = Math.max(DEFAULT_BUFFER_SIZE, size);
this.dataBuffer = ByteBuffer.wrap(new byte[this.capacity]);
}
/**
* Close the file, after flushing data in the buffer.
* @throws IOException
*/
public void close() throws IOException {
if (!isClosed) {
this.flush();
super.close();
this.isClosed = true;
}
}
/**
* If the file is writable, flush any bytes in the buffer that have not yet been written to disk.
* @throws IOException
*/
public void flush() throws IOException {
this.flushBuffer();
}
/**
* Flush any dirty bytes in the buffer to disk.
* @throws IOException
*/
private void flushBuffer() throws IOException {
if (this.isDirty) {
alignDiskPositionToBufferStartIfNeeded();
int len = (int) (this.currentPosition - this.startPosition);
super.write(this.dataBuffer.array(), 0, len);
this.diskPosition = this.currentPosition;
this.isDirty = false;
}
}
/**
* read ahead file contents to buffer.
* @return number of bytes filled
* @throws IOException
*/
private int fillBuffer() throws IOException {
int cnt = 0;
int bytesToRead = this.capacity;
/** blocking read, until buffer is filled or EOF reached */
while (bytesToRead > 0) {
int n = super.read(this.dataBuffer.array(), cnt, bytesToRead);
if (n < 0) {
break;
}
cnt += n;
bytesToRead -= n;
}
this.isEOF = (cnt < this.dataBuffer.array().length);
this.diskPosition += cnt;
return cnt;
}
/**
* If the diskPosition differs from the startPosition, flush the data in the buffer
* and realign/fill the buffer at startPosition.
* @throws IOException
*/
private void alignDiskPositionToBufferStartIfNeeded() throws IOException {
if (this.diskPosition != this.startPosition) {
super.seek(this.startPosition);
this.diskPosition = this.startPosition;
}
}
/**
* If the new seek position is in the buffer, adjust the currentPostion.
* If the new seek position is outside of the buffer, flush the contents to
* the file and reload the buffer corresponding to the position.
*
* We logically view the file as group blocks, where each block will perfectly
* fit into the buffer (except for the last block). Given a position to seek,
* we identify the block to be loaded using BUFFER_BOUNDARY_MASK.
*
* When dealing with the last block, we will have extra space between validLastPosition
* and endPosition of the buffer.
*
* @param pos - position in the file to be loaded to the buffer.
* @throws IOException
*/
public void seek(long pos) throws IOException {
if (pos >= this.validLastPosition || pos < this.startPosition) {
// seeking outside of current buffer -- flush and read
this.flushBuffer();
this.startPosition = pos & BUFFER_BOUNDARY_MASK; // start at BuffSz boundary
alignDiskPositionToBufferStartIfNeeded();
int n = this.fillBuffer();
this.validLastPosition = this.startPosition + (long) n;
} else {
// seeking inside current buffer -- no read required
if (pos < this.currentPosition) {
// if seeking backwards, flush buffer.
this.flushBuffer();
}
}
this.currentPosition = pos;
}
/**
* @return current file position
*/
public long getFilePointer() {
return this.currentPosition;
}
/**
* Returns the length of the file, depending on whether buffer has more data (to be flushed).
* @return - length of the file (including data yet to be flushed to the file).
* @throws IOException
*/
public long length() throws IOException {
return Math.max(this.currentPosition, super.length());
}
/**
* @return whether currentPosition has reached the end of valid buffer.
*/
private boolean endOfBufferReached() {
return this.currentPosition >= this.validLastPosition;
}
/**
* Load a new data block. Returns false, when EOF is reached.
* @return - whether new data block was loaded or not
* @throws IOException
*/
private boolean loadNewBlockToBuffer() throws IOException {
if (this.isEOF) {
return false;
}
// read next block into buffer
this.seek(this.currentPosition);
// if currentPosition is at start, EOF has been reached
if (this.currentPosition == this.validLastPosition) {
return false;
}
return true;
}
/**
* @return - returns a byte as an integer.
* @throws IOException
*/
public int read() throws IOException {
if (endOfBufferReached()) {
if (!loadNewBlockToBuffer()) {
return -1;
}
}
byte res = this.dataBuffer.array()[(int) (this.currentPosition - this.startPosition)];
this.currentPosition++;
return ((int) res) & 0xFF; // convert byte -> int
}
/**
* @param b - byte array into which to read data.
* @return - returns number of bytes read.
* @throws IOException
*/
public int read(byte[] b) throws IOException {
return this.read(b, 0, b.length);
}
/**
* Read specified number of bytes into given array starting at given offset.
* @param b - byte array
* @param off - start offset
* @param len - length of bytes to be read
* @return - number of bytes read.
* @throws IOException
*/
public int read(byte[] b, int off, int len) throws IOException {
if (endOfBufferReached()) {
if (!loadNewBlockToBuffer()) {
return -1;
}
}
// copy data from buffer
len = Math.min(len, (int) (this.validLastPosition - this.currentPosition));
int buffOff = (int) (this.currentPosition - this.startPosition);
System.arraycopy(this.dataBuffer.array(), buffOff, b, off, len);
this.currentPosition += len;
return len;
}
/**
* @return endPosition of the buffer. For the last file block, this may not be a valid position.
*/
private long endPosition() {
return this.startPosition + this.capacity;
}
/**
* @return - whether space is available at the end of the buffer.
*/
private boolean spaceAvailableInBuffer() {
return (this.isEOF && (this.validLastPosition < this.endPosition()));
}
/**
* write a byte to the buffer/file.
* @param v - value to be written
* @throws IOException
*/
public void write(int v) throws IOException {
byte [] b = new byte[1];
b[0] = (byte) v;
this.write(b, 0, b.length);
}
/**
* write an array of bytes to the buffer/file.
* @param b - byte array with data to be written
* @throws IOException
*/
public void write(byte[] b) throws IOException {
this.write(b, 0, b.length);
}
/**
* Write specified number of bytes into buffer/file, with given starting offset and length.
* @param b - byte array with data to be written
* @param off - starting offset.
* @param len - length of bytes to be written
* @throws IOException
*/
public void write(byte[] b, int off, int len) throws IOException {
// As all data may not fit into the buffer, more than one write would be required.
while (len > 0) {
int n = this.writeAtMost(b, off, len);
off += n;
len -= n;
this.isDirty = true;
}
}
/**
* If space is available at the end of the buffer, start using it. Otherwise,
* flush the unwritten data into the file and load the buffer corresponding to startPosition.
* @throws IOException
*/
private void expandBufferToCapacityIfNeeded() throws IOException {
if (spaceAvailableInBuffer()) {
// space available at end of buffer -- adjust validLastPosition
this.validLastPosition = this.endPosition();
} else {
loadNewBlockToBuffer();
// appending to EOF, adjust validLastPosition.
if (this.currentPosition == this.validLastPosition) {
this.validLastPosition = this.endPosition();
}
}
}
/**
* Given a byte array, offset in the array and length of bytes to be written,
* update the buffer/file.
* @param b - byte array of data to be written
* @param off - starting offset.
* @param len - length of bytes to be written
* @return - number of bytes written
* @throws IOException
*/
private int writeAtMost(byte[] b, int off, int len) throws IOException {
if (endOfBufferReached()) {
expandBufferToCapacityIfNeeded();
}
// copy data to buffer, until all data is copied or to buffer capacity.
len = Math.min(len, (int) (this.validLastPosition - this.currentPosition));
int buffOff = (int) (this.currentPosition - this.startPosition);
System.arraycopy(b, off, this.dataBuffer.array(), buffOff, len);
this.currentPosition += len;
return len;
}
}

View File

@@ -56,12 +56,12 @@ public class SpillableMapUtils {
int keySize = file.readInt();
int valueSize = file.readInt();
byte[] key = new byte[keySize];
file.read(key, 0, keySize);
file.readFully(key, 0, keySize);
byte[] value = new byte[valueSize];
if (valueSize != valueLength) {
throw new HoodieCorruptedDataException("unequal size of payload written to external file, data may be corrupted");
}
file.read(value, 0, valueSize);
file.readFully(value, 0, valueSize);
long crcOfReadValue = generateChecksum(value);
if (crc != crcOfReadValue) {
throw new HoodieCorruptedDataException(

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.util.collection;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.io.storage.SizeAwareDataOutputStream;
@@ -54,6 +55,7 @@ import java.util.stream.Stream;
*/
public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
public static int BUFFER_SIZE = 128 * 1024; // 128 KB
private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap;
@@ -69,8 +71,8 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
// FilePath to store the spilled data
private String filePath;
// Thread-safe random access file
private ThreadLocal<RandomAccessFile> randomAccessFile = new ThreadLocal<>();
private Queue<RandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
private ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
private Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
public DiskBasedMap(String baseFilePath) throws IOException {
this.valueMetadataMap = new ConcurrentHashMap<>();
@@ -78,7 +80,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
this.filePath = writeOnlyFile.getPath();
initFile(writeOnlyFile);
this.fileOutputStream = new FileOutputStream(writeOnlyFile, true);
this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream);
this.writeOnlyFileHandle = new SizeAwareDataOutputStream(fileOutputStream, BUFFER_SIZE);
this.filePosition = new AtomicLong(0L);
}
@@ -87,11 +89,11 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
*
* @return
*/
private RandomAccessFile getRandomAccessFile() {
private BufferedRandomAccessFile getRandomAccessFile() {
try {
RandomAccessFile readHandle = randomAccessFile.get();
BufferedRandomAccessFile readHandle = randomAccessFile.get();
if (readHandle == null) {
readHandle = new RandomAccessFile(filePath, "r");
readHandle = new BufferedRandomAccessFile(filePath, "r");
readHandle.seek(0);
randomAccessFile.set(readHandle);
openedAccessFiles.offer(readHandle);
@@ -135,7 +137,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
}
while (!openedAccessFiles.isEmpty()) {
RandomAccessFile file = openedAccessFiles.poll();
BufferedRandomAccessFile file = openedAccessFiles.poll();
if (null != file) {
try {
file.close();
@@ -153,6 +155,14 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
});
}
private void flushToDisk() {
try {
writeOnlyFileHandle.flush();
} catch (IOException e) {
throw new HoodieIOException("Failed to flush to DiskBasedMap file", e);
}
}
/**
* Custom iterator to iterate over values written to disk.
*/
@@ -210,8 +220,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
}
}
@Override
public synchronized R put(T key, R value) {
private synchronized R put(T key, R value, boolean flush) {
try {
byte[] val = SerializationUtils.serialize(value);
Integer valueSize = val.length;
@@ -222,12 +231,20 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
filePosition
.set(SpillableMapUtils.spillToDisk(writeOnlyFileHandle, new FileEntry(SpillableMapUtils.generateChecksum(val),
serializedKey.length, valueSize, serializedKey, val, timestamp)));
if (flush) {
flushToDisk();
}
} catch (IOException io) {
throw new HoodieIOException("Unable to store data in Disk Based map", io);
}
return value;
}
@Override
public R put(T key, R value) {
return put(key, value, true);
}
@Override
public R remove(Object key) {
R value = get(key);
@@ -238,8 +255,9 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
@Override
public void putAll(Map<? extends T, ? extends R> m) {
for (Map.Entry<? extends T, ? extends R> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
put(entry.getKey(), entry.getValue(), false);
}
flushToDisk();
}
@Override
@@ -260,7 +278,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
}
public Stream<R> valueStream() {
final RandomAccessFile file = getRandomAccessFile();
final BufferedRandomAccessFile file = getRandomAccessFile();
return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file));
}

View File

@@ -18,17 +18,17 @@
package org.apache.hudi.common.util.collection;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.exception.HoodieException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* Iterable to lazily fetch values spilled to disk. This class uses RandomAccessFile to randomly access the position of
* Iterable to lazily fetch values spilled to disk. This class uses BufferedRandomAccessFile to randomly access the position of
* the latest value for a key spilled to disk and returns the result.
*/
public class LazyFileIterable<T, R> implements Iterable<R> {
@@ -58,12 +58,12 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
public class LazyFileIterator<T, R> implements Iterator<R> {
private final String filePath;
private RandomAccessFile readOnlyFileHandle;
private BufferedRandomAccessFile readOnlyFileHandle;
private final Iterator<Map.Entry<T, DiskBasedMap.ValueMetadata>> metadataIterator;
public LazyFileIterator(String filePath, Map<T, DiskBasedMap.ValueMetadata> map) throws IOException {
this.filePath = filePath;
this.readOnlyFileHandle = new RandomAccessFile(filePath, "r");
this.readOnlyFileHandle = new BufferedRandomAccessFile(filePath, "r", DiskBasedMap.BUFFER_SIZE);
readOnlyFileHandle.seek(0);
// sort the map in increasing order of offset of value so disk seek is only in one(forward) direction

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.util.collection.io.storage;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -33,8 +34,8 @@ public class SizeAwareDataOutputStream {
// Counter to keep track of number of bytes written
private AtomicLong size;
public SizeAwareDataOutputStream(FileOutputStream fileOutputStream) {
this.outputStream = new DataOutputStream(fileOutputStream);
public SizeAwareDataOutputStream(FileOutputStream fileOutputStream, int cacheSize) {
this.outputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream, cacheSize));
this.size = new AtomicLong(0L);
}