1
0

[HUDI-2029] Implement compression for DiskBasedMap in Spillable Map (#3128)

This commit is contained in:
rmahindra123
2021-07-14 19:57:38 -07:00
committed by GitHub
parent 75040ee9e5
commit d024439764
7 changed files with 167 additions and 54 deletions

View File

@@ -29,9 +29,12 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.net.InetAddress;
@@ -47,6 +50,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
/**
* This class provides a disk spillable only map implementation. All of the data is currently written to one file,
@@ -59,27 +65,33 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
public static final int BUFFER_SIZE = 128 * 1024; // 128 KB
private static final Logger LOG = LogManager.getLogger(BitCaskDiskMap.class);
// Caching byte compression/decompression to avoid creating instances for every operation
private static final ThreadLocal<CompressionHandler> DISK_COMPRESSION_REF =
ThreadLocal.withInitial(CompressionHandler::new);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, ValueMetadata> valueMetadataMap;
// Enables compression for all values stored in the disk map
private final boolean isCompressionEnabled;
// Write only file
private File writeOnlyFile;
private final File writeOnlyFile;
// Write only OutputStream to be able to ONLY append to the file
private SizeAwareDataOutputStream writeOnlyFileHandle;
private final SizeAwareDataOutputStream writeOnlyFileHandle;
// FileOutputStream for the file handle to be able to force fsync
// since FileOutputStream's flush() does not force flush to disk
private FileOutputStream fileOutputStream;
private final FileOutputStream fileOutputStream;
// Current position in the file
private AtomicLong filePosition;
private final AtomicLong filePosition;
// FilePath to store the spilled data
private String filePath;
private final String filePath;
// Thread-safe random access file
private ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
private Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
private final ThreadLocal<BufferedRandomAccessFile> randomAccessFile = new ThreadLocal<>();
private final Queue<BufferedRandomAccessFile> openedAccessFiles = new ConcurrentLinkedQueue<>();
private transient Thread shutdownThread = null;
public BitCaskDiskMap(String baseFilePath) throws IOException {
public BitCaskDiskMap(String baseFilePath, boolean isCompressionEnabled) throws IOException {
this.valueMetadataMap = new ConcurrentHashMap<>();
this.isCompressionEnabled = isCompressionEnabled;
this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString());
this.filePath = writeOnlyFile.getPath();
initFile(writeOnlyFile);
@@ -88,6 +100,10 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
this.filePosition = new AtomicLong(0L);
}
/**
 * Creates a disk map with value compression disabled.
 *
 * <p>Delegates to {@code BitCaskDiskMap(String, boolean)} with
 * {@code isCompressionEnabled = false}, preserving the pre-existing
 * behavior for callers of the single-argument constructor.
 *
 * @param baseFilePath directory under which the spill file is created
 * @throws IOException if the backing spill file cannot be created/opened
 */
public BitCaskDiskMap(String baseFilePath) throws IOException {
this(baseFilePath, false);
}
/**
* RandomAccessFile is not thread-safe. This API opens a new file handle per thread and returns.
*
@@ -147,7 +163,7 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
*/
@Override
public Iterator<R> iterator() {
return new LazyFileIterable(filePath, valueMetadataMap).iterator();
return new LazyFileIterable(filePath, valueMetadataMap, isCompressionEnabled).iterator();
}
/**
@@ -188,13 +204,16 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
}
private R get(ValueMetadata entry) {
return get(entry, getRandomAccessFile());
return get(entry, getRandomAccessFile(), isCompressionEnabled);
}
public static <R> R get(ValueMetadata entry, RandomAccessFile file) {
public static <R> R get(ValueMetadata entry, RandomAccessFile file, boolean isCompressionEnabled) {
try {
return SerializationUtils
.deserialize(SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue()));
byte[] bytesFromDisk = SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue());
if (isCompressionEnabled) {
return SerializationUtils.deserialize(DISK_COMPRESSION_REF.get().decompressBytes(bytesFromDisk));
}
return SerializationUtils.deserialize(bytesFromDisk);
} catch (IOException e) {
throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
}
@@ -202,7 +221,8 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
private synchronized R put(T key, R value, boolean flush) {
try {
byte[] val = SerializationUtils.serialize(value);
byte[] val = isCompressionEnabled ? DISK_COMPRESSION_REF.get().compressBytes(SerializationUtils.serialize(value)) :
SerializationUtils.serialize(value);
Integer valueSize = val.length;
Long timestamp = System.currentTimeMillis();
this.valueMetadataMap.put(key,
@@ -293,7 +313,7 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
@Override
public Stream<R> valueStream() {
final BufferedRandomAccessFile file = getRandomAccessFile();
return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file));
return valueMetadataMap.values().stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData, file, isCompressionEnabled));
}
@Override
@@ -399,4 +419,47 @@ public final class BitCaskDiskMap<T extends Serializable, R extends Serializable
return Long.compare(this.offsetOfValue, o.offsetOfValue);
}
}
/**
 * Deflate-based compressor/decompressor for values spilled to disk.
 *
 * <p>NOT thread-safe: it reuses internal buffers across calls. Instances are
 * confined to a single thread via the {@code DISK_COMPRESSION_REF} ThreadLocal.
 */
private static class CompressionHandler implements Serializable {

  private static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576;
  private static final int DECOMPRESS_INTERMEDIATE_BUFFER_SIZE = 8192;

  // Caching ByteArrayOutputStreams (and the copy buffer) to avoid recreating
  // them for every compress/decompress operation.
  private final ByteArrayOutputStream compressBaos;
  private final ByteArrayOutputStream decompressBaos;
  private final byte[] decompressIntermediateBuffer;

  CompressionHandler() {
    compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
    decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
    decompressIntermediateBuffer = new byte[DECOMPRESS_INTERMEDIATE_BUFFER_SIZE];
  }

  /**
   * Compresses the given bytes with {@link Deflater#BEST_COMPRESSION}.
   *
   * @param value uncompressed payload
   * @return the deflate-compressed bytes
   * @throws IOException if compression fails
   */
  private byte[] compressBytes(final byte[] value) throws IOException {
    compressBaos.reset();
    Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
    // try-with-resources guarantees the stream is closed (and the deflated
    // data fully flushed into compressBaos) before toByteArray() is called.
    try (DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, deflater)) {
      dos.write(value);
    } finally {
      // Release the Deflater's native zlib memory eagerly instead of
      // waiting for finalization.
      deflater.end();
    }
    return compressBaos.toByteArray();
  }

  /**
   * Decompresses bytes previously produced by {@link #compressBytes(byte[])}.
   *
   * @param bytes deflate-compressed payload
   * @return the original uncompressed bytes
   * @throws HoodieIOException if decompression fails
   */
  private byte[] decompressBytes(final byte[] bytes) throws IOException {
    decompressBaos.reset();
    // try-with-resources closes the InflaterInputStream, which also ends the
    // internally-allocated Inflater; the original code leaked that native
    // resource until GC.
    try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes))) {
      int len;
      while ((len = in.read(decompressIntermediateBuffer)) > 0) {
        decompressBaos.write(decompressIntermediateBuffer, 0, len);
      }
      return decompressBaos.toByteArray();
    } catch (IOException e) {
      throw new HoodieIOException("IOException while decompressing bytes", e);
    }
  }
}
}

View File

@@ -72,6 +72,8 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
private final SizeEstimator<R> valueSizeEstimator;
// Type of the disk map
private final DiskMapType diskMapType;
// Enables compression of values stored on disk
private final boolean isCompressionEnabled;
// current space occupied by this map in-memory
private Long currentInMemoryMapSize;
// An estimate of the size of each payload written to this map
@@ -88,6 +90,11 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType) throws IOException {
this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, valueSizeEstimator, diskMapType, false);
}
public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException {
this.inMemoryMap = new HashMap<>();
this.baseFilePath = baseFilePath;
this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
@@ -95,6 +102,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
this.keySizeEstimator = keySizeEstimator;
this.valueSizeEstimator = valueSizeEstimator;
this.diskMapType = diskMapType;
this.isCompressionEnabled = isCompressionEnabled;
}
private DiskMap<T, R> getDiskBasedMap() {
@@ -108,7 +116,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
break;
case BITCASK:
default:
diskBasedMap = new BitCaskDiskMap<>(baseFilePath);
diskBasedMap = new BitCaskDiskMap<>(baseFilePath, isCompressionEnabled);
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);

View File

@@ -37,12 +37,19 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
private final String filePath;
// Stores the key and corresponding value's latest metadata spilled to disk
private final Map<T, BitCaskDiskMap.ValueMetadata> inMemoryMetadataOfSpilledData;
// Whether compression was enabled for the values when they were inserted into the file/map
private final boolean isCompressionEnabled;
private transient Thread shutdownThread = null;
/**
 * Creates an iterable over the spilled data with compression disabled,
 * preserving the pre-existing behavior for callers of the two-argument
 * constructor.
 *
 * @param filePath path of the file holding the spilled values
 * @param map      key to value-metadata mapping for the spilled data
 */
public LazyFileIterable(String filePath, Map<T, BitCaskDiskMap.ValueMetadata> map) {
this(filePath, map, false);
}

/**
 * Creates an iterable over the spilled data.
 *
 * @param filePath             path of the file holding the spilled values
 * @param map                  key to value-metadata mapping for the spilled data
 * @param isCompressionEnabled must match the compression setting used when the
 *                             values were written, so reads decompress correctly
 */
public LazyFileIterable(String filePath, Map<T, BitCaskDiskMap.ValueMetadata> map, boolean isCompressionEnabled) {
this.filePath = filePath;
this.inMemoryMetadataOfSpilledData = map;
this.isCompressionEnabled = isCompressionEnabled;
}
@Override
@@ -91,7 +98,7 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
throw new IllegalStateException("next() called on EOF'ed stream. File :" + filePath);
}
Map.Entry<T, BitCaskDiskMap.ValueMetadata> entry = this.metadataIterator.next();
return BitCaskDiskMap.get(entry.getValue(), readOnlyFileHandle);
return BitCaskDiskMap.get(entry.getValue(), readOnlyFileHandle, isCompressionEnabled);
}
@Override