Add back UseTempFolder changes in HoodieMergeHandle
This commit is contained in:
@@ -109,6 +109,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
String relativePath = new Path(partitionPath + "/" + FSUtils
|
String relativePath = new Path(partitionPath + "/" + FSUtils
|
||||||
.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
|
.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
|
||||||
newFilePath = new Path(config.getBasePath(), relativePath);
|
newFilePath = new Path(config.getBasePath(), relativePath);
|
||||||
|
if (config.shouldUseTempFolderForCopyOnWriteForMerge()) {
|
||||||
|
this.tempPath = makeTempPath(partitionPath, TaskContext.getPartitionId(),
|
||||||
|
fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
|
||||||
|
}
|
||||||
|
|
||||||
// handle cases of partial failures, for update task
|
// handle cases of partial failures, for update task
|
||||||
if (fs.exists(newFilePath)) {
|
if (fs.exists(newFilePath)) {
|
||||||
@@ -116,15 +120,15 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
|||||||
}
|
}
|
||||||
|
|
||||||
logger.info(String.format("Merging new data into oldPath %s, as newPath %s",
|
logger.info(String.format("Merging new data into oldPath %s, as newPath %s",
|
||||||
oldFilePath.toString(), newFilePath.toString()));
|
oldFilePath.toString(), getStorageWriterPath().toString()));
|
||||||
// file name is same for all records, in this bunch
|
// file name is same for all records, in this bunch
|
||||||
writeStatus.setFileId(fileId);
|
writeStatus.setFileId(fileId);
|
||||||
writeStatus.setPartitionPath(partitionPath);
|
writeStatus.setPartitionPath(partitionPath);
|
||||||
writeStatus.getStat().setFileId(fileId);
|
writeStatus.getStat().setFileId(fileId);
|
||||||
writeStatus.getStat().setPath(relativePath);
|
writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
|
||||||
// Create the writer for writing the new version file
|
// Create the writer for writing the new version file
|
||||||
storageWriter = HoodieStorageWriterFactory
|
storageWriter = HoodieStorageWriterFactory
|
||||||
.getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema);
|
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
logger.error("Error in update task at commit " + commitTime, io);
|
logger.error("Error in update task at commit " + commitTime, io);
|
||||||
writeStatus.setGlobalError(io);
|
writeStatus.setGlobalError(io);
|
||||||
|
|||||||
Reference in New Issue
Block a user