1
0

Refactor classes to accept the Map passed by RealtimeCompactor, avoiding multiple map creations in HoodieMergeHandle

This commit is contained in:
Nishith Agarwal
2018-01-24 13:34:14 -08:00
committed by vinoth chandar
parent 30049383f5
commit 7076c2e9f0
4 changed files with 100 additions and 60 deletions

View File

@@ -33,7 +33,9 @@ import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -47,7 +49,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class);
private WriteStatus writeStatus;
private HashMap<String, HoodieRecord<T>> keyToNewRecords;
private Map<String, HoodieRecord<T>> keyToNewRecords;
private HoodieStorageWriter<IndexedRecord> storageWriter;
private TableFileSystemView.ReadOptimizedView fileSystemView;
private Path newFilePath;
@@ -64,77 +66,93 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
String fileId) {
super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView();
init(fileId, recordItr);
init(fileId, init(fileId, recordItr));
}
public HoodieMergeHandle(HoodieWriteConfig config,
String commitTime,
HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords,
String fileId) {
super(config, commitTime, hoodieTable);
this.fileSystemView = hoodieTable.getROFileSystemView();
this.keyToNewRecords = keyToNewRecords;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath());
}
/**
* Load the new incoming records in a map, and extract the old file path.
* Extract old file path, initialize StorageWriter and WriteStatus
* @param fileId
* @param partitionPath
*/
private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
private void init(String fileId, String partitionPath) {
WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
writeStatus.setStat(new HoodieWriteStat());
this.writeStatus = writeStatus;
this.keyToNewRecords = new HashMap<>();
try {
// Load the new records in a map
while (newRecordsItr.hasNext()) {
HoodieRecord<T> record = newRecordsItr.next();
// If the first record, we need to extract some info out
if (oldFilePath == null) {
String latestValidFilePath = fileSystemView
.getLatestDataFiles(record.getPartitionPath())
.filter(dataFile -> dataFile.getFileId().equals(fileId))
.findFirst()
.get().getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
String latestValidFilePath = fileSystemView
.getLatestDataFiles(partitionPath)
.filter(dataFile -> dataFile.getFileId().equals(fileId))
.findFirst()
.get().getFileName();
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs,
commitTime,
new Path(config.getBasePath()),
new Path(config.getBasePath(), record.getPartitionPath()));
partitionMetadata.trySave(TaskContext.getPartitionId());
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs,
commitTime,
new Path(config.getBasePath()),
new Path(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
oldFilePath = new Path(
config.getBasePath() + "/" + record.getPartitionPath() + "/"
+ latestValidFilePath);
String relativePath = new Path(record.getPartitionPath() + "/" + FSUtils
.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
if (config.shouldUseTempFolderForCopyOnWriteForMerge()) {
this.tempPath = makeTempPath(record.getPartitionPath(), TaskContext.getPartitionId(), fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
}
oldFilePath = new Path(
config.getBasePath() + "/" + partitionPath + "/"
+ latestValidFilePath);
String relativePath = new Path(partitionPath + "/" + FSUtils
.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
// handle cases of partial failures, for update task
if (fs.exists(newFilePath)) {
fs.delete(newFilePath, false);
}
logger.info(String.format("Merging new data into oldPath %s, as newPath %s",
oldFilePath.toString(), getStorageWriterPath().toString()));
// file name is same for all records, in this bunch
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(record.getPartitionPath());
writeStatus.getStat().setFileId(fileId);
writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
}
keyToNewRecords.put(record.getRecordKey(), record);
// update the new location of the record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
// handle cases of partial failures, for update task
if (fs.exists(newFilePath)) {
fs.delete(newFilePath, false);
}
logger.info(String.format("Merging new data into oldPath %s, as newPath %s",
oldFilePath.toString(), newFilePath.toString()));
// file name is same for all records, in this bunch
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.getStat().setFileId(fileId);
writeStatus.getStat().setPath(relativePath);
// Create the writer for writing the new version file
storageWriter = HoodieStorageWriterFactory
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
} catch (Exception e) {
logger.error("Error in update task at commit " + commitTime, e);
writeStatus.setGlobalError(e);
.getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema);
} catch (IOException io) {
logger.error("Error in update task at commit " + commitTime, io);
writeStatus.setGlobalError(io);
throw new HoodieUpsertException(
"Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
+ commitTime + " on path " + hoodieTable.getMetaClient().getBasePath(), e);
+ commitTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
}
}
/**
 * Drains the incoming records into a fresh {@code keyToNewRecords} map, keyed by record key,
 * and stamps every record with its new location (this handle's commit time and the given file).
 *
 * @param fileId id of the file the records will be written to
 * @param newRecordsItr incoming records; fully consumed by this call
 * @return the partition path of the last record consumed, or {@code null} if the iterator
 *         yielded no records
 */
private String init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
  this.keyToNewRecords = new HashMap<>();
  String lastSeenPartitionPath = null;
  for (Iterator<HoodieRecord<T>> remaining = newRecordsItr; remaining.hasNext(); ) {
    HoodieRecord<T> incoming = remaining.next();
    lastSeenPartitionPath = incoming.getPartitionPath();
    // Index the record by key so it can be looked up later
    keyToNewRecords.put(incoming.getRecordKey(), incoming);
    // Remember where this record is going, so we know where to find it next
    HoodieRecordLocation targetLocation = new HoodieRecordLocation(commitTime, fileId);
    incoming.setNewLocation(targetLocation);
  }
  return lastSeenPartitionPath;
}
private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
Optional<IndexedRecord> indexedRecord) {

View File

@@ -160,7 +160,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// Compacting is very similar to applying updates to existing file
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metaClient);
Iterator<List<WriteStatus>> result = table
.handleUpdate(commitTime, operation.getFileId(), scanner.iterator());
.handleUpdate(commitTime, operation.getFileId(), scanner.getRecords());
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(), false)
.flatMap(Collection::stream)

View File

@@ -416,10 +416,23 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
Iterator<HoodieRecord<T>> recordItr)
Iterator<HoodieRecord<T>> recordItr)
throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr);
return handleUpdateInternal(upsertHandle, commitTime, fileLoc);
}
/**
 * Handles updates for a file when the incoming records are already available as a map.
 *
 * <p>Builds a merge handle from the map via {@code getUpdateHandle} and delegates the rest of
 * the work to {@code handleUpdateInternal}.
 *
 * @param commitTime commit time of this update
 * @param fileLoc id of the file being updated
 * @param keyToNewRecords incoming records keyed by record key
 * @return whatever {@code handleUpdateInternal} returns for the created handle
 * @throws IOException propagated from {@code handleUpdateInternal}
 */
public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
    Map<String, HoodieRecord<T>> keyToNewRecords)
    throws IOException {
  // these are updates
  return handleUpdateInternal(
      getUpdateHandle(commitTime, fileLoc, keyToNewRecords), commitTime, fileLoc);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String commitTime, String fileLoc)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException("Error in finding the old file path at commit " +
commitTime + " at fileLoc: " + fileLoc);
@@ -459,6 +472,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc);
}
/**
 * Creates the merge handle used to apply a pre-built map of records to an existing file.
 *
 * @param commitTime commit time of this update
 * @param fileLoc id of the file being updated
 * @param keyToNewRecords incoming records keyed by record key
 * @return a new {@code HoodieMergeHandle} bound to this table and its config
 */
protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc,
    Map<String, HoodieRecord<T>> keyToNewRecords) {
  HoodieMergeHandle mergeHandle =
      new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileLoc);
  return mergeHandle;
}
public Iterator<List<WriteStatus>> handleInsert(String commitTime,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
return new LazyInsertIterable<>(recordItr, config, commitTime, this);

View File

@@ -43,6 +43,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -62,8 +63,8 @@ public class HoodieCompactedLogRecordScanner implements
private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class);
// Final list of compacted/merged records to iterate
private final Collection<HoodieRecord<? extends HoodieRecordPayload>> logRecords;
// Final map of compacted/merged records
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> records;
// Reader schema for the records
private final Schema readerSchema;
// Total log files read - for metrics
@@ -89,7 +90,7 @@ public class HoodieCompactedLogRecordScanner implements
this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
// Store merged records for all versions for this log file
Map<String, HoodieRecord<? extends HoodieRecordPayload>> records = Maps.newHashMap();
this.records = Maps.newHashMap();
// iterate over the paths
Iterator<String> logFilePathsItr = logFilePaths.iterator();
while (logFilePathsItr.hasNext()) {
@@ -202,7 +203,6 @@ public class HoodieCompactedLogRecordScanner implements
merge(records, currentInstantLogBlocks);
}
}
this.logRecords = Collections.unmodifiableCollection(records.values());
this.totalRecordsToUpdate = records.size();
}
@@ -297,7 +297,7 @@ public class HoodieCompactedLogRecordScanner implements
@Override
public Iterator<HoodieRecord<? extends HoodieRecordPayload>> iterator() {
return logRecords.iterator();
return records.values().iterator();
}
public long getTotalLogFiles() {
@@ -308,6 +308,10 @@ public class HoodieCompactedLogRecordScanner implements
return totalLogRecords.get();
}
/**
 * Returns the map of compacted/merged records held by this scanner.
 *
 * <p>The internal map itself is returned, not a copy or an unmodifiable view, so changes made
 * by the caller are visible through this scanner.
 *
 * @return the scanner's records map
 */
public Map<String, HoodieRecord<? extends HoodieRecordPayload>> getRecords() {
  return this.records;
}
/**
 * Returns the value of {@code totalRecordsToUpdate}, which the constructor sets to the size of
 * the records map.
 *
 * @return the stored total of records to update
 */
public long getTotalRecordsToUpdate() {
  return this.totalRecordsToUpdate;
}