1
0

[HUDI-431] Adding support for Parquet in MOR LogBlocks (#4333)

- Adding support for Parquet in MOR tables Log blocks

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
Alexey Kudinkin
2022-02-02 11:35:05 -08:00
committed by GitHub
parent caef3d5c58
commit a68e1dc2db
45 changed files with 1613 additions and 670 deletions

View File

@@ -18,17 +18,6 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
@@ -50,15 +39,22 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
@@ -67,8 +63,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
/**
* Helper class to do common stuff across Avro.
@@ -343,7 +337,7 @@ public class HoodieAvroUtils {
}
/**
* Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
* Given an Avro record with a given schema, rewrites it into the new schema while setting fields only from the new
* schema.
* NOTE: Here, the assumption is that you cannot go from an evolved schema (schema with (N) fields)
* to an older schema (schema with (N-1) fields). All fields present in the older record schema MUST be present in the
@@ -377,6 +371,16 @@ public class HoodieAvroUtils {
return newRecord;
}
/**
* Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
* provided {@code newSchema}.
*
* To better understand conversion rules please check {@link #rewriteRecord(GenericRecord, Schema)}
*/
public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Schema newSchema) {
return records.stream().map(r -> rewriteRecord(r, newSchema)).collect(Collectors.toList());
}
private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) {
// cache the result of oldRecord.get() to save CPU expensive hash lookup
Schema oldSchema = oldRecord.getSchema();
@@ -392,33 +396,6 @@ public class HoodieAvroUtils {
}
}
public static byte[] compress(String text) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
OutputStream out = new DeflaterOutputStream(baos);
out.write(text.getBytes(StandardCharsets.UTF_8));
out.close();
} catch (IOException e) {
throw new HoodieIOException("IOException while compressing text " + text, e);
}
return baos.toByteArray();
}
public static String decompress(byte[] bytes) {
InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
byte[] buffer = new byte[8192];
int len;
while ((len = in.read(buffer)) > 0) {
baos.write(buffer, 0, len);
}
return new String(baos.toByteArray(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new HoodieIOException("IOException while decompressing text", e);
}
}
/**
* Generate a reader schema off the provided writeSchema, to just project out the provided columns.
*/

View File

@@ -18,11 +18,10 @@
package org.apache.hudi.avro;
import org.apache.avro.Schema;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.util.Option;
import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;

View File

@@ -135,6 +135,17 @@ public class FSUtils {
return providedPath;
}
/**
* Makes path qualified w/ {@link FileSystem}'s URI
*
* @param fs instance of {@link FileSystem} path belongs to
* @param path path to be qualified
* @return qualified path, prefixed w/ the URI of the target FS object provided
*/
public static Path makeQualified(FileSystem fs, Path path) {
return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
}
/**
* A write token uniquely identifies an attempt at one of the IOHandle operations (Merge/Create/Append).
*/

View File

@@ -136,7 +136,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
}
}
private static Path convertPathWithScheme(Path oldPath, String newScheme) {
public static Path convertPathWithScheme(Path oldPath, String newScheme) {
URI oldURI = oldPath.toUri();
URI newURI;
try {

View File

@@ -19,10 +19,11 @@
package org.apache.hudi.common.fs.inline;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.ValidationUtils;
import java.io.File;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
/**
* Utils to parse InLineFileSystem paths.
* Inline FS format:
@@ -61,10 +62,10 @@ public class InLineFSUtils {
/**
* InlineFS Path format:
* "inlinefs://path/to/outer/file/outer_file_schema/?start_offset=start_offset>&length=<length>"
* "inlinefs://path/to/outer/file/outer_file_scheme/?start_offset=start_offset>&length=<length>"
* <p>
* Outer File Path format:
* "outer_file_schema://path/to/outer/file"
* "outer_file_scheme://path/to/outer/file"
* <p>
* Example
* Input: "inlinefs://file1/s3a/?start_offset=20&length=40".
@@ -74,40 +75,48 @@ public class InLineFSUtils {
* @return Outer file Path from the InLineFS Path
*/
public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
final String scheme = inlineFSPath.getParent().getName();
assertInlineFSPath(inlineFSPath);
final String outerFileScheme = inlineFSPath.getParent().getName();
final Path basePath = inlineFSPath.getParent().getParent();
ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
"Invalid InLineFSPath: " + inlineFSPath);
checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
"Invalid InLineFS path: " + inlineFSPath);
final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1);
final String fullPath = scheme + SCHEME_SEPARATOR
+ (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
final String fullPath = outerFileScheme + SCHEME_SEPARATOR
+ (outerFileScheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
+ pathExceptScheme;
return new Path(fullPath);
}
/**
* Eg input : "inlinefs://file1/s3a/?start_offset=20&length=40".
* output: 20
* Returns start offset w/in the base for the block identified by the given InlineFS path
*
* @param inlinePath
* @return
* input: "inlinefs://file1/s3a/?start_offset=20&length=40".
* output: 20
*/
public static int startOffset(Path inlinePath) {
String[] slices = inlinePath.toString().split("[?&=]");
public static int startOffset(Path inlineFSPath) {
assertInlineFSPath(inlineFSPath);
String[] slices = inlineFSPath.toString().split("[?&=]");
return Integer.parseInt(slices[slices.length - 3]);
}
/**
* Eg input : "inlinefs:/file1/s3a/?start_offset=20&length=40".
* Output: 40
* Returns length of the block (embedded w/in the base file) identified by the given InlineFS path
*
* @param inlinePath
* @return
* input: "inlinefs:/file1/s3a/?start_offset=20&length=40".
* output: 40
*/
public static int length(Path inlinePath) {
assertInlineFSPath(inlinePath);
String[] slices = inlinePath.toString().split("[?&=]");
return Integer.parseInt(slices[slices.length - 1]);
}
private static void assertInlineFSPath(Path inlinePath) {
String scheme = inlinePath.toUri().getScheme();
checkArgument(InLineFileSystem.SCHEME.equals(scheme));
}
}

View File

@@ -57,6 +57,7 @@ public class InLineFileSystem extends FileSystem {
return URI.create(getScheme());
}
@Override
public String getScheme() {
return SCHEME;
}
@@ -129,5 +130,4 @@ public class InLineFileSystem extends FileSystem {
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
throw new UnsupportedOperationException("Can't set working directory");
}
}

View File

@@ -18,11 +18,10 @@
package org.apache.hudi.common.model;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import java.io.IOException;
import java.io.Serializable;
@@ -60,7 +59,7 @@ public class HoodieLogFile implements Serializable {
public HoodieLogFile(Path logPath) {
this.fileStatus = null;
this.pathStr = logPath.toString();
this.fileLen = 0;
this.fileLen = -1;
}
public HoodieLogFile(Path logPath, Long fileLen) {

View File

@@ -18,6 +18,11 @@
package org.apache.hudi.common.table;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
import org.apache.hudi.common.config.ConfigClassProperty;
@@ -36,12 +41,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
@@ -230,6 +231,7 @@ public abstract class AbstractHoodieLogRecordReader {
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
+ logBlock.getLogBlockHeader().get(INSTANT_TIME));
if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
@@ -426,6 +428,9 @@ public abstract class AbstractHoodieLogRecordReader {
case HFILE_DATA_BLOCK:
processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case PARQUET_DATA_BLOCK:
processDataBlock((HoodieParquetDataBlock) lastBlock, keys);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
break;

View File

@@ -18,6 +18,13 @@
package org.apache.hudi.common.table.log;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
@@ -31,21 +38,15 @@ import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
@@ -53,6 +54,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit
* either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is found).
@@ -63,6 +67,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB
private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
private final Configuration hadoopConf;
private final FSDataInputStream inputStream;
private final HoodieLogFile logFile;
private final byte[] magicBuffer = new byte[6];
@@ -72,7 +77,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
private boolean enableInlineReading;
private boolean enableRecordLookups;
private boolean closed = false;
private transient Thread shutdownThread = null;
@@ -88,76 +93,26 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading,
boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
String keyField) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
this.logFile = logFile;
this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
this.hadoopConf = fs.getConf();
// NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
// is prefixed with an appropriate scheme given that we're not propagating the FS
// further
this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
this.enableInlineReading = enableInlineReading;
this.enableRecordLookups = enableRecordLookups;
this.keyField = keyField;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize();
this.reverseLogFilePosition = this.lastReverseLogFilePosition = this.logFile.getFileSize();
}
addShutDownHook();
}
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
}
/**
* Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
* @param fsDataInputStream original instance of {@link FSDataInputStream}.
* @param fs instance of {@link FileSystem} in use.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
if (FSUtils.isGCSFileSystem(fs)) {
// in GCS FS, we might need to interceptor seek offsets as we might get EOF exception
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, bufferSize), true);
}
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
}
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
return fsDataInputStream;
}
/**
* GCS FileSystem needs some special handling for seek and hence this method assists to fetch the right {@link FSDataInputStream} to be
* used by wrapping with required input streams.
* @param fsDataInputStream original instance of {@link FSDataInputStream}.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, int bufferSize) {
// incase of GCS FS, there are two flows.
// a. fsDataInputStream.getWrappedStream() instanceof FSInputStream
// b. fsDataInputStream.getWrappedStream() not an instanceof FSInputStream, but an instance of FSDataInputStream.
// (a) is handled in the first if block and (b) is handled in the second if block. If not, we fallback to original fsDataInputStream
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
}
if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
&& ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
return new TimedFSDataInputStream(logFile.getPath(),
new FSDataInputStream(new BufferedFSInputStream(inputStream, bufferSize)));
}
return fsDataInputStream;
}
@Override
public HoodieLogFile getLogFile() {
return logFile;
@@ -181,15 +136,10 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
// TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows
// for max of Integer size
private HoodieLogBlock readBlock() throws IOException {
int blocksize;
int type;
HoodieLogBlockType blockType = null;
Map<HeaderMetadataType, String> header = null;
int blockSize;
try {
// 1 Read the total size of the block
blocksize = (int) inputStream.readLong();
blockSize = (int) inputStream.readLong();
} catch (EOFException | CorruptedLogFileException e) {
// An exception reading any of the above indicates a corrupt block
// Create a corrupt block by finding the next MAGIC marker or EOF
@@ -197,9 +147,9 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
}
// We may have had a crash which could have written this block partially
// Skip blocksize in the stream and we should either find a sync marker (start of the next
// Skip blockSize in the stream and we should either find a sync marker (start of the next
// block) or EOF. If we did not find either of it, then this block is a corrupted block.
boolean isCorrupted = isBlockCorrupt(blocksize);
boolean isCorrupted = isBlockCorrupted(blockSize);
if (isCorrupted) {
return createCorruptBlock();
}
@@ -208,71 +158,85 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
// 3. Read the block type for a log block
if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
type = inputStream.readInt();
ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
blockType = HoodieLogBlockType.values()[type];
}
HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
// 4. Read the header for a log block, if present
if (nextBlockVersion.hasHeader()) {
header = HoodieLogBlock.getLogMetadata(inputStream);
}
int contentLength = blocksize;
Map<HeaderMetadataType, String> header =
nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
// 5. Read the content length for the content
if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
contentLength = (int) inputStream.readLong();
}
// Fallback to full-block size if no content-length
// TODO replace w/ hasContentLength
int contentLength =
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : blockSize;
// 6. Read the content or skip content based on IO vs Memory trade-off by client
// TODO - have a max block size and reuse this buffer in the ByteBuffer
// (hard to guess max block size for now)
long contentPosition = inputStream.getPos();
byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily);
boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily);
// 7. Read footer if any
Map<HeaderMetadataType, String> footer = null;
if (nextBlockVersion.hasFooter()) {
footer = HoodieLogBlock.getLogMetadata(inputStream);
}
Map<HeaderMetadataType, String> footer =
nextBlockVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
// 8. Read log block length, if present. This acts as a reverse pointer when traversing a
// log file in reverse
@SuppressWarnings("unused")
long logBlockLength = 0;
if (nextBlockVersion.hasLogBlockLength()) {
logBlockLength = inputStream.readLong();
inputStream.readLong();
}
// 9. Read the log block end position in the log file
long blockEndPos = inputStream.getPos();
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, contentLength, blockEndPos);
switch (Objects.requireNonNull(blockType)) {
// based on type read the block
case AVRO_DATA_BLOCK:
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return HoodieAvroDataBlock.getBlock(content, readerSchema);
return HoodieAvroDataBlock.getBlock(content.get(), readerSchema);
} else {
return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchema, header, footer, keyField);
return new HoodieAvroDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, keyField);
}
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchema,
header, footer, enableInlineReading, keyField);
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
return new HoodieHFileDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, enableRecordLookups);
case PARQUET_DATA_BLOCK:
checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
return new HoodieParquetDataBlock(inputStream, content, readBlockLazily, logBlockContentLoc,
Option.ofNullable(readerSchema), header, footer, keyField);
case DELETE_BLOCK:
return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, header, footer);
return new HoodieDeleteBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer);
case COMMAND_BLOCK:
return HoodieCommandBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, header, footer);
return new HoodieCommandBlock(content, inputStream, readBlockLazily, Option.of(logBlockContentLoc), header, footer);
default:
throw new HoodieNotSupportedException("Unsupported Block " + blockType);
}
}
@Nullable
private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion blockVersion) throws IOException {
if (blockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return null;
}
int type = inputStream.readInt();
checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
return HoodieLogBlockType.values()[type];
}
private HoodieLogBlock createCorruptBlock() throws IOException {
LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
long currentPos = inputStream.getPos();
@@ -282,12 +246,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
long contentPosition = inputStream.getPos();
byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
return HoodieCorruptBlock.getBlock(logFile, inputStream, Option.ofNullable(corruptedBytes), readBlockLazily,
contentPosition, corruptedBlockSize, nextBlockOffset, new HashMap<>(), new HashMap<>());
Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, corruptedBlockSize, nextBlockOffset);
return new HoodieCorruptBlock(corruptedBytes, inputStream, readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>());
}
private boolean isBlockCorrupt(int blocksize) throws IOException {
private boolean isBlockCorrupted(int blocksize) throws IOException {
long currentPos = inputStream.getPos();
try {
inputStream.seek(currentPos + blocksize);
@@ -481,4 +446,59 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
public void remove() {
throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
}
/**
* Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
* @param fs instance of {@link FileSystem} in use.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private static FSDataInputStream getFSDataInputStream(FileSystem fs,
HoodieLogFile logFile,
int bufferSize) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
if (FSUtils.isGCSFileSystem(fs)) {
// in GCS FS, we might need to interceptor seek offsets as we might get EOF exception
return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
}
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
}
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize work?
return fsDataInputStream;
}
/**
* GCS FileSystem needs some special handling for seek and hence this method assists to fetch the right {@link FSDataInputStream} to be
* used by wrapping with required input streams.
* @param fsDataInputStream original instance of {@link FSDataInputStream}.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
private static FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream,
HoodieLogFile logFile,
int bufferSize) {
// incase of GCS FS, there are two flows.
// a. fsDataInputStream.getWrappedStream() instanceof FSInputStream
// b. fsDataInputStream.getWrappedStream() not an instanceof FSInputStream, but an instance of FSDataInputStream.
// (a) is handled in the first if block and (b) is handled in the second if block. If not, we fallback to original fsDataInputStream
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
}
if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream
&& ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
FSInputStream inputStream = (FSInputStream)((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream();
return new TimedFSDataInputStream(logFile.getPath(),
new FSDataInputStream(new BufferedFSInputStream(inputStream, bufferSize)));
}
return fsDataInputStream;
}
}

View File

@@ -60,13 +60,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
/**
* @param fs
* @param logFile
* @param bufferSize
* @param replication
* @param sizeThreshold
*/
HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken) {
this.fs = fs;
this.logFile = logFile;

View File

@@ -18,13 +18,6 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
@@ -36,59 +29,64 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import javax.annotation.Nonnull;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* HoodieAvroDataBlock contains a list of records serialized using Avro. It is used with the Parquet base file format.
*/
public class HoodieAvroDataBlock extends HoodieDataBlock {
private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
FSDataInputStream inputStream, boolean readBlockLazily) {
super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
public HoodieAvroDataBlock(FSDataInputStream inputStream,
Option<byte[]> content,
boolean readBlockLazily,
HoodieLogBlockContentLocation logBlockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer,
String keyField) {
super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
}
public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, String keyField) {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer, keyField);
}
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType,
String> header, String keyField) {
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records,
@Nonnull Map<HeaderMetadataType, String> header,
@Nonnull String keyField
) {
super(records, header, new HashMap<>(), keyField);
}
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
@Override
public HoodieLogBlockType getBlockType() {
return HoodieLogBlockType.AVRO_DATA_BLOCK;
}
@Override
protected byte[] serializeRecords() throws IOException {
protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -118,7 +116,6 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
output.writeInt(size);
// Write the content
output.write(temp.toByteArray());
itr.remove();
} catch (IOException e) {
throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
}
@@ -130,9 +127,11 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
// TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used
// TODO (na) - Implement a recordItr instead of recordList
@Override
protected void deserializeRecords() throws IOException {
protected List<IndexedRecord> deserializeRecords(byte[] content) throws IOException {
checkState(readerSchema != null, "Reader's schema has to be non-null");
SizeAwareDataInputStream dis =
new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(getContent().get())));
new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
// 1. Read version for this data block
int version = dis.readInt();
@@ -141,12 +140,8 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
// If readerSchema was not present, use writerSchema
if (schema == null) {
schema = writerSchema;
}
GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, schema);
// 2. Get the total records
int totalRecords = 0;
if (logBlockVersion.hasRecordCount()) {
@@ -157,17 +152,17 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
// 3. Read the content
for (int i = 0; i < totalRecords; i++) {
int recordLength = dis.readInt();
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(),
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(content, dis.getNumberOfBytesRead(),
recordLength, decoderCache.get());
decoderCache.set(decoder);
IndexedRecord record = reader.read(null, decoder);
records.add(record);
dis.skipBytes(recordLength);
}
dis.close();
this.records = records;
// Free up content to be GC'd, deflate
deflate();
return records;
}
//----------------------------------------------------------------------------------------
@@ -183,9 +178,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
*/
@Deprecated
public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) {
super(new HashMap<>(), new HashMap<>(), Option.empty(), Option.empty(), null, false);
this.records = records;
this.schema = schema;
super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
/**
@@ -201,7 +194,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
int schemaLength = dis.readInt();
byte[] compressedSchema = new byte[schemaLength];
dis.readFully(compressedSchema, 0, schemaLength);
Schema writerSchema = new Schema.Parser().parse(HoodieAvroUtils.decompress(compressedSchema));
Schema writerSchema = new Schema.Parser().parse(decompress(compressedSchema));
if (readerSchema == null) {
readerSchema = writerSchema;
@@ -224,6 +217,33 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
return new HoodieAvroDataBlock(records, readerSchema);
}
private static byte[] compress(String text) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
OutputStream out = new DeflaterOutputStream(baos);
out.write(text.getBytes(StandardCharsets.UTF_8));
out.close();
} catch (IOException e) {
throw new HoodieIOException("IOException while compressing text " + text, e);
}
return baos.toByteArray();
}
private static String decompress(byte[] bytes) {
InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
byte[] buffer = new byte[8192];
int len;
while ((len = in.read(buffer)) > 0) {
baos.write(buffer, 0, len);
}
return new String(baos.toByteArray(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new HoodieIOException("IOException while decompressing text", e);
}
}
@Deprecated
public byte[] getBytes(Schema schema) throws IOException {
@@ -232,10 +252,12 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
DataOutputStream output = new DataOutputStream(baos);
// 2. Compress and Write schema out
byte[] schemaContent = HoodieAvroUtils.compress(schema.toString());
byte[] schemaContent = compress(schema.toString());
output.writeInt(schemaContent.length);
output.write(schemaContent);
List<IndexedRecord> records = getRecords();
// 3. Write total number of records
output.writeInt(records.size());

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -44,9 +43,9 @@ public class HoodieCommandBlock extends HoodieLogBlock {
this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
}
private HoodieCommandBlock(Option<byte[]> content, FSDataInputStream inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
public HoodieCommandBlock(Option<byte[]> content, FSDataInputStream inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
super(header, footer, blockContentLocation, content, inputStream, readBlockLazily);
this.type =
HoodieCommandBlockTypeEnum.values()[Integer.parseInt(header.get(HeaderMetadataType.COMMAND_BLOCK_TYPE))];
@@ -65,12 +64,4 @@ public class HoodieCommandBlock extends HoodieLogBlock {
public byte[] getContentBytes() {
return new byte[0];
}
public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndPos, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
return new HoodieCommandBlock(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer);
}
}

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -32,15 +31,14 @@ import java.util.Map;
*/
public class HoodieCorruptBlock extends HoodieLogBlock {
private HoodieCorruptBlock(Option<byte[]> corruptedBytes, FSDataInputStream inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
public HoodieCorruptBlock(Option<byte[]> corruptedBytes, FSDataInputStream inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
super(header, footer, blockContentLocation, corruptedBytes, inputStream, readBlockLazily);
}
@Override
public byte[] getContentBytes() throws IOException {
if (!getContent().isPresent() && readBlockLazily) {
// read content from disk
inflate();
@@ -53,11 +51,4 @@ public class HoodieCorruptBlock extends HoodieLogBlock {
return HoodieLogBlockType.CORRUPT_BLOCK;
}
public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream,
Option<byte[]> corruptedBytes, boolean readBlockLazily, long position, long blockSize, long blockEndPos,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
return new HoodieCorruptBlock(corruptedBytes, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer);
}
}

View File

@@ -18,25 +18,24 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import javax.annotation.Nonnull;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* DataBlock contains a list of records serialized using formats compatible with the base file format.
* For each base file format there is a corresponding DataBlock format.
*
* <p>
* The Datablock contains:
* 1. Data Block version
* 2. Total number of records in the block
@@ -44,125 +43,151 @@ import java.util.Map;
*/
public abstract class HoodieDataBlock extends HoodieLogBlock {
protected List<IndexedRecord> records;
protected Schema schema;
protected String keyField;
// TODO rebase records/content to leverage Either to warrant
// that they are mutex (used by read/write flows respectively)
private Option<List<IndexedRecord>> records;
public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
FSDataInputStream inputStream, boolean readBlockLazily) {
super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
}
/**
* Key field's name w/in the record's schema
*/
private final String keyFieldName;
public HoodieDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
@Nonnull Map<HeaderMetadataType, String> footer, String keyField) {
this(header, footer, Option.empty(), Option.empty(), null, false);
this.records = records;
this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
this.keyField = keyField;
}
private final boolean enablePointLookups;
protected HoodieDataBlock(Option<byte[]> content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Schema readerSchema,
@Nonnull Map<HeaderMetadataType, String> headers, @Nonnull Map<HeaderMetadataType,
String> footer, String keyField) {
this(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
this.schema = readerSchema;
this.keyField = keyField;
protected final Schema readerSchema;
/**
* NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
*/
public HoodieDataBlock(List<IndexedRecord> records,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer,
String keyFieldName) {
super(header, footer, Option.empty(), Option.empty(), null, false);
this.records = Option.of(records);
this.keyFieldName = keyFieldName;
// If no reader-schema has been provided assume writer-schema as one
this.readerSchema = getWriterSchema(super.getLogBlockHeader());
this.enablePointLookups = false;
}
/**
* Util method to get a data block for the requested type.
*
* @param logDataBlockFormat - Data block type
* @param recordList - List of records that goes in the data block
* @param header - data block header
* @return Data block of the requested type.
* NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
*/
public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
Map<HeaderMetadataType, String> header) {
return getBlock(logDataBlockFormat, recordList, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
/**
* Util method to get a data block for the requested type.
*
* @param logDataBlockFormat - Data block type
* @param recordList - List of records that goes in the data block
* @param header - data block header
* @param keyField - FieldId to get the key from the records
* @return Data block of the requested type.
*/
public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List<IndexedRecord> recordList,
Map<HeaderMetadataType, String> header, String keyField) {
switch (logDataBlockFormat) {
case AVRO_DATA_BLOCK:
return new HoodieAvroDataBlock(recordList, header, keyField);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(recordList, header, keyField);
default:
throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
}
protected HoodieDataBlock(Option<byte[]> content,
FSDataInputStream inputStream,
boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> headers,
Map<HeaderMetadataType, String> footer,
String keyFieldName,
boolean enablePointLookups) {
super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
this.records = Option.empty();
this.keyFieldName = keyFieldName;
// If no reader-schema has been provided assume writer-schema as one
this.readerSchema = readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
this.enablePointLookups = enablePointLookups;
}
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
if (getContent().isPresent()) {
return getContent().get();
} else if (readBlockLazily && !getContent().isPresent() && records == null) {
// read block lazily
createRecordsFromContentBytes();
Option<byte[]> content = getContent();
checkState(content.isPresent() || records.isPresent(), "Block is in invalid state");
if (content.isPresent()) {
return content.get();
}
return serializeRecords();
return serializeRecords(records.get());
}
public abstract HoodieLogBlockType getBlockType();
protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
}
public List<IndexedRecord> getRecords() {
if (records == null) {
/**
* Returns all the records contained w/in this block
*/
public final List<IndexedRecord> getRecords() {
if (!records.isPresent()) {
try {
// in case records are absent, read content lazily and then convert to IndexedRecords
createRecordsFromContentBytes();
records = Option.of(readRecordsFromBlockPayload());
} catch (IOException io) {
throw new HoodieIOException("Unable to convert content bytes to records", io);
}
}
return records;
return records.get();
}
public Schema getSchema() {
return readerSchema;
}
/**
* Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
* do a seek based parsing and return matched entries.
*
* @param keys keys of interest.
* @return List of IndexedRecords for the keys of interest.
* @throws IOException
* @throws IOException in case of failures encountered when reading/parsing records
*/
public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
}
public Schema getSchema() {
// if getSchema was invoked before converting byte [] to records
if (records == null) {
getRecords();
public final List<IndexedRecord> getRecords(List<String> keys) throws IOException {
boolean fullScan = keys.isEmpty();
if (enablePointLookups && !fullScan) {
return lookupRecords(keys);
}
return schema;
// Otherwise, we fetch all the records and filter out all the records, but the
// ones requested
List<IndexedRecord> allRecords = getRecords();
if (fullScan) {
return allRecords;
}
HashSet<String> keySet = new HashSet<>(keys);
return allRecords.stream()
.filter(record -> keySet.contains(getRecordKey(record).orElse(null)))
.collect(Collectors.toList());
}
protected void createRecordsFromContentBytes() throws IOException {
protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
}
deserializeRecords();
try {
return deserializeRecords(getContent().get());
} finally {
// Free up content to be GC'd by deflating the block
deflate();
}
}
protected abstract byte[] serializeRecords() throws IOException;
protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
throw new UnsupportedOperationException(
String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
);
}
protected abstract void deserializeRecords() throws IOException;
protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
protected abstract List<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
public abstract HoodieLogBlockType getBlockType();
protected Option<Schema.Field> getKeyField(Schema schema) {
return Option.ofNullable(schema.getField(keyFieldName));
}
protected Option<String> getRecordKey(IndexedRecord record) {
return getKeyField(record.getSchema())
.map(keyField -> record.get(keyField.pos()))
.map(Object::toString);
}
}

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.exception.HoodieIOException;
@@ -47,7 +46,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
this.keysToDelete = keysToDelete;
}
private HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream, boolean readBlockLazily,
public HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
super(header, footer, blockContentLocation, content, inputStream, readBlockLazily);
@@ -55,11 +54,12 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
@Override
public byte[] getContentBytes() throws IOException {
Option<byte[]> content = getContent();
// In case this method is called before realizing keys from content
if (getContent().isPresent()) {
return getContent().get();
} else if (readBlockLazily && !getContent().isPresent() && keysToDelete == null) {
if (content.isPresent()) {
return content.get();
} else if (readBlockLazily && keysToDelete == null) {
// read block lazily
getKeysToDelete();
}
@@ -100,11 +100,4 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
return HoodieLogBlockType.DELETE_BLOCK;
}
public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndPos, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) throws IOException {
return new HoodieDeleteBlock(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer);
}
}

View File

@@ -18,19 +18,7 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -43,11 +31,18 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nonnull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
@@ -58,29 +53,36 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
* base file format.
*/
public class HoodieHFileDataBlock extends HoodieDataBlock {
private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
private static int blockSize = 1 * 1024 * 1024;
private boolean enableInlineReading = false;
public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndpos,
Schema readerSchema, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer, boolean enableInlineReading, String keyField) {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)),
readerSchema, header, footer, keyField);
this.enableInlineReading = enableInlineReading;
private static final int DEFAULT_BLOCK_SIZE = 1024 * 1024;
private final Option<Compression.Algorithm> compressionAlgorithm;
public HoodieHFileDataBlock(FSDataInputStream inputStream,
Option<byte[]> content,
boolean readBlockLazily,
HoodieLogBlockContentLocation logBlockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer,
boolean enablePointLookups) {
super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, HoodieHFileReader.KEY_FIELD_NAME, enablePointLookups);
this.compressionAlgorithm = Option.empty();
}
public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header,
String keyField) {
super(records, header, new HashMap<>(), keyField);
public HoodieHFileDataBlock(List<IndexedRecord> records,
Map<HeaderMetadataType, String> header,
Compression.Algorithm compressionAlgorithm) {
super(records, header, new HashMap<>(), HoodieHFileReader.KEY_FIELD_NAME);
this.compressionAlgorithm = Option.of(compressionAlgorithm);
}
@Override
@@ -89,43 +91,45 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
@Override
protected byte[] serializeRecords() throws IOException {
HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
HFileContext context = new HFileContextBuilder()
.withBlockSize(DEFAULT_BLOCK_SIZE)
.withCompression(compressionAlgorithm.get())
.build();
Configuration conf = new Configuration();
CacheConfig cacheConfig = new CacheConfig(conf);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
.withOutputStream(ostream).withFileContext(context).withComparator(new HoodieHBaseKVComparator()).create();
// Use simple incrementing counter as a key
boolean useIntegerKey = !getRecordKey(records.get(0)).isPresent();
// This is set here to avoid re-computing this in the loop
int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1;
// Serialize records into bytes
Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
Iterator<IndexedRecord> itr = records.iterator();
boolean useIntegerKey = false;
int key = 0;
int keySize = 0;
final Field keyFieldSchema = records.get(0).getSchema().getField(HoodieHFileReader.KEY_FIELD_NAME);
if (keyFieldSchema == null) {
// Missing key metadata field so we should use an integer sequence key
useIntegerKey = true;
keySize = (int) Math.ceil(Math.log(records.size())) + 1;
}
int id = 0;
while (itr.hasNext()) {
IndexedRecord record = itr.next();
String recordKey;
if (useIntegerKey) {
recordKey = String.format("%" + keySize + "s", key++);
recordKey = String.format("%" + keyWidth + "s", id++);
} else {
recordKey = record.get(keyFieldSchema.pos()).toString();
recordKey = getRecordKey(record).get();
}
final byte[] recordBytes = serializeRecord(record, Option.ofNullable(keyFieldSchema));
final byte[] recordBytes = serializeRecord(record);
ValidationUtils.checkState(!sortedRecordsMap.containsKey(recordKey),
"Writing multiple records with same key not supported for " + this.getClass().getName());
sortedRecordsMap.put(recordKey, recordBytes);
}
HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
.withOutputStream(ostream).withFileContext(context).withComparator(new HoodieHBaseKVComparator()).create();
// Write the records
sortedRecordsMap.forEach((recordKey, recordBytes) -> {
try {
@@ -144,79 +148,52 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
@Override
protected void createRecordsFromContentBytes() throws IOException {
if (enableInlineReading) {
getRecords(Collections.emptyList());
} else {
super.createRecordsFromContentBytes();
protected List<IndexedRecord> deserializeRecords(byte[] content) throws IOException {
checkState(readerSchema != null, "Reader's schema has to be non-null");
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
// Read the content
HoodieHFileReader<IndexedRecord> reader = new HoodieHFileReader<>(content);
List<Pair<String, IndexedRecord>> records = reader.readAllRecords(writerSchema, readerSchema);
return records.stream().map(Pair::getSecond).collect(Collectors.toList());
}
// TODO abstract this w/in HoodieDataBlock
@Override
protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
// NOTE: It's important to extend Hadoop configuration here to make sure configuration
// is appropriately carried over
Configuration inlineConf = new Configuration(blockContentLoc.getHadoopConf());
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
Path inlinePath = InLineFSUtils.getInlineFilePath(
blockContentLoc.getLogFile().getPath(),
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize());
// HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
Collections.sort(keys);
try (HoodieHFileReader<IndexedRecord> reader =
new HoodieHFileReader<>(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) {
// Get writer's schema from the header
List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys, readerSchema);
return logRecords.stream().map(Pair::getSecond).collect(Collectors.toList());
}
}
@Override
public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
readWithInlineFS(keys);
return records;
}
/**
* Serialize the record to byte buffer.
*
* @param record - Record to serialize
* @param keyField - Key field in the schema
* @return Serialized byte buffer for the record
*/
private byte[] serializeRecord(final IndexedRecord record, final Option<Field> keyField) {
private byte[] serializeRecord(IndexedRecord record) {
Option<Schema.Field> keyField = getKeyField(record.getSchema());
// Reset key value w/in the record to avoid duplicating the key w/in payload
if (keyField.isPresent()) {
record.put(keyField.get().pos(), StringUtils.EMPTY_STRING);
}
return HoodieAvroUtils.indexedRecordToBytes(record);
}
/**
 * Reads records from this HFile data block through the inline file-system, either
 * scanning all records (when {@code keys} is empty) or seeking only the requested keys.
 *
 * <p>Populates {@code this.records} as a side effect.
 *
 * @param keys record keys to look up; an empty list triggers a full scan.
 *             NOTE: the list is sorted in place to make HFile seeks efficient.
 */
private void readWithInlineFS(List<String> keys) throws IOException {
  boolean enableFullScan = keys.isEmpty();
  // Get writer's schema from the block header
  Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
  // If readerSchema was not present, use writerSchema
  if (schema == null) {
    schema = writerSchema;
  }
  Configuration conf = new Configuration();
  CacheConfig cacheConf = new CacheConfig(conf);
  // Register the inline file-system so the in-log block can be addressed as a file
  Configuration inlineConf = new Configuration();
  inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
  Path inlinePath = InLineFSUtils.getInlineFilePath(
      getBlockContentLocation().get().getLogFile().getPath(),
      getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
      getBlockContentLocation().get().getContentPositionInLogFile(),
      getBlockContentLocation().get().getBlockSize());
  if (!enableFullScan) {
    // HFile read will be efficient if keys are sorted, since on storage, records are sorted
    // by key. This will avoid unnecessary seeks.
    Collections.sort(keys);
  }
  HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf));
  try {
    List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords = enableFullScan
        ? reader.readAllRecords(writerSchema, schema)
        : reader.readRecords(keys, schema);
    this.records = logRecords.stream().map(t -> t.getSecond()).collect(Collectors.toList());
  } finally {
    // Always close the reader — previously it leaked when readAllRecords/readRecords threw
    reader.close();
  }
}
/**
 * Deserializes all records from the in-memory block content (used when the content
 * was read eagerly rather than accessed through the inline file-system).
 */
@Override
protected void deserializeRecords() throws IOException {
  // Get schema from the header
  Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
  // If readerSchema was not present, use writerSchema
  if (schema == null) {
    schema = writerSchema;
  }
  // Read the content
  // NOTE(review): this reader wraps an in-memory byte[] and is never closed here —
  // confirm it holds no external resources
  HoodieHFileReader reader = new HoodieHFileReader<>(getContent().get());
  // Local `records` shadows the field; the field is assigned from it just below
  List<Pair<String, IndexedRecord>> records = reader.readAllRecords(writerSchema, schema);
  this.records = records.stream().map(t -> t.getSecond()).collect(Collectors.toList());
  // Free up content to be GC'd, deflate
  deflate();
}
}

View File

@@ -18,15 +18,18 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -36,6 +39,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Abstract class defining a block in HoodieLogFile.
*/
@@ -58,14 +63,17 @@ public abstract class HoodieLogBlock {
// TODO : change this to just InputStream so this works for any FileSystem
// create handlers to return specific type of inputstream based on FS
// input stream corresponding to the log file where this logBlock belongs
protected FSDataInputStream inputStream;
private final FSDataInputStream inputStream;
// Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory intensive)
protected boolean readBlockLazily;
public HoodieLogBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
public HoodieLogBlock(
@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
FSDataInputStream inputStream, boolean readBlockLazily) {
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation,
@Nonnull Option<byte[]> content,
@Nullable FSDataInputStream inputStream,
boolean readBlockLazily) {
this.logBlockHeader = logBlockHeader;
this.logBlockFooter = logBlockFooter;
this.blockContentLocation = blockContentLocation;
@@ -109,7 +117,25 @@ public abstract class HoodieLogBlock {
* Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
*/
public enum HoodieLogBlockType {
COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK
COMMAND_BLOCK(":command"),
DELETE_BLOCK(":delete"),
CORRUPT_BLOCK(":corrupted"),
AVRO_DATA_BLOCK("avro"),
HFILE_DATA_BLOCK("hfile"),
PARQUET_DATA_BLOCK("parquet");
private static final Map<String, HoodieLogBlockType> ID_TO_ENUM_MAP =
TypeUtils.getValueToEnumMap(HoodieLogBlockType.class, e -> e.id);
private final String id;
HoodieLogBlockType(String id) {
this.id = id;
}
public static HoodieLogBlockType fromId(String id) {
return ID_TO_ENUM_MAP.get(id);
}
}
/**
@@ -132,7 +158,8 @@ public abstract class HoodieLogBlock {
* intensive CompactedScanner, the location helps to lazily read contents from the log file
*/
public static final class HoodieLogBlockContentLocation {
// Hadoop Config required to access the file
private final Configuration hadoopConf;
// The logFile that contains this block
private final HoodieLogFile logFile;
// The filePosition in the logFile for the contents of this block
@@ -142,14 +169,22 @@ public abstract class HoodieLogBlock {
// The final position where the complete block ends
private final long blockEndPos;
HoodieLogBlockContentLocation(HoodieLogFile logFile, long contentPositionInLogFile, long blockSize,
long blockEndPos) {
public HoodieLogBlockContentLocation(Configuration hadoopConf,
HoodieLogFile logFile,
long contentPositionInLogFile,
long blockSize,
long blockEndPos) {
this.hadoopConf = hadoopConf;
this.logFile = logFile;
this.contentPositionInLogFile = contentPositionInLogFile;
this.blockSize = blockSize;
this.blockEndPos = blockEndPos;
}
public Configuration getHadoopConf() {
return hadoopConf;
}
public HoodieLogFile getLogFile() {
return logFile;
}
@@ -210,24 +245,27 @@ public abstract class HoodieLogBlock {
* Read or Skip block content of a log block in the log file. Depends on lazy reading enabled in
* {@link HoodieMergedLogRecordScanner}
*/
public static byte[] readOrSkipContent(FSDataInputStream inputStream, Integer contentLength, boolean readBlockLazily)
public static Option<byte[]> tryReadContent(FSDataInputStream inputStream, Integer contentLength, boolean readLazily)
throws IOException {
byte[] content = null;
if (!readBlockLazily) {
// Read the contents in memory
content = new byte[contentLength];
inputStream.readFully(content, 0, contentLength);
} else {
if (readLazily) {
// Seek to the end of the content block
inputStream.seek(inputStream.getPos() + contentLength);
return Option.empty();
}
return content;
// TODO re-use buffer if stream is backed by buffer
// Read the contents in memory
byte[] content = new byte[contentLength];
inputStream.readFully(content, 0, contentLength);
return Option.of(content);
}
/**
* When lazyReading of blocks is turned on, inflate the content of a log block from disk.
*/
protected void inflate() throws HoodieIOException {
checkState(!content.isPresent(), "Block has already been inflated");
checkState(inputStream != null, "Block should have input-stream provided");
try {
content = Option.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);

View File

@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.log.block;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import javax.annotation.Nonnull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* HoodieParquetDataBlock contains a list of records serialized using Parquet.
*/
public class HoodieParquetDataBlock extends HoodieDataBlock {
private final Option<CompressionCodecName> compressionCodecName;
public HoodieParquetDataBlock(FSDataInputStream inputStream,
Option<byte[]> content,
boolean readBlockLazily,
HoodieLogBlockContentLocation logBlockContentLocation,
Option<Schema> readerSchema,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer,
String keyField) {
super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
this.compressionCodecName = Option.empty();
}
public HoodieParquetDataBlock(
@Nonnull List<IndexedRecord> records,
@Nonnull Map<HeaderMetadataType, String> header,
@Nonnull String keyField,
@Nonnull CompressionCodecName compressionCodecName
) {
super(records, header, new HashMap<>(), keyField);
this.compressionCodecName = Option.of(compressionCodecName);
}
@Override
public HoodieLogBlockType getBlockType() {
return HoodieLogBlockType.PARQUET_DATA_BLOCK;
}
@Override
protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
if (records.size() == 0) {
return new byte[0];
}
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
HoodieAvroParquetConfig avroParquetConfig =
new HoodieAvroParquetConfig(
writeSupport,
compressionCodecName.get(),
ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE,
1024 * 1024 * 1024,
new Configuration(),
Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
for (IndexedRecord record : records) {
String recordKey = getRecordKey(record).orElse(null);
parquetWriter.writeAvro(recordKey, record);
}
outputStream.flush();
}
}
return baos.toByteArray();
}
public static Iterator<IndexedRecord> getProjectedParquetRecordsIterator(Configuration conf,
Schema readerSchema,
InputFile inputFile) throws IOException {
AvroReadSupport.setAvroReadSchema(conf, readerSchema);
AvroReadSupport.setRequestedProjection(conf, readerSchema);
ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(inputFile).withConf(conf).build();
return new ParquetReaderIterator<>(reader);
}
/**
* NOTE: We're overriding the whole reading sequence to make sure we properly respect
* the requested Reader's schema and only fetch the columns that have been explicitly
* requested by the caller (providing projected Reader's schema)
*/
@Override
protected List<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
// NOTE: It's important to extend Hadoop configuration here to make sure configuration
// is appropriately carried over
Configuration inlineConf = new Configuration(blockContentLoc.getHadoopConf());
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
Path inlineLogFilePath = InLineFSUtils.getInlineFilePath(
blockContentLoc.getLogFile().getPath(),
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize());
ArrayList<IndexedRecord> records = new ArrayList<>();
getProjectedParquetRecordsIterator(
inlineConf,
readerSchema,
HadoopInputFile.fromPath(inlineLogFilePath, inlineConf)
)
.forEachRemaining(records::add);
return records;
}
@Override
protected List<IndexedRecord> deserializeRecords(byte[] content) throws IOException {
throw new UnsupportedOperationException("Should not be invoked");
}
}

View File

@@ -108,14 +108,31 @@ public final class Option<T> implements Serializable {
}
}
/**
 * Returns this {@link Option} if it holds a value; otherwise evaluates {@code other}
 * and returns the alternative it supplies.
 */
public Option<T> or(Supplier<? extends Option<T>> other) {
  if (val != null) {
    return this;
  }
  return other.get();
}
/**
 * Identical to {@code Optional.orElse}: returns the held value, or {@code other} when empty.
 */
public T orElse(T other) {
  return val == null ? other : val;
}
/**
 * Identical to {@code Optional.orElseGet}: returns the held value, or the supplier's
 * result when empty.
 */
public T orElseGet(Supplier<? extends T> other) {
  return val == null ? other.get() : val;
}
/**
* Identical to {@code Optional.orElseThrow}
*/
public <X extends Throwable> T orElseThrow(Supplier<? extends X> exceptionSupplier) throws X {
if (val != null) {
return val;

View File

@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util.io;
import javax.annotation.Nonnull;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
 * Instance of {@link InputStream} backed by {@link ByteBuffer}, implementing following
 * functionality (on top of what's required by {@link InputStream})
 *
 * <ol>
 *   <li>Seeking: enables random access by allowing to seek to an arbitrary position w/in the stream</li>
 *   <li>(Thread-safe) Copying: enables to copy from the underlying buffer not modifying the state of the stream</li>
 * </ol>
 *
 * NOTE: Generally methods of this class are NOT thread-safe, unless specified otherwise
 */
public class ByteBufferBackedInputStream extends InputStream {

  private final ByteBuffer buffer;
  // Buffer position at construction time; getPosition()/seek()/copyFrom() are relative to it
  private final int bufferOffset;

  public ByteBufferBackedInputStream(ByteBuffer buf) {
    this.buffer = buf.duplicate();
    // We're marking current buffer position, so that we will be able
    // to reset it later on appropriately (to support seek operations)
    this.buffer.mark();
    this.bufferOffset = buffer.position();
  }

  public ByteBufferBackedInputStream(byte[] array) {
    this(array, 0, array.length);
  }

  public ByteBufferBackedInputStream(byte[] array, int offset, int length) {
    this(ByteBuffer.wrap(array, offset, length));
  }

  @Override
  public int read() {
    // Per the InputStream contract, end-of-stream is signaled by returning -1
    // (previous behavior of throwing broke standard-stream consumers)
    if (!buffer.hasRemaining()) {
      return -1;
    }
    return buffer.get() & 0xFF;
  }

  @Override
  public int read(byte[] bytes, int offset, int length) {
    // Per the InputStream contract, end-of-stream is signaled by returning -1
    if (!buffer.hasRemaining()) {
      return -1;
    }
    // Determine total number of bytes available to read
    int available = Math.min(length, buffer.remaining());
    // Copy bytes into the target buffer
    buffer.get(bytes, offset, available);
    return available;
  }

  /**
   * Returns current position of the stream
   */
  public int getPosition() {
    return buffer.position() - bufferOffset;
  }

  /**
   * Seeks to a position w/in the stream
   *
   * NOTE: Position is relative to the start of the stream (ie its absolute w/in this stream),
   * with following invariant being assumed:
   * <p>0 <= pos <= length (of the stream)</p>
   *
   * This method is NOT thread-safe
   *
   * @param pos target position to seek to w/in the holding buffer
   */
  public void seek(long pos) {
    buffer.reset(); // to mark
    int offset = buffer.position();
    // NOTE: That the new pos is still relative to buffer's offset
    int newPos = offset + (int) pos;
    if (newPos > buffer.limit() || newPos < offset) {
      throw new IllegalArgumentException(
          String.format("Can't seek past the backing buffer (limit %d, offset %d, new %d)", buffer.limit(), offset, newPos)
      );
    }
    buffer.position(newPos);
  }

  /**
   * Copies at most {@code length} bytes starting from position {@code pos} into the target
   * buffer with provided {@code offset}. Returns number of bytes copied from the backing buffer
   *
   * NOTE: This does not change the current position of the stream and is thread-safe
   *
   * @param pos absolute position w/in stream to read from
   * @param targetBuffer target buffer to copy into
   * @param offset target buffer offset to copy at
   * @param length length of the sequence to copy
   * @return number of bytes copied
   */
  public int copyFrom(long pos, byte[] targetBuffer, int offset, int length) {
    int bufferPos = bufferOffset + (int) pos;
    if (bufferPos > buffer.limit()) {
      throw new IllegalArgumentException(
          String.format("Can't read past the backing buffer boundary (offset %d, length %d)", pos, buffer.limit() - bufferOffset)
      );
    } else if (length > targetBuffer.length) {
      throw new IllegalArgumentException(
          String.format("Target buffer is too small (length %d, buffer size %d)", length, targetBuffer.length)
      );
    }
    // Determine total number of bytes available to read
    int available = Math.min(length, buffer.limit() - bufferPos);
    // NOTE(review): buffer.array() requires an array-backed, non-read-only buffer — true for
    // the byte[]-wrapping constructors here, but a direct ByteBuffer would throw; confirm callers
    System.arraycopy(buffer.array(), bufferPos, targetBuffer, offset, available);
    return available;
  }
}

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
/**
 * ParquetConfig for writing avro records in Parquet files.
 */
public class HoodieAvroParquetConfig extends HoodieBaseParquetConfig<HoodieAvroWriteSupport> {

  public HoodieAvroParquetConfig(HoodieAvroWriteSupport writeSupport, CompressionCodecName compressionCodecName,
                                 int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
                                 double compressionRatio) {
    // Dictionary encoding defaults to disabled in the base config
    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio);
  }

  // NOTE: parameter renamed from the typo'd "directoryEnabled" — it toggles Parquet
  // dictionary encoding (see HoodieBaseParquetConfig.dictionaryEnabled)
  public HoodieAvroParquetConfig(HoodieAvroWriteSupport writeSupport, CompressionCodecName compressionCodecName,
                                 int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
                                 double compressionRatio, boolean dictionaryEnabled) {
    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, dictionaryEnabled);
  }
}

View File

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
/**
 * Base ParquetConfig to hold config params for writing to Parquet.
 *
 * @param <T> type of the write-support instance carried by this config
 */
public class HoodieBaseParquetConfig<T> {
  private final T writeSupport;
  private final CompressionCodecName compressionCodecName;
  private final int blockSize;
  private final int pageSize;
  private final long maxFileSize;
  private final Configuration hadoopConf;
  private final double compressionRatio;
  private final boolean dictionaryEnabled;

  public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
                                 int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
    // Dictionary encoding stays off unless explicitly requested
    this(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, false);
  }

  public HoodieBaseParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
                                 int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio,
                                 boolean dictionaryEnabled) {
    this.writeSupport = writeSupport;
    this.compressionCodecName = compressionCodecName;
    this.blockSize = blockSize;
    this.pageSize = pageSize;
    this.maxFileSize = maxFileSize;
    this.hadoopConf = hadoopConf;
    this.compressionRatio = compressionRatio;
    this.dictionaryEnabled = dictionaryEnabled;
  }

  /** Write-support instance handed to the Parquet writer. */
  public T getWriteSupport() {
    return writeSupport;
  }

  /** Compression codec applied to Parquet pages. */
  public CompressionCodecName getCompressionCodecName() {
    return compressionCodecName;
  }

  /** Parquet row-group (block) size, in bytes. */
  public int getBlockSize() {
    return blockSize;
  }

  /** Parquet page size, in bytes. */
  public int getPageSize() {
    return pageSize;
  }

  /** Upper bound on the size of a produced file, in bytes. */
  public long getMaxFileSize() {
    return maxFileSize;
  }

  /** Hadoop configuration passed through to the writer. */
  public Configuration getHadoopConf() {
    return hadoopConf;
  }

  /** Expected compression ratio used for size estimation. */
  public double getCompressionRatio() {
    return compressionRatio;
  }

  /** Whether Parquet dictionary encoding is enabled. */
  public boolean dictionaryEnabled() {
    return dictionaryEnabled;
  }
}

View File

@@ -27,7 +27,7 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.Option;
public interface HoodieFileReader<R extends IndexedRecord> {
public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
public String[] readMinMaxRecordKeys();

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.io.storage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -51,6 +50,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -77,11 +77,11 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
}
public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs) throws IOException {
public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
this.conf = configuration;
this.path = path;
this.fsDataInputStream = inlineFs.open(path);
this.reader = HFile.createReader(inlineFs, path, cacheConfig, configuration);
this.fsDataInputStream = fs.open(path);
this.reader = HFile.createReader(fs, path, cacheConfig, configuration);
}
public HoodieHFileReader(byte[] content) throws IOException {
@@ -332,28 +332,14 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
}
}
static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable, PositionedReadable {
public SeekableByteArrayInputStream(byte[] buf) {
super(buf);
}
@Override
public long getPos() throws IOException {
return pos;
}
@Override
public void seek(long pos) throws IOException {
if (mark != 0) {
throw new IllegalStateException();
}
reset();
long skipped = skip(pos);
if (skipped != pos) {
throw new IOException();
}
return getPosition();
}
@Override
@@ -363,19 +349,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
if (position >= buf.length) {
throw new IllegalArgumentException();
}
if (position + length > buf.length) {
throw new IllegalArgumentException();
}
if (length > buffer.length) {
throw new IllegalArgumentException();
}
System.arraycopy(buf, (int) position, buffer, offset, length);
return length;
return copyFrom(position, buffer, offset, length);
}
@Override

View File

@@ -34,9 +34,9 @@ import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader {
private Path path;
private Configuration conf;
public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
private final Path path;
private final Configuration conf;
private final BaseFileUtils parquetUtils;
public HoodieParquetReader(Configuration configuration, Path path) {
@@ -45,6 +45,7 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
}
@Override
public String[] readMinMaxRecordKeys() {
return parquetUtils.readMinMaxRecordKeys(conf, path);
}
@@ -55,15 +56,15 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
}
@Override
public Set<String> filterRowKeys(Set candidateRowKeys) {
public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
return parquetUtils.filterRowKeys(conf, path, candidateRowKeys);
}
@Override
public Iterator<R> getRecordIterator(Schema schema) throws IOException {
AvroReadSupport.setAvroReadSchema(conf, schema);
ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
return new ParquetReaderIterator(reader);
ParquetReader<R> reader = AvroParquetReader.<R>builder(path).withConf(conf).build();
return new ParquetReaderIterator<>(reader);
}
@Override

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.OutputFile;
import java.io.IOException;
// TODO(HUDI-3035) unify w/ HoodieParquetWriter
/**
 * Writer that serializes Avro records as Parquet into an arbitrary
 * {@link FSDataOutputStream} (rather than a file path), allowing Parquet payloads
 * to be produced fully in memory.
 */
public class HoodieParquetStreamWriter<R extends IndexedRecord> implements AutoCloseable {

  private final ParquetWriter<R> writer;
  private final HoodieAvroWriteSupport writeSupport;

  /**
   * @param outputStream  target stream receiving the serialized Parquet payload
   * @param parquetConfig carries the write-support, codec, sizing and Hadoop config
   * @throws IOException if the underlying Parquet writer cannot be created
   */
  public HoodieParquetStreamWriter(FSDataOutputStream outputStream,
                                   HoodieAvroParquetConfig parquetConfig) throws IOException {
    this.writeSupport = parquetConfig.getWriteSupport();
    this.writer = new Builder<R>(new OutputStreamBackedOutputFile(outputStream), writeSupport)
        .withWriteMode(ParquetFileWriter.Mode.CREATE)
        .withCompressionCodec(parquetConfig.getCompressionCodecName())
        .withRowGroupSize(parquetConfig.getBlockSize())
        .withPageSize(parquetConfig.getPageSize())
        // Dictionary pages share the configured page size
        .withDictionaryPageSize(parquetConfig.getPageSize())
        .withDictionaryEncoding(parquetConfig.dictionaryEnabled())
        .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
        .withConf(parquetConfig.getHadoopConf())
        .build();
  }

  /**
   * Writes a single record and registers its key with the write-support.
   *
   * @param key    record key; NOTE(review): callers may pass null — confirm
   *               HoodieAvroWriteSupport.add tolerates null keys
   * @param object record to serialize
   */
  public void writeAvro(String key, R object) throws IOException {
    writer.write(object);
    writeSupport.add(key);
  }

  @Override
  public void close() throws IOException {
    // Delegates to the underlying Parquet writer, finalizing the payload
    writer.close();
  }

  /**
   * Builder exposing the protected {@link ParquetWriter.Builder} hooks so a
   * caller-supplied {@link WriteSupport} and {@link OutputFile} can be used.
   */
  private static class Builder<T> extends ParquetWriter.Builder<T, Builder<T>> {
    private final WriteSupport<T> writeSupport;

    private Builder(Path file, WriteSupport<T> writeSupport) {
      super(file);
      this.writeSupport = writeSupport;
    }

    private Builder(OutputFile file, WriteSupport<T> writeSupport) {
      super(file);
      this.writeSupport = writeSupport;
    }

    @Override
    protected Builder<T> self() {
      return this;
    }

    @Override
    protected WriteSupport<T> getWriteSupport(Configuration conf) {
      return writeSupport;
    }
  }
}

View File

@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.parquet.io;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
/**
 * Implementation of {@link InputFile} backed by a {@code byte[]} buffer.
 */
public class ByteBufferBackedInputFile implements InputFile {
  private final byte[] buffer;
  private final int offset;
  private final int length;

  public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }

  public ByteBufferBackedInputFile(byte[] buffer) {
    this(buffer, 0, buffer.length);
  }

  @Override
  public long getLength() {
    return length;
  }

  @Override
  public SeekableInputStream newStream() {
    // Capture the backing stream once, so position/seek delegate to it directly
    // (avoiding repeated casts of getStream())
    final ByteBufferBackedInputStream underlying =
        new ByteBufferBackedInputStream(buffer, offset, length);
    return new DelegatingSeekableInputStream(underlying) {
      @Override
      public long getPos() {
        return underlying.getPosition();
      }

      @Override
      public void seek(long newPos) {
        underlying.seek(newPos);
      }
    };
  }
}

View File

@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.parquet.io;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import javax.annotation.Nonnull;
import java.io.IOException;
/**
 * {@link OutputFile} implementation appending to an externally managed
 * {@link FSDataOutputStream}; the stream's lifecycle (flushing on completion,
 * closing) is owned by the caller, not by this file
 */
public class OutputStreamBackedOutputFile implements OutputFile {

  private static final long DEFAULT_BLOCK_SIZE = 1024L * 1024L;

  private final FSDataOutputStream stream;

  public OutputStreamBackedOutputFile(FSDataOutputStream outputStream) {
    this.stream = outputStream;
  }

  @Override
  public PositionOutputStream create(long blockSizeHint) {
    // The block-size hint is irrelevant for a stream-backed "file"
    return new PositionOutputStreamAdapter(stream);
  }

  @Override
  public PositionOutputStream createOrOverwrite(long blockSizeHint) {
    // Overwriting is indistinguishable from creating when writing to a live stream
    return create(blockSizeHint);
  }

  @Override
  public boolean supportsBlockSize() {
    return false;
  }

  @Override
  public long defaultBlockSize() {
    return DEFAULT_BLOCK_SIZE;
  }

  /**
   * {@link PositionOutputStream} adapter forwarding every write to the wrapped
   * {@link FSDataOutputStream}, while exposing the latter's current position
   */
  private static class PositionOutputStreamAdapter extends PositionOutputStream {
    private final FSDataOutputStream target;

    PositionOutputStreamAdapter(FSDataOutputStream target) {
      this.target = target;
    }

    @Override
    public long getPos() throws IOException {
      return target.getPos();
    }

    @Override
    public void write(int b) throws IOException {
      target.write(b);
    }

    @Override
    public void write(@Nonnull byte[] buffer, int off, int len) throws IOException {
      target.write(buffer, off, len);
    }

    @Override
    public void flush() throws IOException {
      target.flush();
    }

    @Override
    public void close() {
      // Deliberately a no-op: the target stream stays open so the caller can keep
      // appending to it (and explicitly manage its lifecycle) after this "file" is done
    }
  }
}