1
0

Implement HoodieLogFormat replacing Avro as the default log format

This commit is contained in:
Prasanna Rajaperumal
2017-05-17 12:52:36 -07:00
committed by vinoth chandar
parent 3c984447da
commit 240c91241b
30 changed files with 1790 additions and 1260 deletions

View File

@@ -17,7 +17,7 @@
package com.uber.hoodie.common.model;
public enum HoodieFileFormat {
PARQUET(".parquet"), AVRO(".avro");
PARQUET(".parquet"), HOODIE_LOG(".log");
private final String extension;

View File

@@ -52,7 +52,7 @@ public class HoodieTableConfig implements Serializable {
public static final HoodieTableType DEFAULT_TABLE_TYPE = HoodieTableType.COPY_ON_WRITE;
public static final HoodieFileFormat DEFAULT_RO_FILE_FORMAT = HoodieFileFormat.PARQUET;
public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.AVRO;
public static final HoodieFileFormat DEFAULT_RT_FILE_FORMAT = HoodieFileFormat.HOODIE_LOG;
private Properties props;
public HoodieTableConfig(FileSystem fs, String metaPath) {

View File

@@ -0,0 +1,207 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged
 * list of records which will be used as a lookup table when merging the base columnar file
 * with the redo log file.
 *
 * TODO(FIX) - Does not apply application specific merge logic - defaults to HoodieAvroPayload
 */
public class HoodieCompactedLogRecordScanner implements Iterable<HoodieRecord<HoodieAvroPayload>> {

  private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class);

  // Final list of compacted/merged records to iterate
  private final Collection<HoodieRecord<HoodieAvroPayload>> logRecords;
  // Reader schema for the records
  private final Schema readerSchema;
  // Total log files read - for metrics
  private AtomicLong totalLogFiles = new AtomicLong(0);
  // Total log records read - for metrics
  private AtomicLong totalLogRecords = new AtomicLong(0);
  // Total final list of compacted/merged records
  private long totalRecordsToUpdate;

  /**
   * Eagerly scans all the given log files, applies data/delete/command/corrupt block
   * semantics and merges duplicate keys via the payload's preCombine.
   *
   * @param fs           filesystem handle used to open each log file
   * @param logFilePaths fully qualified paths of the log files to scan, in commit order
   * @param readerSchema avro schema used to deserialize records from the data blocks
   */
  public HoodieCompactedLogRecordScanner(FileSystem fs, List<String> logFilePaths,
      Schema readerSchema) {
    this.readerSchema = readerSchema;
    Map<String, HoodieRecord<HoodieAvroPayload>> records = Maps.newHashMap();
    // iterate over the paths
    logFilePaths.stream().map(s -> new HoodieLogFile(new Path(s))).forEach(s -> {
      log.info("Scanning log file " + s.getPath());
      totalLogFiles.incrementAndGet();
      // Use the HoodieLogFormatReader to iterate through the blocks in the log file.
      // try-with-resources guarantees the underlying input stream is closed even on
      // failure (the previous version leaked the reader).
      try (HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, s, readerSchema)) {
        // Store the records loaded from the last data block (needed to implement rollback)
        Map<String, HoodieRecord<HoodieAvroPayload>> recordsFromLastBlock = Maps.newHashMap();
        reader.forEachRemaining(r -> {
          switch (r.getBlockType()) {
            case AVRO_DATA_BLOCK:
              log.info("Reading a data block from file " + s.getPath());
              // If this is a avro data block, then merge the last block records into the main result
              merge(records, recordsFromLastBlock);
              // Load the merged records into recordsFromLastBlock
              HoodieAvroDataBlock dataBlock = (HoodieAvroDataBlock) r;
              loadRecordsFromBlock(dataBlock, recordsFromLastBlock);
              break;
            case DELETE_BLOCK:
              log.info("Reading a delete block from file " + s.getPath());
              // This is a delete block, so lets merge any records from previous data block
              merge(records, recordsFromLastBlock);
              // Delete the keys listed as to be deleted
              HoodieDeleteBlock deleteBlock = (HoodieDeleteBlock) r;
              Arrays.stream(deleteBlock.getKeysToDelete()).forEach(records::remove);
              break;
            case COMMAND_BLOCK:
              log.info("Reading a command block from file " + s.getPath());
              // This is a command block - take appropriate action based on the command
              HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
              if (commandBlock.getType() == HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK) {
                log.info("Rolling back the last data block read in " + s.getPath());
                // rollback the last read data block
                recordsFromLastBlock.clear();
              }
              break;
            case CORRUPT_BLOCK:
              log.info("Found a corrupt block in " + s.getPath());
              // If there is a corrupt block - we will assume that this was the next data block
              // so merge the last block records (TODO - handle when the corrupted block was a tombstone written partially?)
              merge(records, recordsFromLastBlock);
              recordsFromLastBlock.clear();
              break;
          }
        });
        // merge the last read block when all the blocks are done reading
        if (!recordsFromLastBlock.isEmpty()) {
          log.info("Merging the final data block in " + s.getPath());
          merge(records, recordsFromLastBlock);
        }
      } catch (IOException e) {
        // preserve the underlying cause instead of dropping it
        throw new HoodieIOException("IOException when reading log file " + s, e);
      }
    });
    this.logRecords = Collections.unmodifiableCollection(records.values());
    this.totalRecordsToUpdate = records.size();
  }

  /**
   * Iterate over the GenericRecord in the block, read the hoodie key and partition path
   * and merge with the HoodieAvroPayload if the same key was found before
   *
   * @param dataBlock            data block whose records are loaded
   * @param recordsFromLastBlock cleared and repopulated with the (de-duplicated) block records
   */
  private void loadRecordsFromBlock(
      HoodieAvroDataBlock dataBlock,
      Map<String, HoodieRecord<HoodieAvroPayload>> recordsFromLastBlock) {
    recordsFromLastBlock.clear();
    List<IndexedRecord> recs = dataBlock.getRecords();
    totalLogRecords.addAndGet(recs.size());
    recs.forEach(rec -> {
      String key = ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD)
          .toString();
      String partitionPath =
          ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
              .toString();
      HoodieRecord<HoodieAvroPayload> hoodieRecord = new HoodieRecord<>(
          new HoodieKey(key, partitionPath),
          new HoodieAvroPayload(Optional.of(((GenericRecord) rec))));
      if (recordsFromLastBlock.containsKey(key)) {
        // Merge and store the merged record
        HoodieAvroPayload combinedValue = recordsFromLastBlock.get(key).getData()
            .preCombine(hoodieRecord.getData());
        recordsFromLastBlock
            .put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
                combinedValue));
      } else {
        // Put the record as is
        recordsFromLastBlock.put(key, hoodieRecord);
      }
    });
  }

  /**
   * Merge the records read from a single data block with the accumulated records
   *
   * @param records              running merged view across all blocks (mutated in place)
   * @param recordsFromLastBlock records from the most recently read data block
   */
  private void merge(Map<String, HoodieRecord<HoodieAvroPayload>> records,
      Map<String, HoodieRecord<HoodieAvroPayload>> recordsFromLastBlock) {
    recordsFromLastBlock.forEach((key, hoodieRecord) -> {
      if (records.containsKey(key)) {
        // Merge and store the merged record
        HoodieAvroPayload combinedValue = records.get(key).getData()
            .preCombine(hoodieRecord.getData());
        records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()),
            combinedValue));
      } else {
        // Put the record as is
        records.put(key, hoodieRecord);
      }
    });
  }

  /** @return an iterator over the final compacted/merged records (unmodifiable view) */
  @Override
  public Iterator<HoodieRecord<HoodieAvroPayload>> iterator() {
    return logRecords.iterator();
  }

  /** @return number of log files scanned - for metrics */
  public long getTotalLogFiles() {
    return totalLogFiles.get();
  }

  /** @return number of raw log records read across all data blocks - for metrics */
  public long getTotalLogRecords() {
    return totalLogRecords.get();
  }

  /** @return size of the final merged record list */
  public long getTotalRecordsToUpdate() {
    return totalRecordsToUpdate;
  }
}

View File

@@ -1,238 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.util.FSUtils;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
/**
 * Configuration for a HoodieLog append. Immutable once built; use {@link #newBuilder()}
 * to construct, or {@link #withLogFile(HoodieLogFile)} to derive a copy pointing at a
 * different (e.g. rolled-over) log file.
 */
public class HoodieLogAppendConfig {

  private final static Logger log = LogManager.getLogger(HoodieLogAppendConfig.class);

  // Default size threshold for roll-over when the builder does not specify one (32 MB)
  private static final long DEFAULT_SIZE_THRESHOLD = 32 * 1024 * 1024L;

  private final int bufferSize;
  private final short replication;
  private final long blockSize;
  private final HoodieLogFile logFile;
  // final: only ever assigned in the constructor (was non-final, weakening immutability)
  private final boolean isAutoFlush;
  private final Schema schema;
  private final FileSystem fs;
  private final long sizeThreshold;

  private HoodieLogAppendConfig(FileSystem fs, HoodieLogFile logFile, Schema schema, Integer bufferSize,
      Short replication, Long blockSize, boolean isAutoFlush, Long sizeThreshold) {
    this.fs = fs;
    this.logFile = logFile;
    this.schema = schema;
    this.bufferSize = bufferSize;
    this.replication = replication;
    this.blockSize = blockSize;
    this.isAutoFlush = isAutoFlush;
    this.sizeThreshold = sizeThreshold;
  }

  /** @return buffer size used by the underlying writer */
  public int getBufferSize() {
    return bufferSize;
  }

  /** @return replication factor for the log file */
  public short getReplication() {
    return replication;
  }

  /** @return block size for the log file (useful if auto-flush is set to false) */
  public long getBlockSize() {
    return blockSize;
  }

  /** @return schema of the records written to the log */
  public Schema getSchema() {
    return schema;
  }

  /** @return filesystem the log file lives on */
  public FileSystem getFs() {
    return fs;
  }

  /** @return the log file this config points at */
  public HoodieLogFile getLogFile() {
    return logFile;
  }

  /** @return size threshold beyond which a rolling appender rolls the log over */
  public long getSizeThreshold() {
    return sizeThreshold;
  }

  /** @return true if every append should be flushed immediately */
  public boolean isAutoFlush() {
    return isAutoFlush;
  }

  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Derive a copy of this config pointing at a different log file, keeping every other
   * setting unchanged.
   *
   * @param newFile the log file the new config should point at
   */
  public HoodieLogAppendConfig withLogFile(HoodieLogFile newFile) {
    return new HoodieLogAppendConfig(fs, newFile, schema, bufferSize, replication, blockSize,
        isAutoFlush, sizeThreshold);
  }

  /** Builder for {@link HoodieLogAppendConfig}; validates required fields in {@link #build()}. */
  public static class Builder {
    // Auto-flush. if set to true - then after every append, the avro block will be flushed
    private boolean isAutoFlush = true;
    // Buffer size in the Avro writer
    private Integer bufferSize;
    // Replication for the log file
    private Short replication;
    // Blocksize for the avro log file (useful if auto-flush is set to false)
    private Long blockSize;
    // Schema for the log file
    private Schema schema;
    // FileSystem
    private FileSystem fs;
    // Size threshold for the log file. Useful when used with a rolling log appender
    private Long sizeThreshold;
    // Log File extension. Could be .avro.delta or .avro.commits etc
    private String logFileExtension;
    // File ID
    private String fileId;
    // version number for this log file. If not specified, then the current version will be computed
    private Integer fileVersion;
    // Partition path for the log file
    private Path partitionPath;
    // The base commit time for which the log files are accumulated
    private String baseCommitTime;

    public Builder withBufferSize(int bufferSize) {
      this.bufferSize = bufferSize;
      return this;
    }

    public Builder withReplication(short replication) {
      this.replication = replication;
      return this;
    }

    public Builder withBlockSize(long blockSize) {
      this.blockSize = blockSize;
      return this;
    }

    public Builder withSchema(Schema schema) {
      this.schema = schema;
      return this;
    }

    public Builder withFs(FileSystem fs) {
      this.fs = fs;
      return this;
    }

    public Builder withAutoFlush(boolean autoFlush) {
      this.isAutoFlush = autoFlush;
      return this;
    }

    public Builder withSizeThreshold(long sizeThreshold) {
      this.sizeThreshold = sizeThreshold;
      return this;
    }

    public Builder withLogFileExtension(String logFileExtension) {
      this.logFileExtension = logFileExtension;
      return this;
    }

    public Builder withFileId(String fileId) {
      this.fileId = fileId;
      return this;
    }

    public Builder withFileVersion(int version) {
      this.fileVersion = version;
      return this;
    }

    public Builder onPartitionPath(Path path) {
      this.partitionPath = path;
      return this;
    }

    public Builder withBaseCommitTime(String commitTime) {
      this.baseCommitTime = commitTime;
      return this;
    }

    /**
     * Validates required fields, fills in defaults (buffer size, replication, block size,
     * size threshold) from the filesystem, computes the log version if unset, and builds
     * the immutable config.
     *
     * @throws IllegalArgumentException if a required field was not supplied
     * @throws IOException              if the current log version cannot be determined
     */
    public HoodieLogAppendConfig build() throws IOException {
      log.info("Building HoodieLogAppendConfig");
      if (schema == null) {
        throw new IllegalArgumentException("Schema for log is not specified");
      }
      if (fs == null) {
        fs = FSUtils.getFs();
      }
      if (fileId == null) {
        throw new IllegalArgumentException("FileID is not specified");
      }
      if (baseCommitTime == null) {
        throw new IllegalArgumentException("BaseCommitTime is not specified");
      }
      if (logFileExtension == null) {
        throw new IllegalArgumentException("File extension is not specified");
      }
      if (partitionPath == null) {
        throw new IllegalArgumentException("Partition path is not specified");
      }
      if (fileVersion == null) {
        log.info("Computing the next log version for " + fileId + " in " + partitionPath);
        fileVersion =
            FSUtils.getCurrentLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
        log.info(
            "Computed the next log version for " + fileId + " in " + partitionPath + " as "
                + fileVersion);
      }
      Path logPath = new Path(partitionPath,
          FSUtils.makeLogFileName(fileId, logFileExtension, baseCommitTime, fileVersion));
      log.info("LogConfig created on path " + logPath);
      HoodieLogFile logFile = new HoodieLogFile(logPath);
      if (bufferSize == null) {
        bufferSize = FSUtils.getDefaultBufferSize(fs);
      }
      if (replication == null) {
        replication = FSUtils.getDefaultReplication(fs, partitionPath);
      }
      if (blockSize == null) {
        blockSize = FSUtils.getDefaultBlockSize(fs, partitionPath);
      }
      if (sizeThreshold == null) {
        sizeThreshold = DEFAULT_SIZE_THRESHOLD;
      }
      return new HoodieLogAppendConfig(fs, logFile, schema, bufferSize, replication, blockSize,
          isAutoFlush, sizeThreshold);
    }
  }
}

View File

@@ -1,65 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.table.log.avro.AvroLogAppender;
import com.uber.hoodie.common.table.log.avro.RollingAvroLogAppender;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
 * Interface for implementations supporting appending data to a log file.
 *
 * @param <R> type of record appended to the log
 * @see AvroLogAppender
 * @see RollingAvroLogAppender
 */
public interface HoodieLogAppender<R> {
/**
 * Append a stream of records in a batch (this will be written as a block/unit to the underlying log)
 *
 * @param records records to write as one block
 * @throws IOException if the underlying stream fails
 * @throws InterruptedException if the thread is interrupted while appending
 */
void append(Iterator<R> records) throws IOException, InterruptedException;
/**
 * Syncs the log manually if auto-flush is not set in HoodieLogAppendConfig. If auto-flush is set
 * Then the LogAppender will automatically flush after the append call.
 *
 * @throws IOException if the flush fails
 */
void sync() throws IOException;
/**
 * Close the appender and release any resources it is holding on to.
 *
 * @throws IOException if closing the underlying stream fails
 */
void close() throws IOException;
/**
 * Gets the current offset in the log. This is usually used to mark the start of the block in
 * meta-data and passed to the HoodieLogReader
 *
 * @return current size/offset of the log in bytes
 * @throws IOException if the size cannot be determined
 */
long getCurrentSize() throws IOException;
}

View File

@@ -32,7 +32,7 @@ import java.util.Optional;
* Also contains logic to roll-over the log file
*/
public class HoodieLogFile {
public static final String DELTA_EXTENSION = ".avro.delta";
public static final String DELTA_EXTENSION = ".log";
private final Path path;
private Optional<FileStatus> fileStatus;
@@ -89,11 +89,6 @@ public class HoodieLogFile {
FSUtils.makeLogFileName(fileId, DELTA_EXTENSION, baseCommitTime, newVersion)));
}
public boolean shouldRollOver(HoodieLogAppender currentWriter, HoodieLogAppendConfig config)
throws IOException {
return currentWriter.getCurrentSize() > config.getSizeThreshold();
}
public static Comparator<HoodieLogFile> getLogVersionComparator() {
return (o1, o2) -> {
// reverse the order

View File

@@ -0,0 +1,193 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * File Format for Hoodie Log Files.
 * The File Format consists of blocks each separated with a MAGIC sync marker.
 * A Block can either be a Data block, Command block or Delete Block.
 * Data Block - Contains log records serialized as Avro Binary Format
 * Command Block - Specific commands like ROLLBACK_PREVIOUS_BLOCK - Tombstone for the previously written block
 * Delete Block - List of keys to delete - tombstone for keys
 */
public interface HoodieLogFormat {
/**
 * Magic 4 bytes we put at the start of every block in the log file. Sync marker.
 * We could make this file specific (generate a random 4 byte magic and stick it in the file header), but this suffices for now - PR
 */
byte [] MAGIC = new byte [] {'H', 'U', 'D', 'I'};
/**
 * Writer interface to allow appending block to this file format
 */
interface Writer extends Closeable {
/** @return the path to this {@link HoodieLogFormat} */
HoodieLogFile getLogFile();
/**
 * Append Block returns a new Writer if the log is rolled
 */
Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException;
long getCurrentSize() throws IOException;
}
/**
 * Reader interface which is an Iterator of HoodieLogBlock
 */
interface Reader extends Closeable, Iterator<HoodieLogBlock> {
/** @return the path to this {@link HoodieLogFormat} */
HoodieLogFile getLogFile();
}
/**
 * Builder class to construct the default log format writer.
 * Required fields: file id, base commit, file extension, parent path.
 * Optional fields default from the filesystem (buffer size, replication) or constants (size threshold).
 */
class WriterBuilder {
private final static Logger log = LogManager.getLogger(WriterBuilder.class);
// Default max log file size 512 MB
public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
// Buffer size
private Integer bufferSize;
// Replication for the log file
private Short replication;
// FileSystem
private FileSystem fs;
// Size threshold for the log file. Useful when used with a rolling log appender
private Long sizeThreshold;
// Log File extension. Could be .avro.delta or .avro.commits etc
private String fileExtension;
// File Id
private String logFileId;
// File Commit Time stamp
private String commitTime;
// version number for this log file. If not specified, then the current version will be computed by inspecting the file system
private Integer logVersion;
// Location of the directory containing the log
private Path parentPath;
public WriterBuilder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
public WriterBuilder withReplication(short replication) {
this.replication = replication;
return this;
}
public WriterBuilder withFs(FileSystem fs) {
this.fs = fs;
return this;
}
public WriterBuilder withSizeThreshold(long sizeThreshold) {
this.sizeThreshold = sizeThreshold;
return this;
}
public WriterBuilder withFileExtension(String logFileExtension) {
this.fileExtension = logFileExtension;
return this;
}
public WriterBuilder withFileId(String fileId) {
this.logFileId = fileId;
return this;
}
public WriterBuilder overBaseCommit(String baseCommit) {
this.commitTime = baseCommit;
return this;
}
public WriterBuilder withLogVersion(int version) {
this.logVersion = version;
return this;
}
public WriterBuilder onParentPath(Path parentPath) {
this.parentPath = parentPath;
return this;
}
/**
 * Validates required fields, computes the log version when unset, fills in filesystem
 * defaults and constructs a {@link HoodieLogFormatWriter} on the resolved log path.
 *
 * @throws IllegalArgumentException if a required field was not supplied
 */
public Writer build() throws IOException, InterruptedException {
log.info("Building HoodieLogFormat Writer");
if (fs == null) {
fs = FSUtils.getFs();
}
if (logFileId == null) {
throw new IllegalArgumentException("FileID is not specified");
}
if (commitTime == null) {
throw new IllegalArgumentException("BaseCommitTime is not specified");
}
if (fileExtension == null) {
throw new IllegalArgumentException("File extension is not specified");
}
if (parentPath == null) {
throw new IllegalArgumentException("Log file parent location is not specified");
}
if (logVersion == null) {
log.info("Computing the next log version for " + logFileId + " in " + parentPath);
logVersion =
FSUtils.getCurrentLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
log.info(
"Computed the next log version for " + logFileId + " in " + parentPath + " as "
+ logVersion);
}
Path logPath = new Path(parentPath,
FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion));
log.info("HoodieLogFile on path " + logPath);
HoodieLogFile logFile = new HoodieLogFile(logPath);
if (bufferSize == null) {
bufferSize = FSUtils.getDefaultBufferSize(fs);
}
if (replication == null) {
replication = FSUtils.getDefaultReplication(fs, parentPath);
}
if (sizeThreshold == null) {
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
}
return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold);
}
}
static WriterBuilder newWriterBuilder() {
return new WriterBuilder();
}
static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
throws IOException {
return new HoodieLogFormatReader(fs, logFile, readerSchema);
}
}

View File

@@ -0,0 +1,201 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import com.uber.hoodie.exception.CorruptedLogFileException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Scans a log file and provides a block level iterator on the log file.
 * Loads the entire block contents in memory.
 * Can emit either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is found).
 */
public class HoodieLogFormatReader implements HoodieLogFormat.Reader {

  private static final int DEFAULT_BUFFER_SIZE = 4096;
  private final static Logger log = LogManager.getLogger(HoodieLogFormatReader.class);

  private final FSDataInputStream inputStream;
  private final HoodieLogFile logFile;
  // Scratch buffer for the 4 byte MAGIC marker. Deliberately an instance field:
  // the previous static field was shared across all readers and not thread-safe.
  private final byte[] magicBuffer = new byte[4];
  private final Schema readerSchema;
  // Block buffered by hasNext() and not yet handed out by next()
  private HoodieLogBlock nextBlock = null;

  HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize) throws IOException {
    this.inputStream = fs.open(logFile.getPath(), bufferSize);
    this.logFile = logFile;
    this.readerSchema = readerSchema;
  }

  HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
    this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE);
  }

  @Override
  public HoodieLogFile getLogFile() {
    return logFile;
  }

  /**
   * Reads the block that follows an already consumed MAGIC marker.
   * Step 1 (reading the MAGIC) happens in readMagic(), hence numbering starts at 2.
   */
  private HoodieLogBlock readBlock() throws IOException {
    // 2. Read the block type
    int ordinal = inputStream.readInt();
    Preconditions.checkArgument(ordinal < HoodieLogBlockType.values().length,
        "Invalid block byte ordinal found " + ordinal);
    HoodieLogBlockType blockType = HoodieLogBlockType.values()[ordinal];
    // 3. Read the size of the block
    int blocksize = inputStream.readInt();
    // We may have had a crash which could have written this block partially
    // Skip blocksize in the stream and we should either find a sync marker (start of the next block) or EOF
    // If we did not find either of it, then this block is a corrupted block.
    boolean isCorrupted = isBlockCorrupt(blocksize);
    if (isCorrupted) {
      return createCorruptBlock();
    }
    // 4. Read the content
    // TODO - have a max block size and reuse this buffer in the ByteBuffer (hard to guess max block size for now)
    byte[] content = new byte[blocksize];
    inputStream.readFully(content, 0, blocksize);
    switch (blockType) {
      // based on type read the block
      case AVRO_DATA_BLOCK:
        return HoodieAvroDataBlock.fromBytes(content, readerSchema);
      case DELETE_BLOCK:
        return HoodieDeleteBlock.fromBytes(content);
      case COMMAND_BLOCK:
        return HoodieCommandBlock.fromBytes(content);
      default:
        throw new HoodieNotSupportedException("Unsupported Block " + blockType);
    }
  }

  /**
   * Builds a corrupt block spanning from the current position up to the next MAGIC
   * marker (or EOF), so scanning can resume at the next well-formed block.
   */
  private HoodieLogBlock createCorruptBlock() throws IOException {
    log.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
    long currentPos = inputStream.getPos();
    long nextBlockOffset = scanForNextAvailableBlockOffset();
    // Rewind to the initial start and read corrupted bytes till the nextBlockOffset
    inputStream.seek(currentPos);
    log.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
    int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
    byte[] content = new byte[corruptedBlockSize];
    inputStream.readFully(content, 0, corruptedBlockSize);
    return HoodieCorruptBlock.fromBytes(content);
  }

  /**
   * A block is corrupt when skipping its advertised size lands neither on a MAGIC
   * marker nor at EOF. Always restores the stream position before returning.
   */
  private boolean isBlockCorrupt(int blocksize) throws IOException {
    long currentPos = inputStream.getPos();
    try {
      inputStream.seek(currentPos + blocksize);
    } catch (EOFException e) {
      // this is corrupt
      return true;
    }
    try {
      readMagic();
      // all good - either we found the sync marker or EOF. Reset position and continue
      return false;
    } catch (CorruptedLogFileException e) {
      // This is a corrupted block
      return true;
    } finally {
      inputStream.seek(currentPos);
    }
  }

  /** Advances byte-by-byte until the next MAGIC marker (or EOF) and returns its offset. */
  private long scanForNextAvailableBlockOffset() throws IOException {
    while (true) {
      long currentPos = inputStream.getPos();
      try {
        boolean isEOF = readMagic();
        return isEOF ? inputStream.getPos() : currentPos;
      } catch (CorruptedLogFileException e) {
        // No luck - advance and try again
        inputStream.seek(currentPos + 1);
      }
    }
  }

  @Override
  public void close() throws IOException {
    this.inputStream.close();
  }

  /**
   * Idempotent: repeated calls without an intervening next() return the buffered
   * block instead of reading (and silently discarding) another one, which was the
   * behavior flagged by the original TODO.
   */
  @Override
  public boolean hasNext() {
    if (nextBlock != null) {
      // A block is already buffered and has not been consumed by next() yet
      return true;
    }
    try {
      boolean isEOF = readMagic();
      if (isEOF) {
        return false;
      }
      this.nextBlock = readBlock();
      return nextBlock != null;
    } catch (IOException e) {
      throw new HoodieIOException("IOException when reading logfile " + logFile, e);
    }
  }

  /**
   * Reads the 4 MAGIC bytes at the current position.
   *
   * @return true if EOF was reached instead
   * @throws CorruptedLogFileException if 4 bytes were read but they are not the MAGIC
   */
  private boolean readMagic() throws IOException {
    try {
      // 1. Read magic header from the start of the block
      inputStream.readFully(magicBuffer, 0, 4);
      if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) {
        throw new CorruptedLogFileException(
            logFile + " could not be read. Did not find the magic bytes at the start of the block");
      }
      return false;
    } catch (EOFException e) {
      // We have reached the EOF
      return true;
    }
  }

  /**
   * Returns the next block and clears the buffer so each block is handed out exactly
   * once (the original returned the same buffered block on repeated calls). Follows the
   * Iterator contract by throwing when no block remains.
   */
  @Override
  public HoodieLogBlock next() {
    if (nextBlock == null && !hasNext()) {
      throw new NoSuchElementException("No more log blocks available in " + logFile);
    }
    HoodieLogBlock block = nextBlock;
    nextBlock = null;
    return block;
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("Remove not supported for HoodieLogFormatReader");
  }
}

View File

@@ -0,0 +1,164 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * HoodieLogFormatWriter can be used to append blocks to a log file
 * Use HoodieLogFormat.WriterBuilder to construct
 */
public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
  private final static Logger log = LogManager.getLogger(HoodieLogFormatWriter.class);
  private final HoodieLogFile logFile;
  private final FileSystem fs;
  private final long sizeThreshold;
  private final Integer bufferSize;
  private final Short replication;
  private FSDataOutputStream output;

  /**
   * Opens the given log file for append, creating it if it does not exist. For an existing
   * file on HDFS, attempts lease recovery when another (dead) writer still holds the lease.
   *
   * @param fs            filesystem to write to
   * @param logFile       log file to append blocks to
   * @param bufferSize    buffer size for the underlying output stream
   * @param replication   replication factor used when creating a new file
   * @param sizeThreshold size in bytes past which appendBlock() rolls over to a new file
   */
  HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize,
      Short replication, Long sizeThreshold)
      throws IOException, InterruptedException {
    this.fs = fs;
    this.logFile = logFile;
    this.sizeThreshold = sizeThreshold;
    this.bufferSize = bufferSize;
    this.replication = replication;
    Path path = logFile.getPath();
    if (fs.exists(path)) {
      log.info(logFile + " exists. Appending to existing file");
      try {
        this.output = fs.append(path, bufferSize);
      } catch (RemoteException e) {
        // this happens when either another task executor writing to this file died or data node is going down
        if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())
            && fs instanceof DistributedFileSystem) {
          log.warn("Trying to recover log on path " + path);
          if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
            log.warn("Recovered lease on path " + path);
            // try again
            this.output = fs.append(path, bufferSize);
          } else {
            log.warn("Failed to recover lease on path " + path);
            throw new HoodieException(e);
          }
        } else {
          // BUGFIX: previously any other RemoteException was silently swallowed here,
          // leaving `output` null and causing an NPE on the first appendBlock(). Fail fast.
          throw new HoodieException(e);
        }
      }
    } else {
      log.info(logFile + " does not exist. Create a new file");
      // Block size does not matter as we will always manually autoflush
      this.output = fs.create(path, false, bufferSize, replication,
          WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
      // TODO - append a file level meta block
    }
  }

  public FileSystem getFs() {
    return fs;
  }

  public HoodieLogFile getLogFile() {
    return logFile;
  }

  public long getSizeThreshold() {
    return sizeThreshold;
  }

  /**
   * Appends a block to the log: MAGIC marker, block-type ordinal, content length, content.
   * Flushes after every block, then rolls over to a new file if the size threshold is crossed.
   *
   * @return this writer, or a fresh writer for the rolled-over file
   */
  @Override
  public Writer appendBlock(HoodieLogBlock block)
      throws IOException, InterruptedException {
    byte[] content = block.getBytes();
    // 1. write the magic header for the start of the block
    this.output.write(HoodieLogFormat.MAGIC);
    // 2. Write the block type
    this.output.writeInt(block.getBlockType().ordinal());
    // 3. Write the size of the block
    this.output.writeInt(content.length);
    // 4. Write the contents of the block
    this.output.write(content);
    // Flush every block to disk
    flush();
    // roll over if size is past the threshold
    return rolloverIfNeeded();
  }

  private Writer rolloverIfNeeded() throws IOException, InterruptedException {
    // Roll over if the size is past the threshold
    if (getCurrentSize() > sizeThreshold) {
      //TODO - make an end marker which seals the old log file (no more appends possible to that file).
      log.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
          + ". Rolling over to the next version");
      HoodieLogFile newLogFile = logFile.rollOver(fs);
      // close this writer and return the new writer
      close();
      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold);
    }
    return this;
  }

  /**
   * Flushes and closes the underlying stream.
   * BUGFIX: now safe to call more than once — previously a second call NPE'd on
   * output.close() because flush() guarded against a null stream but close() did not.
   */
  @Override
  public void close() throws IOException {
    if (output == null) {
      return; // already closed
    }
    flush();
    output.close();
    output = null;
  }

  private void flush() throws IOException {
    if (output == null) {
      return; // Presume closed
    }
    output.flush();
    // hflush makes the data visible to new readers on HDFS
    output.hflush();
  }

  /** @return current write position (bytes) in the open log file. */
  public long getCurrentSize() throws IOException {
    if (output == null) {
      throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
    }
    return output.getPos();
  }
}

View File

@@ -1,143 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.uber.hoodie.common.table.log.HoodieLogAppendConfig;
import com.uber.hoodie.common.table.log.HoodieLogAppender;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
 * AvroLogAppender appends a bunch of IndexedRecord to a Avro data file.
 * If auto-flush is set, every call to append writes out a block.
 * A avro block corresponds to records appended in a single commit.
 *
 * @see org.apache.avro.file.DataFileReader
 */
public class AvroLogAppender implements HoodieLogAppender<IndexedRecord> {
  private final static Logger log = LogManager.getLogger(AvroLogAppender.class);
  private final HoodieLogAppendConfig config;
  private FSDataOutputStream output;
  private DataFileWriter<IndexedRecord> writer;
  private boolean autoFlush;

  /**
   * Opens the configured log file for append, creating it if it does not exist. Attempts
   * HDFS lease recovery when a previous writer died while holding the file lease.
   */
  public AvroLogAppender(HoodieLogAppendConfig config) throws IOException, InterruptedException {
    FileSystem fs = config.getFs();
    this.config = config;
    this.autoFlush = config.isAutoFlush();
    GenericDatumWriter<IndexedRecord> datumWriter =
        new GenericDatumWriter<>(config.getSchema());
    this.writer = new DataFileWriter<>(datumWriter);
    Path path = config.getLogFile().getPath();
    if (fs.exists(path)) {
      //TODO - check for log corruption and roll over if needed
      log.info(config.getLogFile() + " exists. Appending to existing file");
      try {
        this.output = fs.append(path, config.getBufferSize());
      } catch (RemoteException e) {
        // this happens when either another task executor writing to this file died or data node is going down
        if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())
            && fs instanceof DistributedFileSystem) {
          log.warn("Trying to recover log on path " + path);
          if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
            log.warn("Recovered lease on path " + path);
            // try again
            this.output = fs.append(path, config.getBufferSize());
          } else {
            log.warn("Failed to recover lease on path " + path);
            throw new HoodieException(e);
          }
        } else {
          // BUGFIX: any other RemoteException used to be silently swallowed, leaving
          // `output` null and failing later with an NPE. Fail fast instead.
          throw new HoodieException(e);
        }
      }
      this.writer
          .appendTo(new FsInput(path, fs.getConf()), output);
      // we always want to flush to disk everytime a avro block is written
      this.writer.setFlushOnEveryBlock(true);
    } else {
      log.info(config.getLogFile() + " does not exist. Create a new file");
      this.output = fs.create(path, false, config.getBufferSize(), config.getReplication(),
          config.getBlockSize(), null);
      this.writer.create(config.getSchema(), output);
      this.writer.setFlushOnEveryBlock(true);
    }
  }

  /** Appends all records from the iterator; syncs a block to disk if auto-flush is enabled. */
  public void append(Iterator<IndexedRecord> records) throws IOException {
    records.forEachRemaining(r -> {
      try {
        writer.append(r);
      } catch (IOException e) {
        // BUGFIX: preserve the underlying IOException as the cause instead of dropping it
        throw new HoodieIOException(
            "Could not append record " + r + " to " + config.getLogFile(), e);
      }
    });
    if (autoFlush) {
      sync();
    }
  }

  /** Flushes buffered records and hflushes to the datanodes; no-op once closed. */
  public void sync() throws IOException {
    if (output == null || writer == null) {
      return; // Presume closed
    }
    writer.flush();
    output.flush();
    output.hflush();
  }

  /** Syncs then closes both the avro writer and the underlying stream. */
  public void close() throws IOException {
    sync();
    writer.close();
    writer = null;
    output.close();
    output = null;
  }

  /** @return current position in the file; fails if the appender was already closed. */
  public long getCurrentSize() throws IOException {
    if (writer == null) {
      throw new IllegalStateException(
          "LogWriter " + config.getLogFile() + " has been closed. Cannot getCurrentSize");
    }
    // writer.sync() returns only the offset for this block and not the global offset
    return output.getPos();
  }
}

View File

@@ -1,82 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.google.common.collect.Lists;
import com.uber.hoodie.common.table.log.HoodieLogAppender;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.AvroFSInput;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
 * AvroLogReader allows reading blocks of records given a offset as written by AvroLogAppender
 * Avro Log files are never streamed entirely - because of fault tolerance.
 * If a block is corrupted, then random access with offset bypasses any corrupt blocks.
 * Metadata about offset should be saved when writing blocks and passed in readBlock()
 *
 * @see AvroLogAppender
 */
public class AvroLogReader {
  private final DataFileReader<GenericRecord> reader;
  private final HoodieLogFile file;

  public AvroLogReader(HoodieLogFile file, FileSystem fs, Schema readerSchema)
      throws IOException {
    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    datumReader.setExpected(readerSchema);
    final AvroFSInput input = new AvroFSInput(FileContext.getFileContext(fs.getConf()), file.getPath());
    this.reader = (DataFileReader<GenericRecord>) DataFileReader.openReader(input, datumReader);
    this.file = file;
  }

  /**
   * Reads one avro block starting at the given exact offset and returns its records.
   *
   * @param startOffset byte offset of the block, recorded when the block was written
   */
  public Iterator<GenericRecord> readBlock(long startOffset) throws IOException {
    // We keep track of exact offset for blocks, just seek to it directly
    reader.seek(startOffset);
    List<GenericRecord> records = Lists.newArrayList();
    try {
      // First check if we are past the sync market and then check reader.hasNext,
      // hasNext will load a block in memory and this will fail if a block is corrupted.
      while (!reader.pastSync(startOffset) && reader.hasNext()) {
        records.add(reader.next());
      }
    } catch (IOException e) {
      // BUGFIX: keep the underlying IOException as the cause instead of dropping it
      throw new HoodieIOException("Failed to read avro records from " + file, e);
    }
    return records.iterator();
  }

  public HoodieLogFile getFile() {
    return file;
  }

  public void close() throws IOException {
    reader.close();
  }
}

View File

@@ -1,155 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * CompositeAvroLogReader reads all versions of the logs for a given fileId.
 * It gives a iterator which iterates through all the versions and the list of blocks for that specific version
 * Useful for merging records in RecordReader and compacting all the delta versions
 *
 * @see AvroLogReader
 */
public class CompositeAvroLogReader {
  // One AvroLogReader per log version present for the fileId
  private final Map<Integer, AvroLogReader> readers;

  public CompositeAvroLogReader(Path partitionPath, String fileId, String baseCommitTime, FileSystem fs,
      Schema readerSchema, String logFileExtension) throws IOException {
    Stream<HoodieLogFile> allLogFiles =
        FSUtils.getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
    this.readers = allLogFiles.map(hoodieLogFile -> {
      try {
        return new AvroLogReader(hoodieLogFile, fs, readerSchema);
      } catch (IOException e) {
        // BUGFIX: keep the underlying IOException as the cause instead of dropping it
        throw new HoodieIOException(
            "Could not read avro records from path " + hoodieLogFile, e);
      }
      // Idiom: lambda replaces the original anonymous Function class; same key extraction
    }).collect(Collectors.toMap(reader -> reader.getFile().getLogVersion(), Function.identity()));
  }

  /**
   * Reads all the versions (in the order specified) and all the blocks starting with the offset specified
   *
   * @param filesToOffsetMap log version -> list of block offsets to read in that version
   * @return iterator over the records in all requested blocks
   * @throws IOException on read failure
   */
  public Iterator<GenericRecord> readBlocks(SortedMap<Integer, List<Long>> filesToOffsetMap)
      throws IOException {
    return new Iterators(filesToOffsetMap, readers);
  }

  /** Closes every per-version reader; wraps any close failure as HoodieIOException. */
  public void close() throws IOException {
    readers.values().forEach(s -> {
      try {
        s.close();
      } catch (IOException e) {
        throw new HoodieIOException("Unable to close " + s.getFile(), e);
      }
    });
  }

  /**
   * Lazily walks versions in the order given by the map's key iterator, and within each
   * version walks the listed block offsets, yielding individual records.
   */
  public class Iterators implements Iterator<GenericRecord> {
    private final Map<Integer, AvroLogReader> readers;
    private final Map<Integer, List<Long>> versionsToOffsetMap;
    private Integer currentVersion;
    private Iterator<Integer> currentVersionIterator;
    private Iterator<Long> currentOffsetIterator;
    private Iterator<GenericRecord> currentRecordIterator;

    public Iterators(Map<Integer, List<Long>> versionToOffsetMap,
        Map<Integer, AvroLogReader> readers) {
      this.currentVersionIterator = versionToOffsetMap.keySet().iterator();
      this.readers = readers;
      this.versionsToOffsetMap = versionToOffsetMap;
    }

    // Returns the next non-empty block for the current version, or null if none remain.
    private Iterator<GenericRecord> findNextBlock() throws IOException {
      if (currentOffsetIterator != null) {
        while (currentOffsetIterator.hasNext()) {
          // we have more offsets to process for this file
          long currentOffset = currentOffsetIterator.next();
          Iterator<GenericRecord> currentBlock =
              readers.get(currentVersion).readBlock(currentOffset);
          if (currentBlock.hasNext()) {
            return currentBlock;
          }
        }
      }
      return null;
    }

    // Advances across versions until a non-empty block is found; null when exhausted.
    private Iterator<GenericRecord> findNext() {
      try {
        Iterator<GenericRecord> nextBlock = findNextBlock();
        if (nextBlock != null) {
          // we have more offsets to process for this version
          return nextBlock;
        }
        // We have no more offsets to process for the version, lets move on to the next version
        while (currentVersionIterator.hasNext()) {
          currentVersion = currentVersionIterator.next();
          currentOffsetIterator = versionsToOffsetMap.get(currentVersion).iterator();
          nextBlock = findNextBlock();
          if (nextBlock != null) {
            return nextBlock;
          }
        }
      } catch (IOException e) {
        // BUGFIX: keep the underlying IOException as the cause instead of dropping it
        throw new HoodieIOException(
            "Could not read avro records from " + readers.get(currentVersion).getFile(), e);
      }
      return null;
    }

    @Override
    public boolean hasNext() {
      if (currentRecordIterator == null || !currentRecordIterator.hasNext()) {
        currentRecordIterator = findNext();
      }
      return (currentRecordIterator != null && currentRecordIterator.hasNext());
    }

    @Override
    public GenericRecord next() {
      return currentRecordIterator.next();
    }
  }
}

View File

@@ -1,97 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieAvroPayload;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.AvroUtils;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
/**
 * Loads HoodieRecords from a set of avro log files, de-duplicating them by record key and
 * keeping the merged (pre-combined) state in memory. Useful for compaction and record reader.
 */
public class HoodieAvroReader implements Iterable<HoodieRecord<HoodieAvroPayload>> {
  private final Collection<HoodieRecord<HoodieAvroPayload>> records;
  private AtomicLong totalLogFiles = new AtomicLong(0);
  private AtomicLong totalLogRecords = new AtomicLong(0);
  private long totalRecordsToUpdate;

  public HoodieAvroReader(FileSystem fs, List<String> logFilePaths, Schema readerSchema) {
    Map<String, HoodieRecord<HoodieAvroPayload>> merged = Maps.newHashMap();
    for (String logPath : logFilePaths) {
      totalLogFiles.incrementAndGet();
      List<HoodieRecord<HoodieAvroPayload>> fileRecords = AvroUtils
          .loadFromFile(fs, logPath, readerSchema);
      totalLogRecords.addAndGet(fileRecords.size());
      for (HoodieRecord<HoodieAvroPayload> incoming : fileRecords) {
        String key = incoming.getRecordKey();
        // If the key was seen before, pre-combine the existing payload with the incoming
        // one and keep the merged record; otherwise store the incoming record as-is.
        merged.merge(key, incoming, (existing, latest) -> new HoodieRecord<>(
            new HoodieKey(key, latest.getPartitionPath()),
            existing.getData().preCombine(latest.getData())));
      }
    }
    this.records = merged.values();
    this.totalRecordsToUpdate = merged.size();
  }

  @Override
  public Iterator<HoodieRecord<HoodieAvroPayload>> iterator() {
    return records.iterator();
  }

  @Override
  public void forEach(Consumer<? super HoodieRecord<HoodieAvroPayload>> consumer) {
    records.forEach(consumer);
  }

  @Override
  public Spliterator<HoodieRecord<HoodieAvroPayload>> spliterator() {
    return records.spliterator();
  }

  /** @return number of log files scanned. */
  public long getTotalLogFiles() {
    return totalLogFiles.get();
  }

  /** @return number of raw records read before de-duplication. */
  public long getTotalLogRecords() {
    return totalLogRecords.get();
  }

  /** @return number of distinct keys after merging. */
  public long getTotalRecordsToUpdate() {
    return totalRecordsToUpdate;
  }
}

View File

@@ -1,88 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.avro;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.table.log.HoodieLogAppendConfig;
import com.uber.hoodie.common.table.log.HoodieLogAppender;
import com.uber.hoodie.common.table.log.HoodieLogFile;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
 * Implementation of {@link HoodieLogAppender} that rolls over to a new log file whenever the
 * configured size threshold is reached, delegating actual writes to an {@link AvroLogAppender}.
 */
public class RollingAvroLogAppender implements HoodieLogAppender<IndexedRecord> {
  private static final Log LOG = LogFactory.getLog(RollingAvroLogAppender.class);
  private AvroLogAppender logWriter;
  private HoodieLogAppendConfig config;

  public RollingAvroLogAppender(HoodieLogAppendConfig config)
      throws IOException, InterruptedException {
    // Open the initial writer, then roll immediately if the file is already past the threshold
    this.logWriter = new AvroLogAppender(config);
    this.config = config;
    rollOverIfNeeded();
  }

  // Closes the current writer and re-opens against the next log version when needed.
  private void rollOverIfNeeded() throws IOException, InterruptedException {
    HoodieLogFile currentFile = config.getLogFile();
    if (!currentFile.shouldRollOver(this, config)) {
      return; // current file still has room
    }
    if (logWriter != null) {
      // Close the old writer before opening a new one
      logWriter.close();
    }
    HoodieLogFile nextFile = currentFile.rollOver(config.getFs());
    LOG.info("Rolling over log from " + currentFile + " to " + nextFile);
    this.config = config.withLogFile(nextFile);
    this.logWriter = new AvroLogAppender(this.config);
  }

  public long getCurrentSize() throws IOException {
    Preconditions.checkArgument(logWriter != null);
    return logWriter.getCurrentSize();
  }

  public void append(Iterator<IndexedRecord> records) throws IOException, InterruptedException {
    LOG.info("Appending records to " + config.getLogFile());
    rollOverIfNeeded();
    Preconditions.checkArgument(logWriter != null);
    logWriter.append(records);
  }

  public void sync() throws IOException {
    Preconditions.checkArgument(logWriter != null);
    logWriter.sync();
  }

  public void close() throws IOException {
    Preconditions.checkArgument(logWriter != null);
    logWriter.close();
  }

  public HoodieLogAppendConfig getConfig() {
    return config;
  }
}

View File

@@ -0,0 +1,134 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.block;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
/**
 * DataBlock contains a list of records serialized using Avro.
 * The Datablock contains
 * 1. Compressed Writer Schema length
 * 2. Compressed Writer Schema content
 * 3. Total number of records in the block
 * 4. Size of a record
 * 5. Actual avro serialized content of the record
 */
public class HoodieAvroDataBlock implements HoodieLogBlock {
  private List<IndexedRecord> records;
  private Schema schema;

  public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) {
    this.records = records;
    this.schema = schema;
  }

  public List<IndexedRecord> getRecords() {
    return records;
  }

  public Schema getSchema() {
    return schema;
  }

  /**
   * Serializes this block: [schema length][compressed schema][record count] followed by
   * [record length][record bytes] for each record, avro-binary-encoded with this.schema.
   */
  @Override
  public byte[] getBytes() throws IOException {
    GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream output = new DataOutputStream(baos);
    // 1. Compress and Write schema out
    byte[] schemaContent = HoodieAvroUtils.compress(schema.toString());
    output.writeInt(schemaContent.length);
    output.write(schemaContent);
    // 2. Write total number of records
    output.writeInt(records.size());
    // 3. Write the records
    records.forEach(s -> {
      ByteArrayOutputStream temp = new ByteArrayOutputStream();
      Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
      try {
        // Encode the record into bytes
        writer.write(s, encoder);
        encoder.flush();
        // PERF FIX: toByteArray() copies the whole buffer; the original called it twice
        // per record (once for the size, once for the content). Copy once.
        byte[] recordBytes = temp.toByteArray();
        // Write the record size
        output.writeInt(recordBytes.length);
        // Write the content
        output.write(recordBytes);
      } catch (IOException e) {
        throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
      }
    });
    output.close();
    return baos.toByteArray();
  }

  @Override
  public HoodieLogBlockType getBlockType() {
    return HoodieLogBlockType.AVRO_DATA_BLOCK;
  }

  /**
   * Deserializes a block written by {@link #getBytes()}. Records are decoded with the
   * writer schema read from the block, projected onto {@code readerSchema}; the returned
   * block carries the reader schema.
   */
  public static HoodieLogBlock fromBytes(byte[] content, Schema readerSchema) throws IOException {
    // 1. Read the schema written out
    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content));
    int schemaLength = dis.readInt();
    byte[] compressedSchema = new byte[schemaLength];
    dis.readFully(compressedSchema, 0, schemaLength);
    Schema writerSchema = new Schema.Parser().parse(HoodieAvroUtils.decompress(compressedSchema));
    GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
    // 2. Get the total records
    int totalRecords = dis.readInt();
    List<IndexedRecord> records = new ArrayList<>(totalRecords);
    // 3. Read the content
    for (int i = 0; i < totalRecords; i++) {
      // TODO - avoid bytes copy
      int recordLength = dis.readInt();
      byte[] recordData = new byte[recordLength];
      dis.readFully(recordData, 0, recordLength);
      Decoder decoder = DecoderFactory.get().binaryDecoder(recordData, null);
      IndexedRecord record = reader.read(null, decoder);
      records.add(record);
    }
    dis.close();
    return new HoodieAvroDataBlock(records, readerSchema);
  }
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.block;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
 * Command block issues a specific command to the scanner
 */
public class HoodieCommandBlock implements HoodieLogBlock {

  /** Commands understood by the log scanner. Serialized as the ordinal. */
  public enum HoodieCommandBlockTypeEnum {ROLLBACK_PREVIOUS_BLOCK}

  private final HoodieCommandBlockTypeEnum type;

  public HoodieCommandBlock(HoodieCommandBlockTypeEnum type) {
    this.type = type;
  }

  public HoodieCommandBlockTypeEnum getType() {
    return type;
  }

  /** Serializes the command as a single big-endian int holding the enum ordinal. */
  @Override
  public byte[] getBytes() throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(4);
    buffer.putInt(type.ordinal());
    return buffer.array();
  }

  @Override
  public HoodieLogBlockType getBlockType() {
    return HoodieLogBlockType.COMMAND_BLOCK;
  }

  /** Reconstructs a command block from the payload written by {@link #getBytes()}. */
  public static HoodieLogBlock fromBytes(byte[] content) {
    int ordinal = ByteBuffer.wrap(content).getInt();
    return new HoodieCommandBlock(HoodieCommandBlockTypeEnum.values()[ordinal]);
  }
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.block;
import java.io.IOException;
/**
 * Corrupt block is emitted whenever the scanner finds the length of the block written at the
 * beginning does not match (did not find a EOF or a sync marker after the length)
 */
public class HoodieCorruptBlock implements HoodieLogBlock {

  // Raw bytes of the unparseable region, preserved verbatim for diagnostics.
  private final byte[] corruptedBytes;

  private HoodieCorruptBlock(byte[] corruptedBytes) {
    this.corruptedBytes = corruptedBytes;
  }

  /** Returns the corrupt payload exactly as it was read from the log. */
  @Override
  public byte[] getBytes() throws IOException {
    return corruptedBytes;
  }

  @Override
  public HoodieLogBlockType getBlockType() {
    return HoodieLogBlockType.CORRUPT_BLOCK;
  }

  /** Factory — the only way to construct a corrupt block (constructor is private). */
  public static HoodieLogBlock fromBytes(byte[] content) {
    return new HoodieCorruptBlock(content);
  }
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.block;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.commons.lang3.StringUtils;
/**
 * Delete block contains a list of keys to be deleted from scanning the blocks so far
 */
public class HoodieDeleteBlock implements HoodieLogBlock {

  // Record keys to suppress. NOTE(review): keys must not contain ',' since the wire
  // format is comma-separated — confirm against key-generation rules.
  private final String[] keysToDelete;

  public HoodieDeleteBlock(String[] keysToDelete) {
    this.keysToDelete = keysToDelete;
  }

  /** Serializes the keys as a comma-separated UTF-8 string. */
  @Override
  public byte[] getBytes() throws IOException {
    return StringUtils.join(keysToDelete, ',').getBytes(Charset.forName("utf-8"));
  }

  public String[] getKeysToDelete() {
    return keysToDelete;
  }

  @Override
  public HoodieLogBlockType getBlockType() {
    return HoodieLogBlockType.DELETE_BLOCK;
  }

  public static HoodieLogBlock fromBytes(byte[] content) {
    // BUGFIX: decode with the same UTF-8 charset getBytes() encodes with. The original
    // used new String(content), which relies on the platform default charset and could
    // corrupt non-ASCII keys on non-UTF-8 JVMs.
    return new HoodieDeleteBlock(new String(content, Charset.forName("utf-8")).split(","));
  }
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.block;
import java.io.IOException;
/**
 * Abstract interface defining a block in HoodieLogFile
 */
public interface HoodieLogBlock {
  /**
   * Serialized payload of this block, written after the block header
   * (magic, type ordinal, length) in the log file.
   */
  byte[] getBytes() throws IOException;
  /** Type tag written to the log so a reader knows how to deserialize the payload. */
  HoodieLogBlockType getBlockType();
  /**
   * Type of the log block
   * WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
   */
  enum HoodieLogBlockType {
    COMMAND_BLOCK,
    DELETE_BLOCK,
    CORRUPT_BLOCK,
    AVRO_DATA_BLOCK
  }
}

View File

@@ -52,8 +52,8 @@ import java.util.stream.Stream;
public class FSUtils {
private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern - b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.avro.delta.1
private static final Pattern LOG_FILE_PATTERN = Pattern.compile("(.*)_(.*)\\.(.*)\\.(.*)\\.([0-9]*)");
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
private static final Pattern LOG_FILE_PATTERN = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)");
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final long MIN_CLEAN_TO_KEEP = 10;
private static final long MIN_ROLLBACK_TO_KEEP = 10;
@@ -188,7 +188,7 @@ public class FSUtils {
if (!matcher.find()) {
throw new InvalidHoodiePathException(logPath, "LogFile");
}
return matcher.group(3) + "." + matcher.group(4);
return matcher.group(3);
}
/**
@@ -223,12 +223,12 @@ public class FSUtils {
if (!matcher.find()) {
throw new InvalidHoodiePathException(logPath, "LogFile");
}
return Integer.parseInt(matcher.group(5));
return Integer.parseInt(matcher.group(4));
}
public static String makeLogFileName(String fileId, String logFileExtension,
String baseCommitTime, int version) {
return String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
String baseCommitTime, int version) {
return "." + String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
}
public static String maskWithoutLogVersion(String commitTime, String fileId, String logFileExtension) {
@@ -251,8 +251,8 @@ public class FSUtils {
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath,
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
return Arrays.stream(fs.listStatus(partitionPath,
path -> path.getName().startsWith(fileId) && path.getName().contains(logFileExtension)))
.map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension)))
.map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime));
}
/**

View File

@@ -18,7 +18,13 @@ package com.uber.hoodie.common.util;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.SchemaCompatabilityException;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.*;
import org.apache.avro.io.BinaryEncoder;
@@ -137,4 +143,30 @@ public class HoodieAvroUtils {
}
return newRecord;
}
/**
 * Deflate-compresses the given text (encoded as UTF-8).
 *
 * @param text the string to compress
 * @return the deflate-compressed bytes
 * @throws HoodieIOException wrapping any IOException raised while compressing
 */
public static byte[] compress(String text) {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  // try-with-resources fixes a leak in the original: the DeflaterOutputStream
  // was only closed on the happy path, so a failing write() leaked the
  // Deflater's native resources. close() also finishes the deflate stream.
  try (OutputStream out = new DeflaterOutputStream(baos)) {
    out.write(text.getBytes("UTF-8"));
  } catch (IOException e) {
    throw new HoodieIOException("IOException while compressing text " + text, e);
  }
  return baos.toByteArray();
}
/**
 * Inflates bytes previously produced by {@link #compress(String)} back into a
 * UTF-8 string.
 *
 * @param bytes deflate-compressed input
 * @return the decompressed text
 * @throws HoodieIOException wrapping any IOException raised while decompressing
 */
public static String decompress(byte[] bytes) {
  // try-with-resources fixes a leak in the original: the InflaterInputStream
  // (and its Inflater's native resources) was never closed on any path.
  try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes))) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] buffer = new byte[8192];
    int len;
    while ((len = in.read(buffer)) > 0) {
      baos.write(buffer, 0, len);
    }
    return new String(baos.toByteArray(), "UTF-8");
  } catch (IOException e) {
    throw new HoodieIOException("IOException while decompressing text", e);
  }
}
}

View File

@@ -0,0 +1,24 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.exception;
public class CorruptedLogFileException extends HoodieException {
public CorruptedLogFileException(String msg) {
super(msg);
}
}