diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index df79cb83d..ce5f1e9e4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -109,6 +109,10 @@ public class HoodieMergeHandle extends HoodieIOHa String relativePath = new Path(partitionPath + "/" + FSUtils .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); + if (config.shouldUseTempFolderForCopyOnWriteForMerge()) { + this.tempPath = makeTempPath(partitionPath, TaskContext.getPartitionId(), + fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); + } // handle cases of partial failures, for update task if (fs.exists(newFilePath)) { @@ -116,15 +120,15 @@ public class HoodieMergeHandle extends HoodieIOHa } logger.info(String.format("Merging new data into oldPath %s, as newPath %s", - oldFilePath.toString(), newFilePath.toString())); + oldFilePath.toString(), getStorageWriterPath().toString())); // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); writeStatus.getStat().setFileId(fileId); - writeStatus.getStat().setPath(relativePath); + writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath); // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema); + .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); } catch (IOException io) { logger.error("Error in update task at commit " + commitTime, io); writeStatus.setGlobalError(io);