
Introducing HoodieLogFormat V2 with versioning support

- HoodieLogFormat V2 supports LogFormat evolution through versioning
    - A LogVersion is associated with a LogBlock, not a LogFile
    - Based on the version of a LogBlock, the appropriate code path is executed
- Implemented lazy reading of Hoodie log blocks with a memory / IO tradeoff
- Implemented a reverse pointer to be able to traverse the log in reverse
- Introduced a new MAGIC for backwards compatibility with logs without versions
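The per-block version dispatch described in these notes could be sketched roughly as follows. The magic byte values, version numbers, and path names here are illustrative placeholders, not Hudi's actual constants:

```java
import java.util.Arrays;

// Hypothetical sketch: a reader sniffs the MAGIC to recognize pre-versioning
// logs, and otherwise dispatches on the per-block version field.
public class LogVersionDispatchDemo {
    // Placeholder magic bytes; the real values live in HoodieLogFormat.
    static final byte[] OLD_MAGIC = {'H', 'U', 'D', '1'};
    static final byte[] NEW_MAGIC = {'H', 'U', 'D', '2'};

    static String chooseReadPath(byte[] magic, int version) {
        if (Arrays.equals(magic, OLD_MAGIC)) {
            // logs written before versioning was introduced
            return "legacy-unversioned-path";
        }
        switch (version) {
            case 1:
                return "v1-path"; // current versioned block layout
            default:
                throw new IllegalArgumentException("Unknown log format version: " + version);
        }
    }

    public static void main(String[] args) {
        if (!chooseReadPath(OLD_MAGIC, -1).equals("legacy-unversioned-path")) throw new AssertionError();
        if (!chooseReadPath(NEW_MAGIC, 1).equals("v1-path")) throw new AssertionError();
        System.out.println("ok");
    }
}
```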
This commit is contained in:
Nishith Agarwal
2018-02-15 11:01:25 -08:00
committed by vinoth chandar
parent dfd1979c51
commit 5405a6287b
32 changed files with 2066 additions and 677 deletions


@@ -23,7 +23,7 @@ topnav_dropdowns:
url: /api_docs.html
output: web
- title: Code Structure
url: /code_structure.html
url: /code_and_design.html
output: web
- title: Roadmap
url: /roadmap.html

docs/code_and_design.md Normal file

@@ -0,0 +1,38 @@
---
title: Code Structure
keywords: usecases
sidebar: mydoc_sidebar
permalink: code_and_design.html
---
## Code & Project Structure
* hoodie-client : Spark client library to take a bunch of inserts + updates and apply them to a Hoodie table
* hoodie-common : Common code shared between different artifacts of Hoodie
## HoodieLogFormat
The following diagram depicts the LogFormat for Hoodie MergeOnRead. Each log file consists of one or more log blocks,
and each log block follows the format shown below.
| Field | Description |
|-------------- |------------------|
| MAGIC | A magic header that marks the start of a block |
| VERSION | The version of the LogFormat; this helps define how to switch between different log formats as they evolve |
| TYPE | The type of the log block |
| HEADER LENGTH | The length of the headers, 0 if no headers |
| HEADER | Metadata needed for a log block, e.g. INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA |
| CONTENT LENGTH | The length of the content of the log block |
| CONTENT | The content of the log block; for example, for a DATA_BLOCK, the content is (number of records + actual records) as a byte[] |
| FOOTER LENGTH | The length of the footers, 0 if no footers |
| FOOTER | Metadata needed for a log block, e.g. index entries or a bloom filter for the records in a DATA_BLOCK |
| LOGBLOCK LENGTH | The total number of bytes written for a log block, typically the SUM(everything_above). This is a LONG and acts as a reverse pointer, making it possible to traverse the log in reverse.|
{% include image.html file="hoodie_log_format_v2.png" alt="hoodie_log_format_v2.png" %}
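As a rough illustration of the layout in the table, the following self-contained sketch round-trips a block with these fields. The magic bytes and the fixed-width field encoding are assumptions made for the demo; the real on-disk encoding is defined by HoodieLogFormat, not this code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Toy round-trip of a [MAGIC][VERSION][TYPE][HEADER][CONTENT][FOOTER][LOGBLOCK LENGTH] layout.
public class LogBlockLayoutDemo {
    static final byte[] MAGIC = {'#', 'H', 'U', 'D'}; // placeholder magic

    static byte[] writeBlock(int version, int type, byte[] header, byte[] content, byte[] footer) {
        int total = MAGIC.length + 4 + 4
                + 4 + header.length + 4 + content.length + 4 + footer.length
                + 8; // trailing total length, the reverse pointer
        ByteBuffer buf = ByteBuffer.allocate(total);
        buf.put(MAGIC).putInt(version).putInt(type);
        buf.putInt(header.length).put(header);
        buf.putInt(content.length).put(content);
        buf.putInt(footer.length).put(footer);
        buf.putLong(total); // LOGBLOCK LENGTH
        return buf.array();
    }

    static byte[] readContent(byte[] block) {
        ByteBuffer buf = ByteBuffer.wrap(block);
        buf.position(MAGIC.length);                  // skip MAGIC
        buf.getInt();                                // VERSION
        buf.getInt();                                // TYPE
        int headerLen = buf.getInt();
        buf.position(buf.position() + headerLen);    // skip HEADER
        byte[] content = new byte[buf.getInt()];
        buf.get(content);                            // CONTENT
        return content;
    }

    public static void main(String[] args) {
        byte[] blk = writeBlock(1, 2, new byte[]{9}, "records".getBytes(StandardCharsets.UTF_8), new byte[0]);
        if (!new String(readContent(blk), StandardCharsets.UTF_8).equals("records")) throw new AssertionError();
        System.out.println("ok");
    }
}
```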


@@ -1,16 +0,0 @@
---
title: Code Structure
keywords: usecases
sidebar: mydoc_sidebar
permalink: code_structure.html
---
## Code & Project Structure
* hoodie-client : Spark client library to take a bunch of inserts + updates and apply them to a Hoodie table
* hoodie-common : Common code shared between different artifacts of Hoodie


@@ -74,6 +74,14 @@ summary: "Here we list all possible configurations and what they mean"
<span style="color:grey">Should hoodie dynamically compute the insertSplitSize based on the last 24 commits' metadata. Turned off by default.</span>
- [approxRecordSize](#approxRecordSize) () <br/>
<span style="color:grey">The average record size. If specified, hoodie will use this and not compute dynamically based on the last 24 commits' metadata. No value set as default. This is critical in computing the insert parallelism and bin-packing inserts into small files. See above.</span>
- [withCompactionLazyBlockReadEnabled](#withCompactionLazyBlockReadEnabled) (true) <br/>
<span style="color:grey">When a CompactedLogScanner merges all log files, this config helps choose whether the log blocks should be read lazily. Choose true for I/O-intensive lazy block reading (low memory usage) or false for memory-intensive immediate block reading (high memory usage)</span>
- [withMaxNumDeltaCommitsBeforeCompaction](#withMaxNumDeltaCommitsBeforeCompaction) (maxNumDeltaCommitsBeforeCompaction = 10) <br/>
<span style="color:grey">Number of max delta commits to keep before triggering an inline compaction</span>
- [withCompactionReverseLogReadEnabled](#withCompactionReverseLogReadEnabled) (false) <br/>
<span style="color:grey">HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. If this config is set to true, the Reader reads the logfile in reverse direction, from pos=file_length to pos=0</span>
- [withMaxMemorySizePerCompactionInBytes](#withMaxMemorySizePerCompactionInBytes) (maxMemorySizePerCompactionInBytes = 1GB) <br/>
<span style="color:grey">HoodieCompactedLogScanner reads logblocks, converts records to HoodieRecords and then merges these log blocks and records. At any point, the number of entries in a log block can be less than or equal to the number of entries in the corresponding parquet file. This can lead to OOM in the Scanner. Hence, a spillable map helps alleviate the memory pressure. Use this config to set the max allowable inMemory footprint of the spillable map.</span>
- [withMetricsConfig](#withMetricsConfig) (HoodieMetricsConfig) <br/>
<span style="color:grey">Hoodie publishes metrics on every commit, clean, rollback etc.</span>
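The reverse reading enabled by withCompactionReverseLogReadEnabled relies on each block ending with its own total length, so a reader can start at the end of the file and hop backwards. A minimal sketch of that traversal, with opaque byte[] payloads standing in for real log blocks (the 8-byte trailing length is the only assumption borrowed from the format table):

```java
import java.nio.ByteBuffer;
import java.util.*;

// Toy reverse traversal: each block is [payload][long totalBlockLength].
public class ReverseLogDemo {
    static byte[] block(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(payload.length + 8);
        buf.put(payload);
        buf.putLong(payload.length + 8L); // LOGBLOCK LENGTH, the reverse pointer
        return buf.array();
    }

    static List<String> readReverse(byte[] log) {
        List<String> out = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(log);
        int pos = log.length;                 // start at pos=file_length
        while (pos > 0) {
            long total = buf.getLong(pos - 8);
            int start = pos - (int) total;
            byte[] payload = new byte[(int) total - 8];
            buf.position(start);
            buf.get(payload);
            out.add(new String(payload));
            pos = start;                      // hop to the previous block
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] b1 = block("b1".getBytes());
        byte[] b2 = block("b2".getBytes());
        byte[] log = new byte[b1.length + b2.length];
        System.arraycopy(b1, 0, log, 0, b1.length);
        System.arraycopy(b2, 0, log, b1.length, b2.length);
        if (!readReverse(log).equals(Arrays.asList("b2", "b1"))) throw new AssertionError();
        System.out.println("ok");
    }
}
```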

Binary file not shown (new image, 218 KiB).


@@ -58,11 +58,12 @@ public class ArchivedCommitsCommand implements CommandMarker {
FileStatus[] fsStatuses = FSUtils.getFs(basePath, HoodieCLI.conf)
.globStatus(new Path(basePath + "/.hoodie/.commits_.archive*"));
List<String[]> allCommits = new ArrayList<>();
int commits = 0;
for (FileStatus fs : fsStatuses) {
//read the archived file
HoodieLogFormat.Reader reader = HoodieLogFormat
.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(), false);
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
List<IndexedRecord> readRecords = new ArrayList<>();
//read the avro blocks
@@ -70,10 +71,17 @@ public class ArchivedCommitsCommand implements CommandMarker {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords();
readRecords.addAll(records);
if(commits == limit) {
break;
}
commits++;
}
List<String[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.map(r -> readCommit(r)).limit(limit).collect(Collectors.toList());
.map(r -> readCommit(r)).collect(Collectors.toList());
allCommits.addAll(readCommits);
if(commits == limit) {
break;
}
}
return HoodiePrintHelper.print(
new String[]{"CommitTime", "CommitType", "CommitDetails"},


@@ -105,6 +105,15 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
// Default memory size per compaction, excess spills to disk
public static final String DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES = String.valueOf(1024*1024*1024L); //1GB
// used to choose a trade off between IO vs Memory when performing compaction process
// Depending on outputfile_size and memory provided, choose true to avoid OOM for large file size + small memory
public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "hoodie.compaction.lazy.block.read";
public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "false";
// used to choose whether to enable reverse log reading (reverse log traversal)
public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = "hoodie.compaction.reverse.log.read";
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
private HoodieCompactionConfig(Properties props) {
super(props);
}
@@ -225,6 +234,18 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
return this;
}
public Builder withCompactionLazyBlockReadEnabled(Boolean compactionLazyBlockReadEnabled) {
props.setProperty(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
String.valueOf(compactionLazyBlockReadEnabled));
return this;
}
public Builder withCompactionReverseLogReadEnabled(Boolean compactionReverseLogReadEnabled) {
props.setProperty(COMPACTION_REVERSE_LOG_READ_ENABLED_PROP,
String.valueOf(compactionReverseLogReadEnabled));
return this;
}
public HoodieCompactionConfig build() {
HoodieCompactionConfig config = new HoodieCompactionConfig(props);
setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP),
@@ -262,6 +283,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
TARGET_IO_PER_COMPACTION_IN_MB_PROP, DEFAULT_TARGET_IO_PER_COMPACTION_IN_MB);
setDefaultOnCondition(props, !props.containsKey(MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP),
MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP, DEFAULT_MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES);
setDefaultOnCondition(props, !props.containsKey(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP),
COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED);
setDefaultOnCondition(props, !props.containsKey(COMPACTION_REVERSE_LOG_READ_ENABLED_PROP),
COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
Preconditions.checkArgument(


@@ -24,14 +24,14 @@ import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
import com.uber.hoodie.metrics.MetricsReporterType;
import org.apache.spark.storage.StorageLevel;
import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import javax.annotation.concurrent.Immutable;
import org.apache.spark.storage.StorageLevel;
/**
* Class storing configs for the {@link com.uber.hoodie.HoodieWriteClient}
@@ -215,6 +215,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
.parseLong(props.getProperty(HoodieCompactionConfig.MAX_SIZE_IN_MEMORY_PER_COMPACTION_IN_BYTES_PROP));
}
public Boolean getCompactionLazyBlockReadEnabled() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP));
}
public Boolean getCompactionReverseLogReadEnabled() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP));
}
/**
* index properties
**/


@@ -159,11 +159,14 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
return Optional.empty();
}
// TODO (NA) - Perform a schema check of current input record with the last schema on log file
// to make sure we don't append records with older (shorter) schema than already appended
public void doAppend() {
int maxBlockSize = config.getLogFileDataBlockMaxSize(); int numberOfRecords = 0;
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, commitTime);
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
while (recordItr.hasNext()) {
HoodieRecord record = recordItr.next();
// update the new location of the record, so we know where to find it next
@@ -178,7 +181,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
// Recompute averageRecordSize before writing a new block and update existing value with avg of new and old
logger.info("AvgRecordSize => " + averageRecordSize);
averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record))/2;
doAppend(metadata);
doAppend(header);
numberOfRecords = 0;
}
Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
@@ -189,18 +192,18 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
}
numberOfRecords++;
}
doAppend(metadata);
doAppend(header);
}
private void doAppend(Map<HoodieLogBlock.LogMetadataType, String> metadata) {
private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header) {
try {
if (recordList.size() > 0) {
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, schema, metadata));
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
recordList.clear();
}
if (keysToDelete.size() > 0) {
writer = writer.appendBlock(
new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new), metadata));
new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new), header));
keysToDelete.clear();
}
} catch (Exception e) {


@@ -18,6 +18,7 @@ package com.uber.hoodie.io;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
import com.uber.hoodie.avro.model.HoodieCleanMetadata;
@@ -30,6 +31,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.AvroUtils;
@@ -39,6 +41,7 @@ import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -47,6 +50,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -190,7 +194,9 @@ public class HoodieCommitArchiveLog {
for (HoodieInstant hoodieInstant : instants) {
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
}
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, wrapperSchema);
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
this.writer = writer.appendBlock(block);
} catch (Exception e) {
throw new HoodieCommitException("Failed to archive commits", e);


@@ -154,7 +154,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
metaClient.getBasePath(),
operation.getDeltaFilePaths(), readerSchema, maxInstantTime, config.getMaxMemorySizePerCompactionInBytes());
operation.getDeltaFilePaths(), readerSchema, maxInstantTime, config.getMaxMemorySizePerCompactionInBytes(),
config.getCompactionLazyBlockReadEnabled(), config.getCompactionReverseLogReadEnabled());
if (!scanner.iterator().hasNext()) {
return Lists.newArrayList();
}


@@ -265,14 +265,15 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
Long numRollbackBlocks = 0L;
// generate metadata
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME,
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
// if update belongs to an existing log file
writer = writer.appendBlock(new HoodieCommandBlock(
HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK,
metadata));
header));
numRollbackBlocks++;
filesToNumBlocksRollback
.put(this.getMetaClient().getFs().getFileStatus(writer.getLogFile().getPath()),


@@ -122,7 +122,7 @@ public class TestHoodieCommitArchiveLog {
//read the file
HoodieLogFormat.Reader reader = HoodieLogFormat
.newReader(fs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
HoodieArchivedMetaEntry.getClassSchema(), false);
HoodieArchivedMetaEntry.getClassSchema());
int archivedRecordsCount = 0;
List<IndexedRecord> readRecords = new ArrayList<>();


@@ -101,6 +101,19 @@ public class HoodieLogFile implements Serializable {
};
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HoodieLogFile that = (HoodieLogFile) o;
return path != null ? path.equals(that.path) : that.path == null;
}
@Override
public int hashCode() {
return path != null ? path.hashCode() : 0;
}
@Override
public String toString() {
return "HoodieLogFile {" + path + '}';


@@ -30,6 +30,7 @@ import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.SpillableMapUtils;
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
import com.uber.hoodie.exception.HoodieIOException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -48,14 +49,25 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataType.INSTANT_TIME;
/**
* Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of
* records which will be used as a lookup table when merging the base columnar file with the redo
* log file.
 * NOTE: If readBlockLazily is turned on, the scanner does not merge incrementally; instead it keeps reading log blocks and merges everything at once
 * This is an optimization to avoid seeking back and forth to read a new block (forward seek())
 * and to lazily read the content of an already-seen block (reverse and forward seek()) during merge
* | | Read Block 1 Metadata | | Read Block 1 Data |
* | | Read Block 2 Metadata | | Read Block 2 Data |
* | I/O Pass 1 | ..................... | I/O Pass 2 | ................. |
* | | Read Block N Metadata | | Read Block N Data |
*
* This results in two I/O passes over the log file.
*
*/
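The two-pass reading sketched in the comment above can be illustrated standalone. The file layout here is a toy [int length][bytes content] per block, not the real HoodieLogFormat; pass 1 records only (offset, length) pairs and pass 2 seeks back for content:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;

// Illustrative sketch of the two I/O passes used by lazy block reading.
public class LazyTwoPassDemo {

    static List<String> demo() {
        try {
            File f = File.createTempFile("lazylog", ".bin");
            f.deleteOnExit();
            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) {
                for (String s : new String[]{"block-1", "block-2"}) {
                    byte[] b = s.getBytes(StandardCharsets.UTF_8);
                    out.writeInt(b.length);
                    out.write(b);
                }
            }
            List<long[]> index = new ArrayList<>();           // (offset, length) pairs
            List<String> contents = new ArrayList<>();
            try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
                while (raf.getFilePointer() < raf.length()) { // I/O pass 1: metadata only
                    int len = raf.readInt();
                    index.add(new long[]{raf.getFilePointer(), len});
                    raf.seek(raf.getFilePointer() + len);     // forward seek past content
                }
                for (long[] entry : index) {                  // I/O pass 2: content
                    raf.seek(entry[0]);
                    byte[] b = new byte[(int) entry[1]];
                    raf.readFully(b);
                    contents.add(new String(b, StandardCharsets.UTF_8));
                }
            }
            return contents;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (!demo().equals(Arrays.asList("block-1", "block-2"))) throw new AssertionError();
        System.out.println("ok");
    }
}
```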
public class HoodieCompactedLogRecordScanner implements
Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
@@ -77,10 +89,11 @@ public class HoodieCompactedLogRecordScanner implements
// Merge strategy to use when combining records from log
private String payloadClassFQN;
// Store the last instant log blocks (needed to implement rollback)
Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes) {
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes,
boolean readBlocksLazily, boolean reverseReader) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
@@ -88,137 +101,141 @@ public class HoodieCompactedLogRecordScanner implements
this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
try {
// Store merged records for all versions for this log file, set the maxInMemoryMapSize to half,
// assign other half to the temporary map needed to read next block
records = new ExternalSpillableMap<>(maxMemorySizeInBytes, readerSchema,
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, readerSchema,
payloadClassFQN, Optional.empty());
// iterate over the paths
Iterator<String> logFilePathsItr = logFilePaths.iterator();
while (logFilePathsItr.hasNext()) {
HoodieLogFile logFile = new HoodieLogFile(new Path(logFilePathsItr.next()));
log.info("Scanning log file " + logFile.getPath());
HoodieLogFormatReader logFormatReaderWrapper =
new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile)))
.collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader);
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
log.info("Scanning log file " + logFile);
totalLogFiles.incrementAndGet();
// Use the HoodieLogFormatReader to iterate through the blocks in the log file
HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, logFile, readerSchema, true);
while (reader.hasNext()) {
HoodieLogBlock r = reader.next();
if (r.getBlockType() != CORRUPT_BLOCK &&
!HoodieTimeline.compareTimestamps(r.getLogMetadata().get(INSTANT_TIME), this.latestInstantTime,
HoodieTimeline.LESSER_OR_EQUAL)) {
//hit a block with instant time greater than should be processed, stop processing further
break;
}
switch (r.getBlockType()) {
case AVRO_DATA_BLOCK:
log.info("Reading a data block from file " + logFile.getPath());
// Consider the following scenario
// (Time 0, C1, Task T1) -> Running
// (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct DataBlock (B1) with commitTime C1
// (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
// (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
// Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
// Say, commit C1 eventually failed and a rollback is triggered.
// Rollback will write only 1 rollback block (R1) since it assumes one block is written per ingestion batch for a file,
// but in reality we need to rollback (B1 & B2)
// The following code ensures the same rollback block (R1) is used to rollback both B1 & B2
if(isNewInstantBlock(r)) {
// If this is a avro data block, then merge the last block records into the main result
merge(records, currentInstantLogBlocks);
}
// store the current block
currentInstantLogBlocks.push(r);
break;
case DELETE_BLOCK:
log.info("Reading a delete block from file " + logFile.getPath());
if (isNewInstantBlock(r)) {
// Block with the keys listed as to be deleted, data and delete blocks written in different batches
// so it is safe to merge
// This is a delete block, so lets merge any records from previous data block
merge(records, currentInstantLogBlocks);
}
// store deletes so can be rolled back
currentInstantLogBlocks.push(r);
break;
case COMMAND_BLOCK:
log.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
String targetInstantForCommandBlock = r.getLogMetadata()
.get(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME);
switch (commandBlock.getType()) { // there can be different types of command blocks
case ROLLBACK_PREVIOUS_BLOCK:
// Rollback the last read log block
// Get commit time from last record block, compare with targetCommitTime, rollback only if equal,
// this is required in scenarios of invalid/extra rollback blocks written due to failures during
// the rollback operation itself and ensures the same rollback block (R1) is used to rollback
// both B1 & B2 with same instant_time
int numBlocksRolledBack = 0;
while(!currentInstantLogBlocks.isEmpty()) {
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
// handle corrupt blocks separately since they may not have metadata
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
log.info(
"Rolling back the last corrupted log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
}
// rollback last data block or delete block
else if (lastBlock.getBlockType() != CORRUPT_BLOCK &&
targetInstantForCommandBlock
.contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) {
log.info("Rolling back the last log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
}
// invalid or extra rollback block
else if(!targetInstantForCommandBlock
.contentEquals(currentInstantLogBlocks.peek().getLogMetadata().get(INSTANT_TIME))) {
log.warn("Invalid or extra rollback command block in " + logFile.getPath());
break;
}
// this should not happen ideally
else {
log.warn("Unable to apply rollback command block in " + logFile.getPath());
}
}
log.info("Number of applied rollback blocks " + numBlocksRolledBack);
break;
}
break;
case CORRUPT_BLOCK:
log.info("Found a corrupt block in " + logFile.getPath());
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(r);
break;
}
// Use the HoodieLogFileReader to iterate through the blocks in the log file
HoodieLogBlock r = logFormatReaderWrapper.next();
if (r.getBlockType() != CORRUPT_BLOCK &&
!HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME),
this.latestInstantTime,
HoodieTimeline.LESSER_OR_EQUAL)) {
//hit a block with instant time greater than should be processed, stop processing further
break;
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
log.info("Merging the final blocks in " + logFile.getPath());
merge(records, currentInstantLogBlocks);
switch (r.getBlockType()) {
case AVRO_DATA_BLOCK:
log.info("Reading a data block from file " + logFile.getPath());
if (isNewInstantBlock(r) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
merge(records, currentInstantLogBlocks);
}
// store the current block
currentInstantLogBlocks.push(r);
break;
case DELETE_BLOCK:
log.info("Reading a delete block from file " + logFile.getPath());
if (isNewInstantBlock(r) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
merge(records, currentInstantLogBlocks);
}
// store deletes so can be rolled back
currentInstantLogBlocks.push(r);
break;
case COMMAND_BLOCK:
// Consider the following scenario
// (Time 0, C1, Task T1) -> Running
// (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct DataBlock (B1) with commitTime C1
// (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
// (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
// Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
// Say, commit C1 eventually failed and a rollback is triggered.
// Rollback will write only 1 rollback block (R1) since it assumes one block is written per ingestion batch for a file,
// but in reality we need to rollback (B1 & B2)
// The following code ensures the same rollback block (R1) is used to rollback both B1 & B2
log.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
String targetInstantForCommandBlock = r.getLogBlockHeader()
.get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
switch (commandBlock.getType()) { // there can be different types of command blocks
case ROLLBACK_PREVIOUS_BLOCK:
// Rollback the last read log block
// Get commit time from last record block, compare with targetCommitTime, rollback only if equal,
// this is required in scenarios of invalid/extra rollback blocks written due to failures during
// the rollback operation itself and ensures the same rollback block (R1) is used to rollback
// both B1 & B2 with same instant_time
int numBlocksRolledBack = 0;
while (!currentInstantLogBlocks.isEmpty()) {
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
// handle corrupt blocks separately since they may not have metadata
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
log.info(
"Rolling back the last corrupted log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
}
// rollback last data block or delete block
else if (lastBlock.getBlockType() != CORRUPT_BLOCK &&
targetInstantForCommandBlock
.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
log.info("Rolling back the last log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
}
// invalid or extra rollback block
else if (!targetInstantForCommandBlock
.contentEquals(
currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
log.warn("TargetInstantTime " + targetInstantForCommandBlock +
" invalid or extra rollback command block in " + logFile.getPath());
break;
}
// this should not happen ideally
else {
log.warn("Unable to apply rollback command block in " + logFile.getPath());
}
}
log.info("Number of applied rollback blocks " + numBlocksRolledBack);
break;
}
break;
case CORRUPT_BLOCK:
log.info("Found a corrupt block in " + logFile.getPath());
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(r);
break;
}
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
log.info("Merging the final data blocks");
merge(records, currentInstantLogBlocks);
}
} catch (IOException e) {
throw new HoodieIOException("IOException when reading compacting log files");
throw new HoodieIOException("IOException when reading log file ");
}
this.totalRecordsToUpdate = records.size();
log.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
log.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records
.getInMemoryMapNumEntries());
log.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records
.getCurrentInMemoryMapSize());
log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records
.getDiskBasedMapNumEntries());
log.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
}
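The ROLLBACK_PREVIOUS_BLOCK handling above can be sketched in isolation. Blocks are modeled here as bare instant-time strings rather than HoodieLogBlocks; the point is that a single rollback block undoes every pending block whose instant matches its target (B1 and B2 in the scenario from the comments):

```java
import java.util.*;

// Standalone sketch of rollback-command handling over the current-instant stack.
public class RollbackDemo {
    static int rollback(Deque<String> currentInstantBlocks, String targetInstant) {
        int rolledBack = 0;
        while (!currentInstantBlocks.isEmpty()
                && targetInstant.contentEquals(currentInstantBlocks.peek())) {
            currentInstantBlocks.pop(); // the same rollback block undoes B1 and B2
            rolledBack++;
        }
        return rolledBack;
    }

    public static void main(String[] args) {
        Deque<String> blocks = new ArrayDeque<>();
        blocks.push("C0"); // an earlier, committed instant
        blocks.push("C1"); // B1, written by the failed attempt
        blocks.push("C1"); // B2, written by the retried attempt
        if (rollback(blocks, "C1") != 2) throw new AssertionError();
        if (blocks.size() != 1) throw new AssertionError();
        System.out.println("ok");
    }
}
```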
/**
* Checks if the current logblock belongs to a later instant
* @param logBlock
* @return
*/
private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
&& !logBlock.getLogMetadata().get(INSTANT_TIME)
.contentEquals(currentInstantLogBlocks.peek().getLogMetadata().get(INSTANT_TIME));
return currentInstantLogBlocks.size() > 0
&& currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
&& !logBlock.getLogBlockHeader().get(INSTANT_TIME)
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
}
/**
@@ -228,7 +245,10 @@ public class HoodieCompactedLogRecordScanner implements
*/
private Map<String, HoodieRecord<? extends HoodieRecordPayload>> loadRecordsFromBlock(
HoodieAvroDataBlock dataBlock) throws IOException {
    // TODO (NA) - Instead of creating a new HashMap use the spillable map
    Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock = Maps.newHashMap();
    // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
List<IndexedRecord> recs = dataBlock.getRecords();
totalLogRecords.addAndGet(recs.size());
recs.forEach(rec -> {
@@ -255,7 +275,7 @@ public class HoodieCompactedLogRecordScanner implements
* Merge the last seen log blocks with the accumulated records
*/
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
      Deque<HoodieLogBlock> lastBlocks) throws IOException {
while (!lastBlocks.isEmpty()) {
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast();
@@ -265,6 +285,7 @@ public class HoodieCompactedLogRecordScanner implements
break;
case DELETE_BLOCK:
// TODO : If delete is the only block written and/or records are present in parquet file
// TODO : Mark as tombstone (optional.empty()) for data instead of deleting the entry
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(records::remove);
break;
case CORRUPT_BLOCK:
@@ -278,7 +299,7 @@ public class HoodieCompactedLogRecordScanner implements
* Merge the records read from a single data block with the accumulated records
*/
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordsFromLastBlock) {
recordsFromLastBlock.forEach((key, hoodieRecord) -> {
if (records.containsKey(key)) {
// Merge and store the merged record
@@ -0,0 +1,410 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import com.uber.hoodie.exception.CorruptedLogFileException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
 * Scans a log file and provides a block-level iterator on the log file. Loads the entire block
 * contents in memory. Can emit either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one
 * is found)
 */
class HoodieLogFileReader implements HoodieLogFormat.Reader {
private static final int DEFAULT_BUFFER_SIZE = 4096;
private final static Logger log = LogManager.getLogger(HoodieLogFileReader.class);
private final FSDataInputStream inputStream;
private final HoodieLogFile logFile;
  private final byte[] oldMagicBuffer = new byte[4];
  private final byte[] magicBuffer = new byte[6];
private final Schema readerSchema;
private HoodieLogBlock nextBlock = null;
private LogFormatVersion nextBlockVersion;
private boolean readBlockLazily;
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this.inputStream = fs.open(logFile.getPath(), bufferSize);
this.logFile = logFile;
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
    if (this.reverseReader) {
      this.reverseLogFilePosition = this.lastReverseLogFilePosition =
          fs.getFileStatus(logFile.getPath()).getLen();
    }
addShutDownHook();
}
HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema,
boolean readBlockLazily, boolean reverseReader) throws IOException {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readBlockLazily, reverseReader);
}
HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
}
@Override
public HoodieLogFile getLogFile() {
return logFile;
}
/**
* Close the inputstream when the JVM exits
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
inputStream.close();
} catch (Exception e) {
log.warn("unable to close input stream for log file " + logFile, e);
// fail silently for any sort of exception
}
}
});
}
// TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows for max of Integer size
private HoodieLogBlock readBlock() throws IOException {
int blocksize = -1;
int type = -1;
HoodieLogBlockType blockType = null;
Map<HeaderMetadataType, String> header = null;
try {
if (isOldMagic()) {
// 1 Read the block type for a log block
type = inputStream.readInt();
Preconditions.checkArgument(type < HoodieLogBlockType.values().length,
"Invalid block byte type found " + type);
blockType = HoodieLogBlockType.values()[type];
// 2 Read the total size of the block
blocksize = inputStream.readInt();
} else {
// 1 Read the total size of the block
blocksize = (int) inputStream.readLong();
}
} catch (Exception e) {
// An exception reading any of the above indicates a corrupt block
// Create a corrupt block by finding the next OLD_MAGIC marker or EOF
return createCorruptBlock();
}
// We may have had a crash which could have written this block partially
// Skip blocksize in the stream and we should either find a sync marker (start of the next block) or EOF
// If we did not find either of it, then this block is a corrupted block.
boolean isCorrupted = isBlockCorrupt(blocksize);
if (isCorrupted) {
return createCorruptBlock();
}
// 2. Read the version for this log format
this.nextBlockVersion = readVersion();
// 3. Read the block type for a log block
if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
type = inputStream.readInt();
Preconditions.checkArgument(type < HoodieLogBlockType.values().length,
"Invalid block byte type found " + type);
blockType = HoodieLogBlockType.values()[type];
}
// 4. Read the header for a log block, if present
if (nextBlockVersion.hasHeader()) {
header = HoodieLogBlock.getLogMetadata(inputStream);
}
int contentLength = blocksize;
// 5. Read the content length for the content
if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
contentLength = (int) inputStream.readLong();
}
// 6. Read the content or skip content based on IO vs Memory trade-off by client
// TODO - have a max block size and reuse this buffer in the ByteBuffer (hard to guess max block size for now)
long contentPosition = inputStream.getPos();
byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily);
// 7. Read footer if any
Map<HeaderMetadataType, String> footer = null;
if (nextBlockVersion.hasFooter()) {
footer = HoodieLogBlock.getLogMetadata(inputStream);
}
// 8. Read log block length, if present. This acts as a reverse pointer when traversing a log file in reverse
long logBlockLength = 0;
if (nextBlockVersion.hasLogBlockLength()) {
logBlockLength = inputStream.readLong();
}
// 9. Read the log block end position in the log file
long blockEndPos = inputStream.getPos();
switch (blockType) {
// based on type read the block
case AVRO_DATA_BLOCK:
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return HoodieAvroDataBlock.getBlock(content, readerSchema);
} else {
return HoodieAvroDataBlock.getBlock(logFile, inputStream, Optional.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
}
case DELETE_BLOCK:
return HoodieDeleteBlock.getBlock(logFile, inputStream, Optional.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, header, footer);
case COMMAND_BLOCK:
return HoodieCommandBlock.getBlock(logFile, inputStream, Optional.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, header, footer);
default:
throw new HoodieNotSupportedException("Unsupported Block " + blockType);
}
}
private HoodieLogBlock createCorruptBlock() throws IOException {
log.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
long currentPos = inputStream.getPos();
long nextBlockOffset = scanForNextAvailableBlockOffset();
// Rewind to the initial start and read corrupted bytes till the nextBlockOffset
inputStream.seek(currentPos);
log.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
long contentPosition = inputStream.getPos();
byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
return HoodieCorruptBlock.getBlock(logFile, inputStream, Optional.ofNullable(corruptedBytes), readBlockLazily,
contentPosition, corruptedBlockSize, corruptedBlockSize, new HashMap<>(), new HashMap<>());
}
private boolean isBlockCorrupt(int blocksize) throws IOException {
long currentPos = inputStream.getPos();
try {
inputStream.seek(currentPos + blocksize);
} catch (EOFException e) {
// this is corrupt
return true;
}
try {
readMagic();
// all good - either we found the sync marker or EOF. Reset position and continue
return false;
} catch (CorruptedLogFileException e) {
// This is a corrupted block
return true;
} finally {
inputStream.seek(currentPos);
}
}
private long scanForNextAvailableBlockOffset() throws IOException {
while (true) {
long currentPos = inputStream.getPos();
try {
boolean isEOF = readMagic();
return isEOF ? inputStream.getPos() : currentPos;
} catch (CorruptedLogFileException e) {
// No luck - advance and try again
inputStream.seek(currentPos + 1);
}
}
}
@Override
public void close() throws IOException {
this.inputStream.close();
}
@Override
/**
* hasNext is not idempotent. TODO - Fix this. It is okay for now - PR
*/
public boolean hasNext() {
try {
boolean isEOF = readMagic();
if (isEOF) {
return false;
}
this.nextBlock = readBlock();
return nextBlock != null;
} catch (IOException e) {
throw new HoodieIOException("IOException when reading logfile " + logFile, e);
}
}
/**
 * Read log format version from the log file, if present.
 * For old log files written with magic header OLD_MAGIC and without a version, return DEFAULT_VERSION
*
* @throws IOException
*/
private LogFormatVersion readVersion() throws IOException {
// If not old log file format (written with Magic header OLD_MAGIC), then read log version
if (Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) {
Arrays.fill(oldMagicBuffer, (byte) 0);
return new HoodieLogFormatVersion(HoodieLogFormatVersion.DEFAULT_VERSION);
}
return new HoodieLogFormatVersion(inputStream.readInt());
}
private boolean isOldMagic() {
return Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC);
}
private boolean readMagic() throws IOException {
try {
long pos = inputStream.getPos();
// 1. Read magic header from the start of the block
inputStream.readFully(magicBuffer, 0, 6);
if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) {
inputStream.seek(pos);
// 1. Read old magic header from the start of the block
// (for backwards compatibility of older log files written without log version)
inputStream.readFully(oldMagicBuffer, 0, 4);
if (!Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) {
throw new CorruptedLogFileException(
            logFile + " could not be read. Did not find the magic bytes at the start of the block");
}
}
return false;
} catch (EOFException e) {
// We have reached the EOF
return true;
}
}
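The two-step probe above (try the 6-byte MAGIC first, then rewind and try the 4-byte OLD_MAGIC for older log files) can be sketched as a small self-contained helper. The class and method names below are hypothetical illustrations, not part of this commit:

```java
import java.util.Arrays;

public class MagicSketch {
    static final byte[] MAGIC = {'#', 'H', 'U', 'D', 'I', '#'};
    static final byte[] OLD_MAGIC = {'H', 'U', 'D', 'I'};

    // Returns "v1" for the new 6-byte magic, "v0" for the old 4-byte magic,
    // and "corrupt" when neither marker is found at the given offset.
    static String detect(byte[] data, int pos) {
        if (data.length - pos >= 6
                && Arrays.equals(Arrays.copyOfRange(data, pos, pos + 6), MAGIC)) {
            return "v1";
        }
        if (data.length - pos >= 4
                && Arrays.equals(Arrays.copyOfRange(data, pos, pos + 4), OLD_MAGIC)) {
            return "v0";
        }
        return "corrupt";
    }

    public static void main(String[] args) {
        System.out.println(detect(new byte[]{'#', 'H', 'U', 'D', 'I', '#', 0}, 0)); // v1
        System.out.println(detect(new byte[]{'H', 'U', 'D', 'I', 0}, 0));           // v0
        System.out.println(detect(new byte[]{'X', 'Y'}, 0));                        // corrupt
    }
}
```

In the real reader the rewind is done with `inputStream.seek(pos)` before retrying with the shorter buffer; this sketch just re-reads from the same offset.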
@Override
public HoodieLogBlock next() {
if (nextBlock == null) {
      // maybe hasNext was not called
hasNext();
}
return nextBlock;
}
  /**
   * hasPrev is not idempotent
   *
   * @return true if a previous block exists and the stream was positioned at its reverse pointer
   */
public boolean hasPrev() {
try {
      if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
reverseLogFilePosition = lastReverseLogFilePosition;
reverseLogFilePosition -= Long.BYTES;
lastReverseLogFilePosition = reverseLogFilePosition;
inputStream.seek(reverseLogFilePosition);
} catch (Exception e) {
// Either reached EOF while reading backwards or an exception
return false;
}
return true;
}
  /**
   * This is a reverse iterator.
   * Note: At any point, an instance of HoodieLogFileReader should either iterate reverse (prev)
   * or forward (next). Doing both in the same instance is not supported
   * WARNING : Every call to prev() should be preceded with hasPrev()
   *
   * @return the previous log block in the file
   * @throws IOException
   */
public HoodieLogBlock prev() throws IOException {
    if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
long blockSize = inputStream.readLong();
long blockEndPos = inputStream.getPos();
// blocksize should read everything about a block including the length as well
try {
inputStream.seek(reverseLogFilePosition - blockSize);
} catch (Exception e) {
// this could be a corrupt block
inputStream.seek(blockEndPos);
throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, " +
"fallback to forward reading of logfile");
}
boolean hasNext = hasNext();
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
return this.nextBlock;
}
/**
   * Reverse pointer, does not read the block. Returns the current position of the log file (in reverse).
   * If the pointer (inputstream) is moved in any way, it is the job of the client of this class to
   * seek/reset it back to the file position returned from the method to expect correct results
*
* @return
* @throws IOException
*/
public long moveToPrev() throws IOException {
    if (!this.reverseReader) {
throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
}
inputStream.seek(lastReverseLogFilePosition);
long blockSize = inputStream.readLong();
// blocksize should be everything about a block including the length as well
inputStream.seek(reverseLogFilePosition - blockSize);
reverseLogFilePosition -= blockSize;
lastReverseLogFilePosition = reverseLogFilePosition;
return reverseLogFilePosition;
}
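The reverse pointer written at the end of each block is what makes hasPrev(), prev() and moveToPrev() possible: seek to the trailing long, read the total block length, and jump back by that many bytes. The mechanism can be demonstrated with a minimal self-contained sketch; the helper class below is hypothetical and uses a plain ByteBuffer instead of an FSDataInputStream:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ReversePointerSketch {
    // Append a block: [content bytes][8-byte total block length, including this long]
    static void append(ByteBuffer buf, byte[] content) {
        buf.put(content);
        buf.putLong(content.length + Long.BYTES);
    }

    // Walk from the end of the buffer to the start using the trailing length longs,
    // collecting the content length of each block in reverse order.
    static List<Integer> reverseContentLengths(ByteBuffer buf, int end) {
        List<Integer> lengths = new ArrayList<>();
        int pos = end;
        while (pos > 0) {
            long total = buf.getLong(pos - Long.BYTES);
            lengths.add((int) (total - Long.BYTES));
            pos -= total;
        }
        return lengths;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        append(buf, new byte[]{1, 2, 3});    // 3-byte block
        append(buf, new byte[]{4, 5});       // 2-byte block
        append(buf, new byte[]{6, 7, 8, 9}); // 4-byte block
        System.out.println(reverseContentLengths(buf, buf.position())); // [4, 2, 3]
    }
}
```

In the real format the trailing long covers the whole block starting at the magic bytes, which is why prev() seeks back by the full block size rather than just the content length.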
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
}
}
@@ -19,17 +19,18 @@ package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * File Format for Hoodie Log Files. The File Format consists of blocks each separated with an OLD_MAGIC
 * sync marker. A Block can either be a Data block, Command block or Delete Block. Data Block -
 * Contains log records serialized as Avro Binary Format Command Block - Specific commands like
 * ROLLBACK_PREVIOUS_BLOCK - Tombstone for the previously written block Delete Block - List of keys
@@ -42,7 +43,21 @@ public interface HoodieLogFormat {
* this file specific (generate a random 4 byte magic and stick it in the file header), but this I
* think is suffice for now - PR
*/
  byte[] OLD_MAGIC = new byte[]{'H', 'U', 'D', 'I'};
  /**
   * Magic 6 bytes we put at the start of every block in the log file.
   * This is added to maintain backwards compatibility due to lack of log format/block
   * version in older log files. All new log blocks will now be written with this MAGIC value
   */
byte[] MAGIC = new byte[]{'#', 'H', 'U', 'D', 'I', '#'};
/**
* The current version of the log format. Anytime the log format changes
* this version needs to be bumped and corresponding changes need to be made to
* {@link HoodieLogFormatVersion}
*/
int currentVersion = 1;
/**
* Writer interface to allow appending block to this file format
@@ -196,9 +211,8 @@ public interface HoodieLogFormat {
return new WriterBuilder();
}
  static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema)
      throws IOException {
    return new HoodieLogFileReader(fs, logFile, readerSchema, false, false);
}
}
@@ -16,192 +16,85 @@
package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.List;
public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
  private final List<HoodieLogFile> logFiles;
  private HoodieLogFileReader currentReader;
  private final FileSystem fs;
  private final Schema readerSchema;
  private final boolean readBlocksLazily;
  private final boolean reverseLogReader;
  HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles,
      Schema readerSchema, boolean readBlocksLazily, boolean reverseLogReader) throws IOException {
    this.logFiles = logFiles;
    this.fs = fs;
    this.readerSchema = readerSchema;
    this.readBlocksLazily = readBlocksLazily;
    this.reverseLogReader = reverseLogReader;
    if (logFiles.size() > 0) {
      HoodieLogFile nextLogFile = logFiles.remove(0);
      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, readBlocksLazily,
          false);
    }
  }
  HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles,
      Schema readerSchema) throws IOException {
    this(fs, logFiles, readerSchema, false, false);
  }
  @Override
  public void close() throws IOException {
    if (currentReader != null) {
      currentReader.close();
    }
  }
  @Override
  /**
   * hasNext is not idempotent. TODO - Fix this. It is okay for now - PR
   */
  public boolean hasNext() {
    if (currentReader == null) {
      return false;
    } else if (currentReader.hasNext()) {
      return true;
    } else if (logFiles.size() > 0) {
      try {
        HoodieLogFile nextLogFile = logFiles.remove(0);
        this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, readBlocksLazily,
            false);
      } catch (IOException io) {
        throw new HoodieIOException("unable to initialize read with log file ", io);
      }
      return this.currentReader.hasNext();
    }
    return false;
  }
  @Override
  public HoodieLogBlock next() {
    HoodieLogBlock block = currentReader.next();
    return block;
  }
  @Override
  public HoodieLogFile getLogFile() {
    return currentReader.getLogFile();
  }
  @Override
  public void remove() {
    throw new UnsupportedOperationException("Remove not supported for HoodieLogFormatReader");
  }
}
@@ -22,7 +22,6 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.WriterBuilder;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,6 +31,8 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
/**
* HoodieLogFormatWriter can be used to append blocks to a log file Use
* HoodieLogFormat.WriterBuilder to construct
@@ -117,16 +118,39 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
@Override
public Writer appendBlock(HoodieLogBlock block)
throws IOException, InterruptedException {
// Find current version
LogFormatVersion currentLogFormatVersion = new HoodieLogFormatVersion(HoodieLogFormat.currentVersion);
long currentSize = this.output.size();
// 1. Write the magic header for the start of the block
this.output.write(HoodieLogFormat.MAGIC);
// bytes for header
byte [] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
// content bytes
byte [] content = block.getContentBytes();
// bytes for footer
byte [] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
// 2. Write the total size of the block (excluding Magic)
this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
// 3. Write the version of this log block
this.output.writeInt(currentLogFormatVersion.getVersion());
// 4. Write the block type
this.output.writeInt(block.getBlockType().ordinal());
// 5. Write the headers for the log block
this.output.write(headerBytes);
// 6. Write the size of the content block
this.output.writeLong(content.length);
// 7. Write the contents of the data block
this.output.write(content);
// 8. Write the footers for the log block
this.output.write(footerBytes);
// 9. Write the total size of the log block (including magic) which is everything written until now (for reverse pointer)
this.output.writeLong(this.output.size() - currentSize);
// Flush every block to disk
flush();
@@ -134,6 +158,32 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
return rolloverIfNeeded();
}
  /**
   * This method returns the total LogBlock length, which is the sum of
   * 1. Number of bytes to write version
   * 2. Number of bytes to write ordinal
   * 3. Length of the headers
   * 4. Number of bytes used to write content length
   * 5. Length of the content
   * 6. Length of the footers
   * 7. Number of bytes to write totalLogBlockLength
   * @param contentLength length of the block content in bytes
   * @param headerLength length of the serialized headers in bytes
   * @param footerLength length of the serialized footers in bytes
   * @return total length of the log block, excluding the magic bytes
   */
private int getLogBlockLength(int contentLength, int headerLength, int footerLength) {
return
Integer.BYTES + // Number of bytes to write version
Integer.BYTES + // Number of bytes to write ordinal
headerLength + // Length of the headers
Long.BYTES + // Number of bytes used to write content length
contentLength + // Length of the content
footerLength + // Length of the footers
Long.BYTES; // Number of bytes to write totalLogBlockLength at end of block (for reverse pointer)
}
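The fixed per-block overhead implied by the sum above is two ints plus two longs (24 bytes) on top of the header, content and footer bytes. The arithmetic can be checked with a small self-contained sketch; the class below is a hypothetical mirror of the computation, not the Hudi code itself:

```java
public class LogBlockLengthSketch {
    // Mirrors the getLogBlockLength computation above (sketch only).
    static int logBlockLength(int contentLength, int headerLength, int footerLength) {
        return Integer.BYTES   // version
             + Integer.BYTES   // block type ordinal
             + headerLength    // serialized headers
             + Long.BYTES      // content length field
             + contentLength   // content bytes
             + footerLength    // serialized footers
             + Long.BYTES;     // trailing total-length field (reverse pointer)
    }

    public static void main(String[] args) {
        // 100 bytes of content, 20 bytes of headers, 10 bytes of footers
        // => 24 bytes of fixed overhead + 130 bytes of payload = 154
        System.out.println(logBlockLength(100, 20, 10)); // 154
    }
}
```

Note this length deliberately excludes the 6 magic bytes, which is why appendBlock later writes `this.output.size() - currentSize` (magic included) as the trailing reverse pointer.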
private Writer rolloverIfNeeded() throws IOException, InterruptedException {
// Roll over if the size is past the threshold
if (getCurrentSize() > sizeThreshold) {
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log;
/**
* A set of feature flags associated with a log format.
* Versions are changed when the log format changes.
* TODO(na) - Implement policies around major/minor versions
*/
abstract class LogFormatVersion {
private final int version;
LogFormatVersion(int version) {
this.version = version;
}
public int getVersion() {
return version;
}
public abstract boolean hasMagicHeader();
public abstract boolean hasContent();
public abstract boolean hasContentLength();
public abstract boolean hasOrdinal();
public abstract boolean hasHeader();
public abstract boolean hasFooter();
public abstract boolean hasLogBlockLength();
}
/**
* Implements logic to determine behavior for feature flags for {@link LogFormatVersion}
*/
final class HoodieLogFormatVersion extends LogFormatVersion {
public final static int DEFAULT_VERSION = 0;
HoodieLogFormatVersion(int version) {
super(version);
}
@Override
public boolean hasMagicHeader() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return true;
default:
return true;
}
}
@Override
public boolean hasContent() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return true;
default:
return true;
}
}
@Override
public boolean hasContentLength() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return true;
default:
return true;
}
}
@Override
public boolean hasOrdinal() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return true;
default:
return true;
}
}
@Override
public boolean hasHeader() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return false;
default:
return true;
}
}
@Override
public boolean hasFooter() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return false;
case 1:
return true;
}
return false;
}
@Override
public boolean hasLogBlockLength() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return false;
case 1:
return true;
}
return false;
}
}
@@ -16,6 +16,8 @@
package com.uber.hoodie.common.table.log.block;
import com.google.common.annotations.VisibleForTesting;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.exception.HoodieIOException;
@@ -27,43 +29,134 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* DataBlock contains a list of records serialized using Avro.
* The Datablock contains
* 1. Compressed Writer Schema length
* 2. Compressed Writer Schema content
* 3. Total number of records in the block
* 4. Size of a record
* 5. Actual avro serialized content of the record
* 1. Data Block version
* 2. Total number of records in the block
* 3. Size of a record
* 4. Actual avro serialized content of the record
*/
public class HoodieAvroDataBlock extends HoodieLogBlock {
private List<IndexedRecord> records;
private Schema schema;
public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema, Map<LogMetadataType, String> metadata) {
super(metadata);
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records,
@Nonnull Map<HeaderMetadataType, String> header,
@Nonnull Map<HeaderMetadataType, String> footer) {
super(header, footer, Optional.empty(), Optional.empty(), null, false);
this.records = records;
this.schema = schema;
this.schema = Schema.parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
}
public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) {
this(records, schema, null);
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records,
@Nonnull Map<HeaderMetadataType, String> header) {
this(records, header, new HashMap<>());
}
private HoodieAvroDataBlock(Optional<byte[]> content, @Nonnull FSDataInputStream inputStream,
boolean readBlockLazily, Optional<HoodieLogBlockContentLocation> blockContentLocation,
Schema readerSchema, @Nonnull Map<HeaderMetadataType, String> headers,
@Nonnull Map<HeaderMetadataType, String> footer) {
super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily);
this.schema = readerSchema;
}
public static HoodieLogBlock getBlock(HoodieLogFile logFile,
FSDataInputStream inputStream,
Optional<byte[]> content,
boolean readBlockLazily,
long position,
long blockSize,
long blockEndpos,
Schema readerSchema,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
return new HoodieAvroDataBlock(content, inputStream, readBlockLazily,
Optional.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)),
readerSchema, header, footer);
}
@Override
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing records from content
if (getContent().isPresent()) {
return getContent().get();
} else if (readBlockLazily && !getContent().isPresent() && records == null) {
// read block lazily
createRecordsFromContentBytes();
}
Schema schema = Schema.parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
// 1. Write out the log block version
output.writeInt(HoodieLogBlock.version);
// 2. Write total number of records
output.writeInt(records.size());
// 3. Write the records
Iterator<IndexedRecord> itr = records.iterator();
while (itr.hasNext()) {
IndexedRecord s = itr.next();
ByteArrayOutputStream temp = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
try {
// Encode the record into bytes
writer.write(s, encoder);
encoder.flush();
// Get the size of the bytes
int size = temp.toByteArray().length;
// Write the record size
output.writeInt(size);
// Write the content
output.write(temp.toByteArray());
itr.remove();
} catch (IOException e) {
throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
}
}
output.close();
return baos.toByteArray();
}
@Override
public HoodieLogBlockType getBlockType() {
return HoodieLogBlockType.AVRO_DATA_BLOCK;
}
//TODO : (na) lazily create IndexedRecords only when required
public List<IndexedRecord> getRecords() {
if (records == null) {
try {
// in case records are absent, read content lazily and then convert to IndexedRecords
createRecordsFromContentBytes();
} catch (IOException io) {
throw new HoodieIOException("Unable to convert content bytes to records", io);
}
}
return records;
}
@@ -71,18 +164,114 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
return schema;
}
@Override
public byte[] getBytes() throws IOException {
//TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used
//TODO (na) - Implement a recordItr instead of recordList
private void createRecordsFromContentBytes() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
}
SizeAwareDataInputStream dis =
new SizeAwareDataInputStream(
new DataInputStream(new ByteArrayInputStream(getContent().get())));
// 1. Read version for this data block
int version = dis.readInt();
HoodieAvroDataBlockVersion logBlockVersion = new HoodieAvroDataBlockVersion(version);
// Get schema from the header
Schema writerSchema = new Schema.Parser()
.parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
// If readerSchema was not present, use writerSchema
if (schema == null) {
schema = writerSchema;
}
GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, schema);
// 2. Get the total records
int totalRecords = 0;
if (logBlockVersion.hasRecordCount()) {
totalRecords = dis.readInt();
}
List<IndexedRecord> records = new ArrayList<>(totalRecords);
// 3. Read the content
for (int i = 0; i < totalRecords; i++) {
int recordLength = dis.readInt();
Decoder decoder = DecoderFactory.get()
.binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(), recordLength, null);
IndexedRecord record = reader.read(null, decoder);
records.add(record);
dis.skipBytes(recordLength);
}
dis.close();
this.records = records;
// Free up content to be GC'd, deflate
deflate();
}
/*****************************************************DEPRECATED METHODS**********************************************/
@Deprecated
@VisibleForTesting
/**
* This constructor is retained to provide backwards compatibility to HoodieArchivedLogs
* which were written using HoodieLogFormat V1
*/
public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) {
super(new HashMap<>(), new HashMap<>(), Optional.empty(), Optional.empty(), null, false);
this.records = records;
this.schema = schema;
}
@Deprecated
/**
* This method is retained to provide backwards compatibility to HoodieArchivedLogs which were written using HoodieLogFormat V1
*/
public static HoodieLogBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(
new DataInputStream(new ByteArrayInputStream(content)));
// 1. Read the schema written out
int schemaLength = dis.readInt();
byte[] compressedSchema = new byte[schemaLength];
dis.readFully(compressedSchema, 0, schemaLength);
Schema writerSchema = new Schema.Parser().parse(HoodieAvroUtils.decompress(compressedSchema));
if (readerSchema == null) {
readerSchema = writerSchema;
}
GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
// 2. Get the total records
int totalRecords = dis.readInt();
List<IndexedRecord> records = new ArrayList<>(totalRecords);
// 3. Read the content
for (int i = 0; i < totalRecords; i++) {
int recordLength = dis.readInt();
Decoder decoder = DecoderFactory.get()
.binaryDecoder(content, dis.getNumberOfBytesRead(), recordLength, null);
IndexedRecord record = reader.read(null, decoder);
records.add(record);
dis.skipBytes(recordLength);
}
dis.close();
return new HoodieAvroDataBlock(records, readerSchema);
}
@Deprecated
@VisibleForTesting
public byte[] getBytes(Schema schema) throws IOException {
GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
// 1. Write out metadata
if (super.getLogMetadata() != null) {
output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata()));
}
// 2. Compress and Write schema out
byte[] schemaContent = HoodieAvroUtils.compress(schema.toString());
output.writeInt(schemaContent.length);
@@ -118,45 +307,4 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
return baos.toByteArray();
}
@Override
public HoodieLogBlockType getBlockType() {
return HoodieLogBlockType.AVRO_DATA_BLOCK;
}
//TODO (na) - Break down content into smaller chunks of byte [] to be GC as they are used
public static HoodieLogBlock fromBytes(byte[] content, Schema readerSchema, boolean readMetadata) throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
Map<LogMetadataType, String> metadata = null;
// 1. Read the metadata written out, if applicable
if (readMetadata) {
metadata = HoodieLogBlock.getLogMetadata(dis);
}
// 1. Read the schema written out
int schemaLength = dis.readInt();
byte[] compressedSchema = new byte[schemaLength];
dis.readFully(compressedSchema, 0, schemaLength);
Schema writerSchema = new Schema.Parser().parse(HoodieAvroUtils.decompress(compressedSchema));
if (readerSchema == null) {
readerSchema = writerSchema;
}
//TODO : (na) lazily create IndexedRecords only when required
GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
// 2. Get the total records
int totalRecords = dis.readInt();
List<IndexedRecord> records = new ArrayList<>(totalRecords);
// 3. Read the content
for (int i=0;i<totalRecords;i++) {
int recordLength = dis.readInt();
Decoder decoder = DecoderFactory.get().binaryDecoder(content, dis.getNumberOfBytesRead(), recordLength, null);
IndexedRecord record = reader.read(null, decoder);
records.add(record);
dis.skipBytes(recordLength);
}
dis.close();
return new HoodieAvroDataBlock(records, readerSchema, metadata);
}
}
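The content framing that `getContentBytes()` writes and `createRecordsFromContentBytes()` reads back (block version, record count, then length-prefixed record payloads) can be exercised without Avro. The sketch below uses stand-in `byte[]` payloads and an illustrative class name, not the actual Hoodie serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the data-block content framing: version, record count,
// then each record as a length-prefixed payload.
public class DataBlockFraming {
  static byte[] write(int version, List<byte[]> records) {
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      out.writeInt(version);        // 1. log block version
      out.writeInt(records.size()); // 2. total number of records
      for (byte[] r : records) {
        out.writeInt(r.length);     // 3. size of this record
        out.write(r);               // 4. record payload
      }
      out.close();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<byte[]> read(byte[] content) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(content));
      in.readInt();                 // version; feature flags would gate reads here
      int total = in.readInt();
      List<byte[]> records = new ArrayList<>(total);
      for (int i = 0; i < total; i++) {
        byte[] r = new byte[in.readInt()];
        in.readFully(r);
        records.add(r);
      }
      return records;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```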


@@ -16,14 +16,12 @@
package com.uber.hoodie.common.table.log.block;
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
import com.uber.hoodie.common.model.HoodieLogFile;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* Command block issues a specific command to the scanner
@@ -34,27 +32,16 @@ public class HoodieCommandBlock extends HoodieLogBlock {
public enum HoodieCommandBlockTypeEnum {ROLLBACK_PREVIOUS_BLOCK}
public HoodieCommandBlock(HoodieCommandBlockTypeEnum type,
Map<LogMetadataType, String> metadata) {
super(metadata);
this.type = type;
public HoodieCommandBlock(Map<HeaderMetadataType, String> header) {
this(Optional.empty(), null, false, Optional.empty(), header, new HashMap<>());
}
public HoodieCommandBlock(HoodieCommandBlockTypeEnum type) {
this(type, null);
}
@Override
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
if (super.getLogMetadata() != null) {
output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata()));
}
output.writeInt(type.ordinal());
output.close();
return baos.toByteArray();
private HoodieCommandBlock(Optional<byte[]> content, FSDataInputStream inputStream,
boolean readBlockLazily, Optional<HoodieLogBlockContentLocation> blockContentLocation,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
super(header, footer, blockContentLocation, content, inputStream, readBlockLazily);
this.type = HoodieCommandBlockTypeEnum.values()[Integer
.parseInt(header.get(HeaderMetadataType.COMMAND_BLOCK_TYPE))];
}
public HoodieCommandBlockTypeEnum getType() {
@@ -66,13 +53,23 @@ public class HoodieCommandBlock extends HoodieLogBlock {
return HoodieLogBlockType.COMMAND_BLOCK;
}
public static HoodieLogBlock fromBytes(byte[] content, boolean readMetadata) throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
Map<LogMetadataType, String> metadata = null;
if (readMetadata) {
metadata = HoodieLogBlock.getLogMetadata(dis);
}
int ordinal = dis.readInt();
return new HoodieCommandBlock(HoodieCommandBlockTypeEnum.values()[ordinal], metadata);
@Override
public byte[] getContentBytes() {
return new byte[0];
}
public static HoodieLogBlock getBlock(HoodieLogFile logFile,
FSDataInputStream inputStream,
Optional<byte[]> content,
boolean readBlockLazily,
long position,
long blockSize,
long blockEndpos,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
return new HoodieCommandBlock(content, inputStream, readBlockLazily,
Optional.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)),
header, footer);
}
}
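As the private constructor above shows, the command-block type travels through the header map as its enum ordinal rendered to a string under COMMAND_BLOCK_TYPE. A minimal round-trip sketch, with a hypothetical local enum and helper names:

```java
// Sketch of how a command-block type round-trips through a string-valued
// header entry: ordinal out, ordinal back in.
public class CommandTypeRoundTrip {
  enum CommandType { ROLLBACK_PREVIOUS_BLOCK }

  // Store the ordinal as the header value.
  static String toHeaderValue(CommandType t) {
    return String.valueOf(t.ordinal());
  }

  // Parse the ordinal back into the enum constant.
  static CommandType fromHeaderValue(String v) {
    return CommandType.values()[Integer.parseInt(v)];
  }
}
```

Because the ordinal is what gets persisted, new enum constants may only be appended at the end, matching the WARNING on the block-type enums in this change.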


@@ -16,14 +16,12 @@
package com.uber.hoodie.common.table.log.block;
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
import com.uber.hoodie.common.model.HoodieLogFile;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
/**
* Corrupt block is emitted whenever the scanner finds the length of the block written at the
@@ -31,26 +29,20 @@ import java.util.Map;
*/
public class HoodieCorruptBlock extends HoodieLogBlock {
private final byte[] corruptedBytes;
private HoodieCorruptBlock(byte[] corruptedBytes, Map<LogMetadataType, String> metadata) {
super(metadata);
this.corruptedBytes = corruptedBytes;
}
private HoodieCorruptBlock(byte[] corruptedBytes) {
this(corruptedBytes, null);
private HoodieCorruptBlock(Optional<byte[]> corruptedBytes, FSDataInputStream inputStream,
boolean readBlockLazily, Optional<HoodieLogBlockContentLocation> blockContentLocation,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
super(header, footer, blockContentLocation, corruptedBytes, inputStream, readBlockLazily);
}
@Override
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
if (super.getLogMetadata() != null) {
output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata()));
public byte[] getContentBytes() throws IOException {
if (!getContent().isPresent() && readBlockLazily) {
// read content from disk
inflate();
}
output.write(corruptedBytes);
return baos.toByteArray();
return getContent().get();
}
@Override
@@ -58,26 +50,17 @@ public class HoodieCorruptBlock extends HoodieLogBlock {
return HoodieLogBlockType.CORRUPT_BLOCK;
}
public byte[] getCorruptedBytes() {
return corruptedBytes;
}
public static HoodieLogBlock getBlock(HoodieLogFile logFile,
FSDataInputStream inputStream,
Optional<byte[]> corruptedBytes,
boolean readBlockLazily,
long position,
long blockSize,
long blockEndPos,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) throws IOException {
public static HoodieLogBlock fromBytes(byte[] content, int blockSize, boolean readMetadata)
throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
Map<LogMetadataType, String> metadata = null;
int bytesRemaining = blockSize;
if (readMetadata) {
try { //attempt to read metadata
metadata = HoodieLogBlock.getLogMetadata(dis);
bytesRemaining = blockSize - HoodieLogBlock.getLogMetadataBytes(metadata).length;
} catch (IOException e) {
// unable to read metadata, possibly corrupted
metadata = null;
}
}
byte[] corruptedBytes = new byte[bytesRemaining];
dis.readFully(corruptedBytes);
return new HoodieCorruptBlock(corruptedBytes, metadata);
return new HoodieCorruptBlock(corruptedBytes, inputStream, readBlockLazily,
Optional.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer);
}
}


@@ -16,48 +16,82 @@
package com.uber.hoodie.common.table.log.block;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
import com.uber.hoodie.exception.HoodieIOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
/**
 * Delete block contains a list of keys to be removed from the records scanned in the blocks so far
*/
public class HoodieDeleteBlock extends HoodieLogBlock {
private final String[] keysToDelete;
private String[] keysToDelete;
public HoodieDeleteBlock(String[] keysToDelete, Map<LogMetadataType, String> metadata) {
super(metadata);
public HoodieDeleteBlock(String[] keysToDelete,
Map<HeaderMetadataType, String> header) {
this(Optional.empty(), null, false, Optional.empty(), header, new HashMap<>());
this.keysToDelete = keysToDelete;
}
public HoodieDeleteBlock(String[] keysToDelete) {
this(keysToDelete, null);
private HoodieDeleteBlock(Optional<byte[]> content, FSDataInputStream inputStream,
boolean readBlockLazily, Optional<HoodieLogBlockContentLocation> blockContentLocation,
Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
super(header, footer, blockContentLocation, content, inputStream, readBlockLazily);
}
@Override
public byte[] getBytes() throws IOException {
public byte[] getContentBytes() throws IOException {
// In case this method is called before realizing keys from content
if (getContent().isPresent()) {
return getContent().get();
} else if (readBlockLazily && !getContent().isPresent() && keysToDelete == null) {
// read block lazily
getKeysToDelete();
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
if (super.getLogMetadata() != null) {
output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata()));
}
byte[] bytesToWrite = StringUtils.join(keysToDelete, ',').getBytes(Charset.forName("utf-8"));
byte[] bytesToWrite = StringUtils.join(getKeysToDelete(), ',').getBytes(Charset.forName("utf-8"));
output.writeInt(HoodieLogBlock.version);
output.writeInt(bytesToWrite.length);
output.write(bytesToWrite);
return baos.toByteArray();
}
public String[] getKeysToDelete() {
return keysToDelete;
try {
if (keysToDelete == null) {
if (!getContent().isPresent() && readBlockLazily) {
// read content from disk
inflate();
}
SizeAwareDataInputStream dis =
new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(getContent().get())));
int version = dis.readInt();
int dataLength = dis.readInt();
byte[] data = new byte[dataLength];
dis.readFully(data);
this.keysToDelete = new String(data).split(",");
deflate();
}
return keysToDelete;
} catch (IOException io) {
throw new HoodieIOException("Unable to generate keys to delete from block content", io);
}
}
@Override
@@ -65,15 +99,17 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
return HoodieLogBlockType.DELETE_BLOCK;
}
public static HoodieLogBlock fromBytes(byte[] content, boolean readMetadata) throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
Map<LogMetadataType, String> metadata = null;
if (readMetadata) {
metadata = HoodieLogBlock.getLogMetadata(dis);
}
int dataLength = dis.readInt();
byte[] data = new byte[dataLength];
dis.readFully(data);
return new HoodieDeleteBlock(new String(data).split(","), metadata);
public static HoodieLogBlock getBlock(HoodieLogFile logFile,
FSDataInputStream inputStream,
Optional<byte[]> content,
boolean readBlockLazily,
long position,
long blockSize,
long blockEndPos,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) throws IOException {
return new HoodieDeleteBlock(content, inputStream, readBlockLazily,
Optional.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer);
}
}
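The delete-block content layout above (a version int, a length int, then the keys joined by ',' in UTF-8) round-trips as sketched below. Class and method names are illustrative only; note this framing assumes record keys never contain a comma.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Sketch of the delete-block content framing: version, payload length,
// then the keys joined by ',' in UTF-8.
public class DeleteBlockFraming {
  static byte[] write(int version, String[] keys) {
    try {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      byte[] joined = String.join(",", keys).getBytes(StandardCharsets.UTF_8);
      out.writeInt(version);
      out.writeInt(joined.length);
      out.write(joined);
      out.close();
      return baos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static String[] read(byte[] content) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(content));
      in.readInt(); // version
      byte[] data = new byte[in.readInt()];
      in.readFully(data);
      return new String(data, StandardCharsets.UTF_8).split(",");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```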


@@ -17,21 +17,66 @@
package com.uber.hoodie.common.table.log.block;
import com.google.common.collect.Maps;
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FSDataInputStream;
/**
* Abstract class defining a block in HoodieLogFile
*/
public abstract class HoodieLogBlock {
public byte[] getBytes() throws IOException {
/**
* The current version of the log block. Anytime the logBlock format changes
* this version needs to be bumped and corresponding changes need to be made to
* {@link HoodieLogBlockVersion}
* TODO : Change this to a class, something like HoodieLogBlockVersionV1/V2 and implement/override operations there
*/
public static int version = 1;
// Header for each log block
private final Map<HeaderMetadataType, String> logBlockHeader;
// Footer for each log block
private final Map<HeaderMetadataType, String> logBlockFooter;
// Location of a log block on disk
private final Optional<HoodieLogBlockContentLocation> blockContentLocation;
// data for a specific block
private Optional<byte []> content;
// TODO : change this to just InputStream so this works for any FileSystem
// create handlers to return specific type of inputstream based on FS
// input stream corresponding to the log file where this logBlock belongs
protected FSDataInputStream inputStream;
// Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory intensive)
protected boolean readBlockLazily;
public HoodieLogBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Optional<HoodieLogBlockContentLocation> blockContentLocation,
@Nonnull Optional<byte []> content,
FSDataInputStream inputStream,
boolean readBlockLazily) {
this.logBlockHeader = logBlockHeader;
this.logBlockFooter = logBlockFooter;
this.blockContentLocation = blockContentLocation;
this.content = content;
this.inputStream = inputStream;
this.readBlockLazily = readBlockLazily;
}
// Return the bytes representation of the data belonging to a LogBlock
public byte[] getContentBytes() throws IOException {
throw new HoodieException("No implementation was provided");
}
public byte [] getMagic() {
throw new HoodieException("No implementation was provided");
}
@@ -39,8 +84,25 @@ public abstract class HoodieLogBlock {
throw new HoodieException("No implementation was provided");
}
//log metadata for each log block
private Map<LogMetadataType, String> logMetadata;
public long getLogBlockLength() {
throw new HoodieException("No implementation was provided");
}
public Optional<HoodieLogBlockContentLocation> getBlockContentLocation() {
return this.blockContentLocation;
}
public Map<HeaderMetadataType, String> getLogBlockHeader() {
return logBlockHeader;
}
public Map<HeaderMetadataType, String> getLogBlockFooter() {
return logBlockFooter;
}
public Optional<byte[]> getContent() {
return content;
}
/**
* Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at
@@ -54,32 +116,71 @@ public abstract class HoodieLogBlock {
}
/**
* Metadata abstraction for a HoodieLogBlock WARNING : This enum is serialized as the ordinal.
* Log Metadata headers abstraction for a HoodieLogBlock WARNING : This enum is serialized as the ordinal.
* Only add new enums at the end.
*/
public enum LogMetadataType {
public enum HeaderMetadataType {
INSTANT_TIME,
TARGET_INSTANT_TIME
TARGET_INSTANT_TIME,
SCHEMA,
COMMAND_BLOCK_TYPE
}
public HoodieLogBlock(Map<LogMetadataType, String> logMetadata) {
this.logMetadata = logMetadata;
/**
* Log Metadata footers abstraction for a HoodieLogBlock WARNING : This enum is serialized as the ordinal.
* Only add new enums at the end.
*/
public enum FooterMetadataType {
}
public Map<LogMetadataType, String> getLogMetadata() {
return logMetadata;
/**
   * This class is used to store the Location of the Content of a Log Block. It is used when a client chooses an
   * IO-intensive CompactedScanner; the location helps to lazily read contents from the log file
*/
public static final class HoodieLogBlockContentLocation {
// The logFile that contains this block
private final HoodieLogFile logFile;
// The filePosition in the logFile for the contents of this block
private final long contentPositionInLogFile;
// The number of bytes / size of the contents of this block
private final long blockSize;
// The final position where the complete block ends
private final long blockEndPos;
HoodieLogBlockContentLocation(HoodieLogFile logFile, long contentPositionInLogFile, long blockSize, long blockEndPos) {
this.logFile = logFile;
this.contentPositionInLogFile = contentPositionInLogFile;
this.blockSize = blockSize;
this.blockEndPos = blockEndPos;
}
public HoodieLogFile getLogFile() {
return logFile;
}
public long getContentPositionInLogFile() {
return contentPositionInLogFile;
}
public long getBlockSize() {
return blockSize;
}
public long getBlockEndPos() {
return blockEndPos;
}
}
  /**
   * Convert log metadata to bytes:
   * 1. Write size of metadata
   * 2. Write enum ordinal
   * 3. Write actual bytes
   */
public static byte[] getLogMetadataBytes(Map<LogMetadataType, String> metadata)
public static byte[] getLogMetadataBytes(Map<HeaderMetadataType, String> metadata)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
output.writeInt(metadata.size());
for (Map.Entry<LogMetadataType, String> entry : metadata.entrySet()) {
for (Map.Entry<HeaderMetadataType, String> entry : metadata.entrySet()) {
output.writeInt(entry.getKey().ordinal());
byte[] bytes = entry.getValue().getBytes();
output.writeInt(bytes.length);
@@ -91,10 +192,10 @@ public abstract class HoodieLogBlock {
/**
* Convert bytes to LogMetadata, follow the same order as {@link HoodieLogBlock#getLogMetadataBytes}
*/
public static Map<LogMetadataType, String> getLogMetadata(SizeAwareDataInputStream dis)
public static Map<HeaderMetadataType, String> getLogMetadata(DataInputStream dis)
throws IOException {
Map<LogMetadataType, String> metadata = Maps.newHashMap();
Map<HeaderMetadataType, String> metadata = Maps.newHashMap();
// 1. Read the metadata written out
int metadataCount = dis.readInt();
try {
@@ -103,7 +204,7 @@ public abstract class HoodieLogBlock {
int metadataEntrySize = dis.readInt();
byte[] metadataEntry = new byte[metadataEntrySize];
dis.readFully(metadataEntry, 0, metadataEntrySize);
metadata.put(LogMetadataType.values()[metadataEntryIndex], new String(metadataEntry));
metadata.put(HeaderMetadataType.values()[metadataEntryIndex], new String(metadataEntry));
metadataCount--;
}
return metadata;
@@ -111,4 +212,60 @@ public abstract class HoodieLogBlock {
throw new IOException("Could not read metadata fields ", eof);
}
}
/**
   * Read or skip the content of a log block in the log file, depending on whether lazy reading is
   * enabled in {@link com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner}
   *
   * @param inputStream input stream positioned at the start of the block content
   * @param contentLength number of content bytes to read or skip
   * @param readBlockLazily when true, skip the content now and read it later via inflate()
   * @return the content bytes, or null if the content was skipped
* @throws IOException
*/
public static byte [] readOrSkipContent(FSDataInputStream inputStream,
Integer contentLength, boolean readBlockLazily) throws IOException {
byte [] content = null;
if (!readBlockLazily) {
// Read the contents in memory
content = new byte[contentLength];
inputStream.readFully(content, 0, contentLength);
} else {
// Seek to the end of the content block
inputStream.seek(inputStream.getPos() + contentLength);
}
return content;
}
/**
* When lazyReading of blocks is turned on, inflate the content of a log block from disk
* @throws IOException
*/
protected void inflate() throws IOException {
try {
content = Optional.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.readFully(content.get(), 0, content.get().length);
inputStream.seek(this.getBlockContentLocation().get().getBlockEndPos());
} catch(IOException e) {
try {
// TODO : fs.open() and return inputstream again, need to pass FS configuration
// because the inputstream might close/timeout for large number of log blocks to be merged
inflate();
} catch(IOException io) {
throw new HoodieIOException("unable to lazily read log block from disk", io);
}
}
}
/**
   * After the content bytes have been converted into the required data structure by a log block, deflate the
   * content to release the byte[] and relieve memory pressure when GC kicks in.
* NOTE: This still leaves the heap fragmented
*/
protected void deflate() {
content = Optional.empty();
}
}
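The readOrSkipContent()/inflate() pair above is the memory/IO trade-off at the heart of lazy reading: eager mode buffers the content in memory, lazy mode only advances the file position and seeks back later. A self-contained sketch of that decision, with a plain byte[] and an int position standing in for FSDataInputStream:

```java
import java.util.Arrays;

// Sketch of the lazy-read trade-off: eager mode copies content into memory,
// lazy mode only records where it is and reads it back on demand.
public class ReadOrSkipSketch {
  final byte[] file; // stand-in for the log file on disk
  int pos;           // stand-in for inputStream.getPos()

  ReadOrSkipSketch(byte[] file) {
    this.file = file;
  }

  // Returns the content when read eagerly, or null when skipped (lazy mode).
  byte[] readOrSkipContent(int contentLength, boolean readBlockLazily) {
    byte[] content = null;
    if (!readBlockLazily) {
      // Memory-intensive path: buffer the content now.
      content = Arrays.copyOfRange(file, pos, pos + contentLength);
    }
    // Both paths leave the position at the end of the content.
    pos += contentLength;
    return content;
  }

  // IO-intensive path: seek back to the recorded location and read the content.
  byte[] inflate(int contentPosition, int contentLength) {
    return Arrays.copyOfRange(file, contentPosition, contentPosition + contentLength);
  }
}
```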


@@ -0,0 +1,79 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.common.table.log.block;
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.version;
abstract class HoodieLogBlockVersion {
private final int currentVersion;
public final static int DEFAULT_VERSION = 0;
HoodieLogBlockVersion(int version) {
this.currentVersion = version;
}
int getVersion() {
return currentVersion;
}
}
/**
* A set of feature flags associated with a data log block format.
* Versions are changed when the log block format changes.
* TODO(na) - Implement policies around major/minor versions
*/
final class HoodieAvroDataBlockVersion extends HoodieLogBlockVersion {
HoodieAvroDataBlockVersion(int version) {
super(version);
}
public boolean hasRecordCount() {
switch (super.getVersion()) {
case DEFAULT_VERSION:
return true;
default:
return true;
}
}
}
/**
* A set of feature flags associated with a command log block format.
* Versions are changed when the log block format changes.
* TODO(na) - Implement policies around major/minor versions
*/
final class HoodieCommandBlockVersion extends HoodieLogBlockVersion {
HoodieCommandBlockVersion(int version) {
super(version);
}
}
/**
* A set of feature flags associated with a delete log block format.
* Versions are changed when the log block format changes.
* TODO(na) - Implement policies around major/minor versions
*/
final class HoodieDeleteBlockVersion extends HoodieLogBlockVersion {
HoodieDeleteBlockVersion(int version) {
super(version);
}
}
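The `HoodieLogBlockVersion` hierarchy above gates behavior on a block's version number, so old blocks keep their old semantics as the format evolves. A self-contained sketch of the same pattern (class names simplified; these are not the actual Hoodie classes):

```java
// Minimal sketch of the version-as-feature-flag pattern: each block type
// exposes capability flags that switch on the block's version number.
abstract class LogBlockVersion {
    static final int DEFAULT_VERSION = 0;
    private final int version;

    LogBlockVersion(int version) {
        this.version = version;
    }

    int getVersion() {
        return version;
    }
}

final class AvroDataBlockVersion extends LogBlockVersion {
    AvroDataBlockVersion(int version) {
        super(version);
    }

    // A later format version could flip this flag without touching call sites.
    boolean hasRecordCount() {
        switch (getVersion()) {
            case DEFAULT_VERSION:
                return true;
            default:
                return true;
        }
    }
}

public class VersionFlagDemo {
    public static void main(String[] args) {
        AvroDataBlockVersion v = new AvroDataBlockVersion(LogBlockVersion.DEFAULT_VERSION);
        System.out.println(v.hasRecordCount()); // prints "true"
    }
}
```

Readers of a block first parse its VERSION field, construct the matching version object, and then branch on flags like this instead of scattering `if (version == N)` checks.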

View File

@@ -291,8 +291,9 @@ public class HoodieTestUtils {
.overBaseCommit(location.getCommitTime())
.withFs(fs).build();
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, location.getCommitTime());
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getCommitTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
logWriter.appendBlock(new HoodieAvroDataBlock(s.getValue().stream().map(r -> {
try {
GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
@@ -304,7 +305,7 @@ public class HoodieTestUtils {
} catch (IOException e) {
return null;
}
}).collect(Collectors.toList()), schema, metadata));
}).collect(Collectors.toList()), header));
logWriter.close();
} catch (Exception e) {
fail(e.toString());

View File

@@ -72,6 +72,11 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
public static final String COMPACTION_MEMORY_FRACTION_PROP = "compaction.memory.fraction";
public static final String DEFAULT_COMPACTION_MEMORY_FRACTION = "0.75";
// used to choose a trade-off between IO and memory when performing the compaction process
// Depending on output file size and memory provided, set to true to avoid OOM for large files with small memory
public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "compaction.lazy.block.read.enabled";
public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";
public static final Log LOG = LogFactory.getLog(HoodieRealtimeRecordReader.class);
private final HashMap<String, ArrayWritable> deltaRecordMap;
@@ -132,7 +137,8 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
split.getDeltaFilePaths(),
readerSchema, split.getMaxCommitTime(),
(long) Math.ceil(Double.valueOf(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION))
* jobConf.getMemoryForMapTask()));
* jobConf.getMemoryForMapTask()),
Boolean.valueOf(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), false);
// NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit
// but can return records for completed commits > the commit we are trying to read (if using readCommit() API)
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : compactedLogRecordScanner) {
@@ -140,6 +146,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
.get();
String key = hoodieRecord.getRecordKey();
// we assume a later valid record in the log is newer than what we have in the map &amp; replace it.
// TODO : handle deletes here
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, writerSchema);
deltaRecordMap.put(key, aWritable);
if (LOG.isDebugEnabled()) {
@@ -302,6 +309,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<Void, ArrayWrita
arrayWritableToString(deltaRecordMap.get(key))));
}
if (deltaRecordMap.containsKey(key)) {
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro ?
Writable[] replaceValue = deltaRecordMap.get(key).get();
Writable[] originalValue = arrayWritable.get();
System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
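The two new properties above let the record reader choose between lazy and eager block reading. A minimal sketch of how such a string property with a default resolves to a boolean, using a plain `Map` in place of Hadoop's `JobConf` (property names are taken from the diff; the helper method is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class LazyReadConfigDemo {
    // Property names as introduced in HoodieRealtimeRecordReader.
    static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP =
        "compaction.lazy.block.read.enabled";
    static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "true";

    // Mirrors how the reader resolves the flag: fall back to the
    // default ("true") when the property is unset in the job configuration.
    static boolean lazyReadEnabled(Map<String, String> conf) {
        return Boolean.valueOf(conf.getOrDefault(
            COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
            DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(lazyReadEnabled(conf)); // prints "true" (default)
        conf.put(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, "false");
        System.out.println(lazyReadEnabled(conf)); // prints "false"
    }
}
```

Defaulting to lazy reading keeps memory bounded for large log files; jobs with ample memory and small files can set the property to `false` to trade memory for fewer IO passes.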

View File

@@ -91,9 +91,10 @@ public class HoodieRealtimeRecordReaderTest {
records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
}
Schema writeSchema = records.get(0).getSchema();
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, newCommit);
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema, metadata);
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
long size = writer.getCurrentSize();
return writer;

View File

@@ -395,7 +395,7 @@ public class HoodieHiveClient {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private MessageType readSchemaFromLogFile(Optional<HoodieInstant> lastCompactionCommitOpt,
Path path) throws IOException {
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true);
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
HoodieAvroDataBlock lastBlock = null;
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
@@ -404,6 +404,7 @@ public class HoodieHiveClient {
}
}
if (lastBlock != null) {
// materialize the lazily-read block content so its schema is populated
lastBlock.getRecords();
return new parquet.avro.AvroSchemaConverter().convert(lastBlock.getSchema());
}
// Fall back to read the schema from last compaction
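`readSchemaFromLogFile` above walks the log forward, remembers the last data block seen, and materializes it before asking for its schema. The same keep-last pattern, with stand-in classes instead of the real `HoodieLogBlock` types (names are hypothetical):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LastBlockDemo {
    // Stand-ins for HoodieLogBlock / HoodieAvroDataBlock.
    static class LogBlock { }

    static class AvroDataBlock extends LogBlock {
        final String schema;
        AvroDataBlock(String schema) { this.schema = schema; }
    }

    // Walk the reader forward, keep the last data block, then read its schema.
    static String lastSchema(Iterator<LogBlock> reader) {
        AvroDataBlock last = null;
        while (reader.hasNext()) {
            LogBlock block = reader.next();
            if (block instanceof AvroDataBlock) {
                last = (AvroDataBlock) block;
            }
        }
        return last == null ? null : last.schema;
    }

    public static void main(String[] args) {
        List<LogBlock> blocks = Arrays.asList(
            new AvroDataBlock("v1"), new LogBlock(), new AvroDataBlock("v2"));
        System.out.println(lastSchema(blocks.iterator())); // prints "v2"
    }
}
```

Keeping only the last data block matters here because later blocks may carry an evolved schema, and only one block's content needs to be materialized.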

View File

@@ -314,9 +314,10 @@ public class TestUtil {
List<IndexedRecord> records = (isLogSchemaSimple ? SchemaTestUtil
.generateTestRecords(0, 100)
: SchemaTestUtil.generateEvolvedTestRecords(100, 100));
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, dataFile.getCommitTime());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, schema, metadata);
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
logWriter.appendBlock(dataBlock);
logWriter.close();
return logWriter.getLogFile();
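Across the test diffs, the old `(records, schema, metadata)` constructor arguments collapse into `(records, header)`: the schema now travels inside the header map under a `SCHEMA` key. A sketch of building such a header (the enum here is illustrative, standing in for `HoodieLogBlock.HeaderMetadataType`, and the values are placeholders):

```java
import java.util.EnumMap;
import java.util.Map;

public class HeaderDemo {
    // Stand-in for HoodieLogBlock.HeaderMetadataType; the diff's doc table
    // mentions INSTANT_TIME, TARGET_INSTANT_TIME, and SCHEMA among others.
    enum HeaderMetadataType { INSTANT_TIME, TARGET_INSTANT_TIME, SCHEMA }

    public static void main(String[] args) {
        // Build the per-block header the way the updated tests do: instant
        // time plus the Avro schema string, then pass it to the data block.
        Map<HeaderMetadataType, String> header =
            new EnumMap<>(HeaderMetadataType.class);
        header.put(HeaderMetadataType.INSTANT_TIME, "20180215110125");
        header.put(HeaderMetadataType.SCHEMA, "<avro schema json here>");
        System.out.println(header.containsKey(HeaderMetadataType.SCHEMA)); // prints "true"
    }
}
```

Carrying the schema in the header rather than as a constructor argument lets each block be decoded independently, which is what makes the lazy, per-block reading introduced by this commit possible.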