1
0

Adding ability for inserts to be written to log files

This commit is contained in:
Nishith Agarwal
2018-05-13 16:25:11 -07:00
committed by vinoth chandar
parent 34827d50e1
commit 3da063f83b
52 changed files with 1061 additions and 519 deletions

View File

@@ -31,7 +31,6 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -45,6 +44,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -60,21 +60,35 @@ import org.apache.spark.util.SizeEstimator;
public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
private static Logger logger = LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
private static AtomicLong recordIndex = new AtomicLong(1);
private final WriteStatus writeStatus;
private final String fileId;
// Buffer for holding records in memory before they are flushed to disk
List<IndexedRecord> recordList = new ArrayList<>();
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
List<String> keysToDelete = new ArrayList<>();
private TableFileSystemView.RealtimeView fileSystemView;
private String partitionPath;
private Iterator<HoodieRecord<T>> recordItr;
// Total number of records written during an append
private long recordsWritten = 0;
// Total number of records deleted during an append
private long recordsDeleted = 0;
// Average record size for a HoodieRecord. This size is updated at the end of every log block flushed to disk
private long averageRecordSize = 0;
private HoodieLogFile currentLogFile;
private Writer writer;
// Flag used to initialize some metadata
private boolean doInit = true;
// Total number of bytes written during this append phase (an estimation)
private long estimatedNumberOfBytesWritten;
// Number of records that must be written to meet the max block size for a log block
private int numberOfRecords = 0;
// Max block size to limit to for a log block
private int maxBlockSize = config.getLogFileDataBlockMaxSize();
// Header metadata for a log block
private Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String fileId, Iterator<HoodieRecord<T>> recordItr) {
@@ -87,43 +101,46 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
this.recordItr = recordItr;
}
private void init(String partitionPath) {
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
this(config, commitTime, hoodieTable, UUID.randomUUID().toString(), null);
}
// extract some information from the first record
FileSlice fileSlice = fileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice1 -> fileSlice1.getDataFile().get().getFileId().equals(fileId)).findFirst()
.get();
// HACK(vc) This also assumes a base file. It will break, if appending without one.
String latestValidFilePath = fileSlice.getDataFile().get().getFileName();
String baseCommitTime = FSUtils.getCommitTime(latestValidFilePath);
writeStatus.getStat().setPrevCommit(baseCommitTime);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.getStat().setFileId(fileId);
this.partitionPath = partitionPath;
try {
this.writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(
fileSlice.getLogFiles().map(logFile -> logFile.getLogVersion())
.max(Comparator.naturalOrder()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize());
} catch (Exception e) {
logger.error("Error in update task at commit " + commitTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException(
"Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
+ commitTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath()
+ partitionPath, e);
private void init(HoodieRecord record) {
if (doInit) {
this.partitionPath = record.getPartitionPath();
// extract some information from the first record
Optional<FileSlice> fileSlice = fileSystemView.getLatestFileSlices(partitionPath)
.filter(fileSlice1 -> fileSlice1.getFileId().equals(fileId)).findFirst();
String baseInstantTime = commitTime;
if (fileSlice.isPresent()) {
baseInstantTime = fileSlice.get().getBaseCommitTime();
} else {
// This means there is no base data file, start appending to a new log file
fileSlice = Optional.of(new FileSlice(baseInstantTime, this.fileId));
logger.info("New InsertHandle for partition :" + partitionPath);
}
writeStatus.getStat().setPrevCommit(baseInstantTime);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.getStat().setFileId(fileId);
averageRecordSize = SizeEstimator.estimate(record);
try {
this.writer = createLogWriter(fileSlice, baseInstantTime);
this.currentLogFile = writer.getLogFile();
((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize());
} catch (Exception e) {
logger.error("Error in update task at commit " + commitTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException(
"Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ commitTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath()
+ partitionPath, e);
}
Path path = new Path(partitionPath, writer.getLogFile().getFileName());
writeStatus.getStat().setPath(path.toString());
doInit = false;
}
Path path = new Path(partitionPath,
FSUtils.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId));
writeStatus.getStat().setPath(path.toString());
}
private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
@@ -160,38 +177,11 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
// TODO (NA) - Perform a schema check of current input record with the last schema on log file
// to make sure we don't append records with older (shorter) schema than already appended
public void doAppend() {
int maxBlockSize = config.getLogFileDataBlockMaxSize();
int numberOfRecords = 0;
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
while (recordItr.hasNext()) {
HoodieRecord record = recordItr.next();
// update the new location of the record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
if (doInit) {
init(record.getPartitionPath());
averageRecordSize = SizeEstimator.estimate(record);
doInit = false;
}
// Append if max number of records reached to achieve block size
if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
// Recompute averageRecordSize before writing a new block and update existing value with
// avg of new and old
logger.info("AvgRecordSize => " + averageRecordSize);
averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
doAppend(header);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
numberOfRecords = 0;
}
Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
if (indexedRecord.isPresent()) {
recordList.add(indexedRecord.get());
} else {
keysToDelete.add(record.getRecordKey());
}
numberOfRecords++;
init(record);
flushToDiskIfRequired(record);
writeToBuffer(record);
}
doAppend(header);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
@@ -199,6 +189,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header) {
try {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
if (recordList.size() > 0) {
writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
recordList.clear();
@@ -214,11 +206,37 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
}
}
public void close() {
@Override
public boolean canWrite(HoodieRecord record) {
return config.getParquetMaxFileSize() >= estimatedNumberOfBytesWritten * config
.getLogFileToParquetCompressionRatio();
}
@Override
public void write(HoodieRecord record, Optional<IndexedRecord> insertValue) {
Optional recordMetadata = record.getData().getMetadata();
try {
init(record);
flushToDiskIfRequired(record);
writeToBuffer(record);
} catch (Throwable t) {
// Not throwing exception from here, since we don't want to fail the entire job
// for a single record
writeStatus.markFailure(record, t, recordMetadata);
logger.error("Error writing record " + record, t);
}
}
@Override
public WriteStatus close() {
try {
// flush any remaining records to disk
doAppend(header);
if (writer != null) {
writer.close();
}
writeStatus.getStat().setPrevCommit(commitTime);
writeStatus.getStat().setFileId(this.fileId);
writeStatus.getStat().setNumWrites(recordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
@@ -226,13 +244,54 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalUpsertTime(timer.endTimer());
writeStatus.getStat().setRuntimeStats(runtimeStats);
return writeStatus;
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
}
}
@Override
public WriteStatus getWriteStatus() {
return writeStatus;
}
private Writer createLogWriter(Optional<FileSlice> fileSlice, String baseCommitTime)
throws IOException, InterruptedException {
return HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(
fileSlice.get().getLogFiles().map(logFile -> logFile.getLogVersion())
.max(Comparator.naturalOrder()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}
private void writeToBuffer(HoodieRecord<T> record) {
// update the new location of the record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
if (indexedRecord.isPresent()) {
recordList.add(indexedRecord.get());
} else {
keysToDelete.add(record.getRecordKey());
}
numberOfRecords++;
}
/**
* Checks if the number of records have reached the set threshold and then flushes the records to disk
*/
private void flushToDiskIfRequired(HoodieRecord record) {
// Append if max number of records reached to achieve block size
if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
// Recompute averageRecordSize before writing a new block and update existing value with
// avg of new and old
logger.info("AvgRecordSize => " + averageRecordSize);
averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
doAppend(header);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
numberOfRecords = 0;
}
}
}

View File

@@ -38,7 +38,7 @@ import org.apache.log4j.Logger;
/**
* Cleaner is responsible for garbage collecting older files in a given partition path, such that
* <p> 1) It provides sufficient time for existing queries running on older versions, to finish <p>
* <p> 1) It provides sufficient time for existing queries running on older versions, to close <p>
* 2) It bounds the growth of the files in the file system <p> TODO: Should all cleaning be done
* based on {@link com.uber.hoodie.common.model.HoodieCommitMetadata}
*/

View File

@@ -52,6 +52,7 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Archiver to bound the growth of <action>.commit files
@@ -99,9 +100,9 @@ public class HoodieCommitArchiveLog {
/**
* Check if commits need to be archived. If yes, archive commits.
*/
public boolean archiveIfRequired() {
public boolean archiveIfRequired(final JavaSparkContext jsc) {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList());
boolean success = true;
if (instantsToArchive.iterator().hasNext()) {
this.writer = openWriter();
@@ -117,13 +118,13 @@ public class HoodieCommitArchiveLog {
}
}
private Stream<HoodieInstant> getInstantsToArchive() {
private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
// TODO : rename to max/minInstantsToKeep
int maxCommitsToKeep = config.getMaxCommitsToKeep();
int minCommitsToKeep = config.getMinCommitsToKeep();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// GroupBy each action and limit each action timeline to maxCommitsToKeep
// TODO: Handle ROLLBACK_ACTION in future

View File

@@ -31,8 +31,8 @@ import com.uber.hoodie.io.storage.HoodieStorageWriter;
import com.uber.hoodie.io.storage.HoodieStorageWriterFactory;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -49,12 +49,13 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
private Path tempPath = null;
private long recordsWritten = 0;
private long recordsDeleted = 0;
private Iterator<HoodieRecord<T>> recordIterator;
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath) {
String partitionPath, String fileId) {
super(config, commitTime, hoodieTable);
this.status = ReflectionUtils.loadClass(config.getWriteStatusClassName());
status.setFileId(UUID.randomUUID().toString());
status.setFileId(fileId);
status.setPartitionPath(partitionPath);
final int sparkPartitionId = TaskContext.getPartitionId();
@@ -77,12 +78,13 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
logger.info("New InsertHandle for partition :" + partitionPath);
}
/**
* Determines whether we can accept the incoming records, into the current file, depending on
* <p>
* - Whether it belongs to the same partitionPath as existing records - Whether the current file
* written bytes lt max file size
*/
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator) {
this(config, commitTime, hoodieTable, partitionPath, fileId);
this.recordIterator = recordIterator;
}
@Override
public boolean canWrite(HoodieRecord record) {
return storageWriter.canWrite() && record.getPartitionPath().equals(status.getPartitionPath());
}
@@ -114,9 +116,30 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
}
}
/**
* Writes all records passed
*/
public void write() {
try {
while (recordIterator.hasNext()) {
HoodieRecord<T> record = recordIterator.next();
write(record, record.getData().getInsertValue(schema));
}
} catch (IOException io) {
throw new HoodieInsertException(
"Failed to insert records for path " + getStorageWriterPath(), io);
}
}
@Override
public WriteStatus getWriteStatus() {
return status;
}
/**
* Performs actions to durably, persist the current changes and returns a WriteStatus object
*/
@Override
public WriteStatus close() {
logger.info("Closing the file " + status.getFileId() + " as we are done with all the records "
+ recordsWritten);

View File

@@ -16,6 +16,8 @@
package com.uber.hoodie.io;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -26,7 +28,9 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -103,4 +107,25 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
public Schema getSchema() {
return schema;
}
/**
* Determines whether we can accept the incoming records, into the current file, depending on
* <p>
* - Whether it belongs to the same partitionPath as existing records - Whether the current file
* written bytes lt max file size
*/
public boolean canWrite(HoodieRecord record) {
return false;
}
/**
* Perform the actual writing of the given record into the backing file.
*/
public void write(HoodieRecord record, Optional<IndexedRecord> insertValue) {
// NO_OP
}
public abstract WriteStatus close();
public abstract WriteStatus getWriteStatus();
}

View File

@@ -243,7 +243,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
}
}
public void close() {
@Override
public WriteStatus close() {
try {
// write out any pending records (this can happen when inserts are turned into updates)
Iterator<String> pendingRecordsItr = keyToNewRecords.keySet().iterator();
@@ -269,6 +270,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalUpsertTime(timer.endTimer());
writeStatus.getStat().setRuntimeStats(runtimeStats);
return writeStatus;
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
}
@@ -283,6 +285,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
return (this.tempPath == null) ? this.newFilePath : this.tempPath;
}
@Override
public WriteStatus getWriteStatus() {
return writeStatus;
}

View File

@@ -18,11 +18,13 @@ package com.uber.hoodie.io.compact;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -33,10 +35,10 @@ import java.util.stream.Collectors;
*/
public class CompactionOperation implements Serializable {
private String dataFileCommitTime;
private long dataFileSize;
private Optional<String> dataFileCommitTime;
private Optional<Long> dataFileSize;
private List<String> deltaFilePaths;
private String dataFilePath;
private Optional<String> dataFilePath;
private String fileId;
private String partitionPath;
private Map<String, Object> metrics;
@@ -46,24 +48,32 @@ public class CompactionOperation implements Serializable {
public CompactionOperation() {
}
public CompactionOperation(HoodieDataFile dataFile, String partitionPath,
public CompactionOperation(Optional<HoodieDataFile> dataFile, String partitionPath,
List<HoodieLogFile> logFiles, HoodieWriteConfig writeConfig) {
this.dataFilePath = dataFile.getPath();
this.fileId = dataFile.getFileId();
if (dataFile.isPresent()) {
this.dataFilePath = Optional.of(dataFile.get().getPath());
this.fileId = dataFile.get().getFileId();
this.dataFileCommitTime = Optional.of(dataFile.get().getCommitTime());
this.dataFileSize = Optional.of(dataFile.get().getFileSize());
} else {
assert logFiles.size() > 0;
this.dataFilePath = Optional.empty();
this.fileId = FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath());
this.dataFileCommitTime = Optional.empty();
this.dataFileSize = Optional.empty();
}
this.partitionPath = partitionPath;
this.dataFileCommitTime = dataFile.getCommitTime();
this.dataFileSize = dataFile.getFileSize();
this.deltaFilePaths = logFiles.stream().map(s -> s.getPath().toString())
.collect(Collectors.toList());
this.metrics = writeConfig.getCompactionStrategy()
.captureMetrics(dataFile, partitionPath, logFiles);
.captureMetrics(writeConfig, dataFile, partitionPath, logFiles);
}
public String getDataFileCommitTime() {
public Optional<String> getDataFileCommitTime() {
return dataFileCommitTime;
}
public long getDataFileSize() {
public Optional<Long> getDataFileSize() {
return dataFileSize;
}
@@ -71,7 +81,7 @@ public class CompactionOperation implements Serializable {
return deltaFilePaths;
}
public String getDataFilePath() {
public Optional<String> getDataFilePath() {
return dataFilePath;
}

View File

@@ -86,16 +86,19 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
private JavaRDD<WriteStatus> executeCompaction(JavaSparkContext jsc,
List<CompactionOperation> operations, HoodieTable hoodieTable, HoodieWriteConfig config,
String compactionCommitTime) throws IOException {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
// Compacting is very similar to applying updates to existing file
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
log.info("After filtering, Compacting " + operations + " files");
return jsc.parallelize(operations, operations.size())
.map(s -> compact(hoodieTable, config, s, compactionCommitTime))
.map(s -> compact(table, metaClient, config, s, compactionCommitTime))
.flatMap(writeStatusesItr -> writeStatusesItr.iterator());
}
private List<WriteStatus> compact(HoodieTable hoodieTable, HoodieWriteConfig config,
private List<WriteStatus> compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
HoodieWriteConfig config,
CompactionOperation operation, String commitTime) throws IOException {
FileSystem fs = hoodieTable.getMetaClient().getFs();
FileSystem fs = metaClient.getFs();
Schema readerSchema = HoodieAvroUtils
.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
@@ -107,7 +110,6 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// Load all the delta commits since the last compaction commit and get all the blocks to be
// loaded and load it using CompositeAvroLogReader
// Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
String maxInstantTime = metaClient.getActiveTimeline()
.getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION,
@@ -125,9 +127,16 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
}
// Compacting is very similar to applying updates to existing file
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metaClient);
Iterator<List<WriteStatus>> result = table
.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords());
Iterator<List<WriteStatus>> result;
// If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
// new base parquet file.
if (operation.getDataFilePath().isPresent()) {
result = hoodieCopyOnWriteTable
.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords());
} else {
result = hoodieCopyOnWriteTable
.handleInsert(commitTime, operation.getPartitionPath(), operation.getFileId(), scanner.iterator());
}
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream)
.map(s -> {
@@ -176,7 +185,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
.getLogVersionComparator().reversed()).collect(Collectors.toList());
totalLogFiles.add((long) logFiles.size());
totalFileSlices.add(1L);
return new CompactionOperation(s.getDataFile().get(), partitionPath, logFiles, config);
return new CompactionOperation(s.getDataFile(), partitionPath, logFiles, config);
})
.filter(c -> !c.getDeltaFilePaths().isEmpty())
.collect(toList()).iterator()).collect();
@@ -195,4 +204,4 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
return operations;
}
}
}

View File

@@ -54,16 +54,19 @@ public abstract class CompactionStrategy implements Serializable {
* @param logFiles - List of log files to compact with the base file
* @return Map[String, Object] - metrics captured
*/
public Map<String, Object> captureMetrics(HoodieDataFile dataFile, String partitionPath,
List<HoodieLogFile> logFiles) {
public Map<String, Object> captureMetrics(HoodieWriteConfig writeConfig, Optional<HoodieDataFile> dataFile, String
partitionPath, List<HoodieLogFile> logFiles) {
Map<String, Object> metrics = Maps.newHashMap();
Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
// Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(Optional::isPresent)
.map(Optional::get).reduce((size1, size2) -> size1 + size2).orElse(0L);
// Total read will be the base file + all the log files
Long totalIORead = FSUtils.getSizeInMB(dataFile.getFileSize() + totalLogFileSize);
Long totalIORead = FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L)
+ totalLogFileSize);
// Total write will be similar to the size of the base file
Long totalIOWrite = FSUtils.getSizeInMB(dataFile.getFileSize());
Long totalIOWrite = FSUtils
.getSizeInMB(dataFile.isPresent() ? dataFile.get().getFileSize() : defaultMaxParquetFileSize);
// Total IO will the the IO for read + write
Long totalIO = totalIORead + totalIOWrite;
// Save these metrics and we will use during the filter

View File

@@ -39,10 +39,11 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat
private static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILE_SIZE";
@Override
public Map<String, Object> captureMetrics(HoodieDataFile dataFile, String partitionPath,
public Map<String, Object> captureMetrics(HoodieWriteConfig config, Optional<HoodieDataFile> dataFile, String
partitionPath,
List<HoodieLogFile> logFiles) {
Map<String, Object> metrics = super.captureMetrics(dataFile, partitionPath, logFiles);
Map<String, Object> metrics = super.captureMetrics(config, dataFile, partitionPath, logFiles);
// Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize)
.filter(Optional::isPresent).map(Optional::get).reduce((size1, size2) -> size1 + size2)

View File

@@ -72,7 +72,7 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
this.schema = schema;
}
private static Configuration registerFileSystem(Path file, Configuration conf) {
public static Configuration registerFileSystem(Path file, Configuration conf) {
Configuration returnConf = new Configuration(conf);
String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",

View File

@@ -30,18 +30,17 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class HoodieStorageWriterFactory {
public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R>
getStorageWriter(String commitTime, Path path, HoodieTable<T> hoodieTable,
public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
String commitTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema) throws IOException {
//TODO - based on the metadata choose the implementation of HoodieStorageWriter
// Currently only parquet is supported
return newParquetStorageWriter(commitTime, path, config, schema, hoodieTable);
}
private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R>
newParquetStorageWriter(
String commitTime, Path path, HoodieWriteConfig config, Schema schema,
HoodieTable hoodieTable) throws IOException {
private static <T extends HoodieRecordPayload,
R extends IndexedRecord> HoodieStorageWriter<R> newParquetStorageWriter(String commitTime, Path path,
HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable) throws IOException {
BloomFilter filter = new BloomFilter(config.getBloomFilterNumEntries(),
config.getBloomFilterFPP());
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(