From d3df32fa03e3b83a914ef9df9d56046dacb681bc Mon Sep 17 00:00:00 2001 From: Jian Xu Date: Wed, 14 Mar 2018 12:53:12 -0700 Subject: [PATCH] Add back UseTempFolder changes in HoodieMergeHandle --- .../java/com/uber/hoodie/io/HoodieMergeHandle.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index df79cb83d..ce5f1e9e4 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -109,6 +109,10 @@ public class HoodieMergeHandle extends HoodieIOHa String relativePath = new Path(partitionPath + "/" + FSUtils .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); + if (config.shouldUseTempFolderForCopyOnWriteForMerge()) { + this.tempPath = makeTempPath(partitionPath, TaskContext.getPartitionId(), + fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); + } // handle cases of partial failures, for update task if (fs.exists(newFilePath)) { @@ -116,15 +120,15 @@ public class HoodieMergeHandle extends HoodieIOHa } logger.info(String.format("Merging new data into oldPath %s, as newPath %s", - oldFilePath.toString(), newFilePath.toString())); + oldFilePath.toString(), getStorageWriterPath().toString())); // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); writeStatus.getStat().setFileId(fileId); - writeStatus.getStat().setPath(relativePath); + writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath); // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema); + .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); } catch (IOException io) { logger.error("Error in update task at commit " + commitTime, io); writeStatus.setGlobalError(io);