
Write multiple smaller blocks to the log file instead of one large block

- Use Spark's SizeEstimator to estimate how many records fit in a block
	- Configurable block size
	- Configurable log file size
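For example, with the default 256 MB block size and records that SizeEstimator pegs at roughly 1 KB apiece, a block is flushed after about 262,144 records, and the default 1 GB log file holds around four such blocks before the writer rolls over to a new one.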
Nishith Agarwal
2018-02-01 12:36:12 -08:00
committed by vinoth chandar
parent eb3d0c470f
commit d495484399
4 changed files with 110 additions and 61 deletions

hoodie-client/src/main/java/com/uber/hoodie/config/HoodieStorageConfig.java

@@ -34,6 +34,12 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES;
public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size";
public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
// used to size log files
public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size";
public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB
// used to size data blocks in log file
public static final String LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = "hoodie.logfile.data.block.max.size";
public static final String DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES = String.valueOf(256 * 1024 * 1024); // 256 MB
private HoodieStorageConfig(Properties props) {
super(props);
@@ -77,6 +83,16 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
return this;
}
public Builder logFileDataBlockMaxSize(int dataBlockSize) {
props.setProperty(LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES, String.valueOf(dataBlockSize));
return this;
}
public Builder logFileMaxSize(int logFileSize) {
props.setProperty(LOGFILE_SIZE_MAX_BYTES, String.valueOf(logFileSize));
return this;
}
public HoodieStorageConfig build() {
HoodieStorageConfig config = new HoodieStorageConfig(props);
setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES),
@@ -85,6 +101,10 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
PARQUET_BLOCK_SIZE_BYTES, DEFAULT_PARQUET_BLOCK_SIZE_BYTES);
setDefaultOnCondition(props, !props.containsKey(PARQUET_PAGE_SIZE_BYTES),
PARQUET_PAGE_SIZE_BYTES, DEFAULT_PARQUET_PAGE_SIZE_BYTES);
setDefaultOnCondition(props, !props.containsKey(LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES),
LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES, DEFAULT_LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES);
setDefaultOnCondition(props, !props.containsKey(LOGFILE_SIZE_MAX_BYTES),
LOGFILE_SIZE_MAX_BYTES, DEFAULT_LOGFILE_SIZE_MAX_BYTES);
return config;
}
}
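As a usage sketch of the two new knobs (values here are illustrative, and newBuilder() is assumed to be the same factory this class already exposes for its other settings), capping log files at 512 MB and data blocks at 64 MB looks like:

HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
    .logFileMaxSize(512 * 1024 * 1024)          // roll over to a new log file past 512 MB
    .logFileDataBlockMaxSize(64 * 1024 * 1024)  // flush a data block roughly every 64 MB
    .build();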

hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java

@@ -277,6 +277,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(HoodieStorageConfig.PARQUET_PAGE_SIZE_BYTES));
}
public int getLogFileDataBlockMaxSize() {
return Integer.parseInt(props.getProperty(HoodieStorageConfig.LOGFILE_DATA_BLOCK_SIZE_MAX_BYTES));
}
public int getLogFileMaxSize() {
return Integer.parseInt(props.getProperty(HoodieStorageConfig.LOGFILE_SIZE_MAX_BYTES));
}
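A quick round trip of the plumbing, assuming HoodieWriteConfig's existing withPath/withStorageConfig builder methods: values set on the storage config come back through the new getters, and anything left unset falls back to the defaults wired in by build().

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample_table")
    .withStorageConfig(HoodieStorageConfig.newBuilder()
        .logFileMaxSize(512 * 1024 * 1024).build())
    .build();
assert writeConfig.getLogFileMaxSize() == 512 * 1024 * 1024;
assert writeConfig.getLogFileDataBlockMaxSize() == 256 * 1024 * 1024; // unset, so the 256 MB default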
/**
* metrics properties
**/

hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java

@@ -17,7 +17,6 @@
package com.uber.hoodie.io;
import com.beust.jcommander.internal.Maps;
import com.clearspring.analytics.util.Lists;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDeltaWriteStat;
@@ -38,6 +37,14 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieAppendException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.table.HoodieTable;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
import org.apache.spark.util.SizeEstimator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -45,30 +52,28 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
/**
* IO Operation to append data onto an existing file.
*/
public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class);
private static Logger logger = LogManager.getLogger(HoodieAppendHandle.class);
private static AtomicLong recordIndex = new AtomicLong(1);
private TableFileSystemView.RealtimeView fileSystemView;
private final WriteStatus writeStatus;
private final String fileId;
private String partitionPath;
private List<HoodieRecord<T>> records;
private Iterator<HoodieRecord<T>> recordItr;
List<IndexedRecord> recordList = new ArrayList<>();
List<String> keysToDelete = new ArrayList<>();
private long recordsWritten = 0;
private long recordsDeleted = 0;
private long averageRecordSize = 0;
private HoodieLogFile currentLogFile;
private Writer writer;
private boolean doInit = true;
public HoodieAppendHandle(HoodieWriteConfig config,
String commitTime,
@@ -81,55 +86,48 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
this.writeStatus = writeStatus;
this.fileId = fileId;
this.fileSystemView = hoodieTable.getRTFileSystemView();
init(recordItr);
this.recordItr = recordItr;
}
private void init(Iterator<HoodieRecord<T>> recordItr) {
List<HoodieRecord<T>> records = Lists.newArrayList();
recordItr.forEachRemaining(record -> {
records.add(record);
// extract some information from the first record
if (partitionPath == null) {
partitionPath = record.getPartitionPath();
FileSlice fileSlice = fileSystemView.getLatestFileSlices(record.getPartitionPath())
.filter(fileSlice1 -> fileSlice1.getDataFile().get().getFileId().equals(fileId))
.findFirst().get();
// HACK(vc) This also assumes a base file. It will break if appending without one.
String latestValidFilePath = fileSlice.getDataFile().get().getFileName();
String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath);
writeStatus.getStat().setPrevCommit(baseCommitTime);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(record.getPartitionPath());
writeStatus.getStat().setFileId(fileId);
private void init(String partitionPath) {
try {
this.writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(fileSlice.getLogFiles()
.max(HoodieLogFile.getLogVersionComparator()::compare)
.map(logFile -> logFile.getLogVersion()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFs(fs).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat())
.setLogVersion(currentLogFile.getLogVersion());
((HoodieDeltaWriteStat) writeStatus.getStat())
.setLogOffset(writer.getCurrentSize());
} catch (Exception e) {
logger.error("Error in update task at commit " + commitTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException(
"Failed to initialize HoodieUpdateHandle for FileId: " + fileId
+ " on commit " + commitTime + " on HDFS path " + hoodieTable
.getMetaClient().getBasePath() + partitionPath, e);
}
Path path = new Path(record.getPartitionPath(),
FSUtils.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId));
writeStatus.getStat().setPath(path.toString());
// extract some information from the first record
FileSlice fileSlice = fileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice1 -> fileSlice1.getDataFile().get().getFileId().equals(fileId))
.findFirst().get();
// HACK(vc) This also assumes a base file. It will break if appending without one.
String latestValidFilePath = fileSlice.getDataFile().get().getFileName();
String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath);
writeStatus.getStat().setPrevCommit(baseCommitTime);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.getStat().setFileId(fileId);
this.partitionPath = partitionPath;
try {
this.writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(fileSlice.getLogFiles()
.max(HoodieLogFile.getLogVersionComparator().reversed()::compare)
.map(logFile -> logFile.getLogVersion()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat())
.setLogVersion(currentLogFile.getLogVersion());
((HoodieDeltaWriteStat) writeStatus.getStat())
.setLogOffset(writer.getCurrentSize());
} catch (Exception e) {
logger.error("Error in update task at commit " + commitTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException(
"Failed to initialize HoodieUpdateHandle for FileId: " + fileId
+ " on commit " + commitTime + " on HDFS path " + hoodieTable
.getMetaClient().getBasePath() + partitionPath, e);
}
// update the new location of the record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
});
this.records = records;
Path path = new Path(partitionPath,
FSUtils.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId));
writeStatus.getStat().setPath(path.toString());
}
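Note the subtle fix in the rebuilt writer block: the old code took Stream.max() over getLogVersionComparator() directly, which, if that comparator orders newer versions first, actually returns the oldest log version; the added .reversed() makes it pick the latest. A tiny self-contained illustration, with plain integers standing in for log versions:

import java.util.Comparator;
import java.util.stream.Stream;

Comparator<Integer> newestFirst = (a, b) -> Integer.compare(b, a);
Stream.of(1, 2, 3).max(newestFirst);            // Optional[1] -- the oldest version
Stream.of(1, 2, 3).max(newestFirst.reversed()); // Optional[3] -- the latest, as intended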
private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
@@ -162,28 +160,47 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
public void doAppend() {
List<IndexedRecord> recordList = new ArrayList<>();
List<String> keysToDelete = new ArrayList<>();
int maxBlockSize = config.getLogFileDataBlockMaxSize();
int numberOfRecords = 0;
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, commitTime);
Iterator<HoodieRecord<T>> recordsItr = records.iterator();
while (recordsItr.hasNext()) {
HoodieRecord record = recordsItr.next();
while (recordItr.hasNext()) {
HoodieRecord record = recordItr.next();
// update the new location of the record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
if(doInit) {
init(record.getPartitionPath());
averageRecordSize = SizeEstimator.estimate(record);
doInit = false;
}
// Append if max number of records reached to achieve block size
if(numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
// Recompute averageRecordSize before writing a new block and update existing value with avg of new and old
logger.info("AvgRecordSize => " + averageRecordSize);
averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record))/2;
doAppend(metadata);
numberOfRecords = 0;
}
Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
if (indexedRecord.isPresent()) {
recordList.add(indexedRecord.get());
} else {
keysToDelete.add(record.getRecordKey());
}
recordsItr.remove(); //remove entries when IndexedRecord added to new list
numberOfRecords++;
}
doAppend(metadata);
}
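The loop above is the core of the change: seed a per-record size estimate from the first record with Spark's SizeEstimator, flush a block once enough records have accumulated to fill maxBlockSize, and refresh the running average as records stream through. A self-contained sketch of that same pattern, with plain strings instead of HoodieRecords and a hypothetical estimateBytes() standing in for SizeEstimator.estimate so it runs without Spark:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class BlockBatcher {
  // stand-in for SizeEstimator.estimate; a rough JVM footprint for a String
  static long estimateBytes(String record) {
    return 40 + 2L * record.length();
  }

  static List<List<String>> batch(Iterator<String> records, long maxBlockSize) {
    List<List<String>> blocks = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long averageRecordSize = -1;
    int numberOfRecords = 0;
    while (records.hasNext()) {
      String record = records.next();
      if (averageRecordSize < 0) {
        averageRecordSize = estimateBytes(record); // seed from the first record
      }
      // flush once the estimated record count for one block is reached
      if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize) && !current.isEmpty()) {
        // refresh the estimate with the average of the old value and this record
        averageRecordSize = (averageRecordSize + estimateBytes(record)) / 2;
        blocks.add(current);
        current = new ArrayList<>();
        numberOfRecords = 0;
      }
      current.add(record);
      numberOfRecords++;
    }
    if (!current.isEmpty()) {
      blocks.add(current); // final partial block
    }
    return blocks;
  }
}

Because only the ratio maxBlockSize / averageRecordSize matters, the estimate just needs to be in the right ballpark, not exact, for blocks to land near the target size.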
private void doAppend(Map<HoodieLogBlock.LogMetadataType, String> metadata) {
try {
if (recordList.size() > 0) {
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, schema, metadata));
recordList.clear();
}
if (keysToDelete.size() > 0) {
writer = writer.appendBlock(
new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new), metadata));
keysToDelete.clear();
}
} catch (Exception e) {
throw new HoodieAppendException(
@@ -208,5 +225,4 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
return writeStatus;
}
}