diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 7500abf3a..a974e8b38 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -33,7 +33,9 @@ import com.uber.hoodie.table.HoodieTable; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Optional; + import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; @@ -47,7 +49,7 @@ public class HoodieMergeHandle extends HoodieIOHa private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class); private WriteStatus writeStatus; - private HashMap> keyToNewRecords; + private Map> keyToNewRecords; private HoodieStorageWriter storageWriter; private TableFileSystemView.ReadOptimizedView fileSystemView; private Path newFilePath; @@ -64,77 +66,93 @@ public class HoodieMergeHandle extends HoodieIOHa String fileId) { super(config, commitTime, hoodieTable); this.fileSystemView = hoodieTable.getROFileSystemView(); - init(fileId, recordItr); + init(fileId, init(fileId, recordItr)); + } + + public HoodieMergeHandle(HoodieWriteConfig config, + String commitTime, + HoodieTable hoodieTable, + Map> keyToNewRecords, + String fileId) { + super(config, commitTime, hoodieTable); + this.fileSystemView = hoodieTable.getROFileSystemView(); + this.keyToNewRecords = keyToNewRecords; + init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get()).getPartitionPath()); } /** - * Load the new incoming records in a map, and extract the old file path. + * Extract old file path, initialize StorageWriter and WriteStatus + * @param fileId + * @param partitionPath */ - private void init(String fileId, Iterator> newRecordsItr) { + private void init(String fileId, String partitionPath) { WriteStatus writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName()); writeStatus.setStat(new HoodieWriteStat()); this.writeStatus = writeStatus; - this.keyToNewRecords = new HashMap<>(); - try { - // Load the new records in a map - while (newRecordsItr.hasNext()) { - HoodieRecord record = newRecordsItr.next(); - // If the first record, we need to extract some info out - if (oldFilePath == null) { - String latestValidFilePath = fileSystemView - .getLatestDataFiles(record.getPartitionPath()) - .filter(dataFile -> dataFile.getFileId().equals(fileId)) - .findFirst() - .get().getFileName(); - writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); + String latestValidFilePath = fileSystemView + .getLatestDataFiles(partitionPath) + .filter(dataFile -> dataFile.getFileId().equals(fileId)) + .findFirst() + .get().getFileName(); + writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); - HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, - commitTime, - new Path(config.getBasePath()), - new Path(config.getBasePath(), record.getPartitionPath())); - partitionMetadata.trySave(TaskContext.getPartitionId()); + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, + commitTime, + new Path(config.getBasePath()), + new Path(config.getBasePath(), partitionPath)); + partitionMetadata.trySave(TaskContext.getPartitionId()); - oldFilePath = new Path( - config.getBasePath() + "/" + record.getPartitionPath() + "/" - + latestValidFilePath); - String relativePath = new Path(record.getPartitionPath() + "/" + FSUtils - .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); - newFilePath = new Path(config.getBasePath(), relativePath); - if (config.shouldUseTempFolderForCopyOnWriteForMerge()) { - this.tempPath = makeTempPath(record.getPartitionPath(), TaskContext.getPartitionId(), fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); - } + oldFilePath = new Path( + config.getBasePath() + "/" + partitionPath + "/" + + latestValidFilePath); + String relativePath = new Path(partitionPath + "/" + FSUtils + .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); + newFilePath = new Path(config.getBasePath(), relativePath); - // handle cases of partial failures, for update task - if (fs.exists(newFilePath)) { - fs.delete(newFilePath, false); - } - - logger.info(String.format("Merging new data into oldPath %s, as newPath %s", - oldFilePath.toString(), getStorageWriterPath().toString())); - // file name is same for all records, in this bunch - writeStatus.setFileId(fileId); - writeStatus.setPartitionPath(record.getPartitionPath()); - writeStatus.getStat().setFileId(fileId); - writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath); - } - keyToNewRecords.put(record.getRecordKey(), record); - // update the new location of the record, so we know where to find it next - record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); + // handle cases of partial failures, for update task + if (fs.exists(newFilePath)) { + fs.delete(newFilePath, false); } + + logger.info(String.format("Merging new data into oldPath %s, as newPath %s", + oldFilePath.toString(), newFilePath.toString())); + // file name is same for all records, in this bunch + writeStatus.setFileId(fileId); + writeStatus.setPartitionPath(partitionPath); + writeStatus.getStat().setFileId(fileId); + writeStatus.getStat().setPath(relativePath); // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); - - } catch (Exception e) { - logger.error("Error in update task at commit " + commitTime, e); - writeStatus.setGlobalError(e); + .getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema); + } catch (IOException io) { + logger.error("Error in update task at commit " + commitTime, io); + writeStatus.setGlobalError(io); throw new HoodieUpsertException( "Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " - + commitTime + " on path " + hoodieTable.getMetaClient().getBasePath(), e); + + commitTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io); } } + /** + * Load the new incoming records in a map and return partitionPath + * @param fileId + * @param newRecordsItr + * @return + */ + private String init(String fileId, Iterator> newRecordsItr) { + // Load the new records in a map + this.keyToNewRecords = new HashMap<>(); + String partitionPath = null; + while (newRecordsItr.hasNext()) { + HoodieRecord record = newRecordsItr.next(); + partitionPath = record.getPartitionPath(); + keyToNewRecords.put(record.getRecordKey(), record); + // update the new location of the record, so we know where to find it next + record.setNewLocation(new HoodieRecordLocation(commitTime, fileId)); + } + return partitionPath; + } private boolean writeUpdateRecord(HoodieRecord hoodieRecord, Optional indexedRecord) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 138992469..6e89c38cc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -160,7 +160,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { // Compacting is very similar to applying updates to existing file HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metaClient); Iterator> result = table - .handleUpdate(commitTime, operation.getFileId(), scanner.iterator()); + .handleUpdate(commitTime, operation.getFileId(), scanner.getRecords()); Iterable> resultIterable = () -> result; return StreamSupport.stream(resultIterable.spliterator(), false) .flatMap(Collection::stream) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 9ced403bd..5b4f176b6 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -416,10 +416,23 @@ public class HoodieCopyOnWriteTable extends Hoodi public Iterator> handleUpdate(String commitTime, String fileLoc, - Iterator> recordItr) + Iterator> recordItr) throws IOException { // these are updates HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, recordItr); + return handleUpdateInternal(upsertHandle, commitTime, fileLoc); + } + + public Iterator> handleUpdate(String commitTime, String fileLoc, + Map> keyToNewRecords) + throws IOException { + // these are updates + HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileLoc, keyToNewRecords); + return handleUpdateInternal(upsertHandle, commitTime, fileLoc); + } + + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String commitTime, String fileLoc) + throws IOException { if (upsertHandle.getOldFilePath() == null) { throw new HoodieUpsertException("Error in finding the old file path at commit " + commitTime + " at fileLoc: " + fileLoc); @@ -459,6 +472,11 @@ public class HoodieCopyOnWriteTable extends Hoodi return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileLoc); } + protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileLoc, + Map> keyToNewRecords) { + return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileLoc); + } + public Iterator> handleInsert(String commitTime, Iterator> recordItr) throws Exception { return new LazyInsertIterable<>(recordItr, config, commitTime, this); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java index 905da5c11..f01e15f8e 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -43,6 +43,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Deque; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -62,8 +63,8 @@ public class HoodieCompactedLogRecordScanner implements private final static Logger log = LogManager.getLogger(HoodieCompactedLogRecordScanner.class); - // Final list of compacted/merged records to iterate - private final Collection> logRecords; + // Final map of compacted/merged records + private final Map> records; // Reader schema for the records private final Schema readerSchema; // Total log files read - for metrics @@ -89,7 +90,7 @@ public class HoodieCompactedLogRecordScanner implements this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass(); // Store merged records for all versions for this log file - Map> records = Maps.newHashMap(); + this.records = Maps.newHashMap(); // iterate over the paths Iterator logFilePathsItr = logFilePaths.iterator(); while (logFilePathsItr.hasNext()) { @@ -202,7 +203,6 @@ public class HoodieCompactedLogRecordScanner implements merge(records, currentInstantLogBlocks); } } - this.logRecords = Collections.unmodifiableCollection(records.values()); this.totalRecordsToUpdate = records.size(); } @@ -297,7 +297,7 @@ public class HoodieCompactedLogRecordScanner implements @Override public Iterator> iterator() { - return logRecords.iterator(); + return records.values().iterator(); } public long getTotalLogFiles() { @@ -308,6 +308,10 @@ public class HoodieCompactedLogRecordScanner implements return totalLogRecords.get(); } + public Map> getRecords() { + return records; + } + public long getTotalRecordsToUpdate() { return totalRecordsToUpdate; }