diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 082ececdb..4f19ee679 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -60,6 +60,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "hoodie.copyonwrite.use.temp.folder"; private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "false"; + private static final String HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "hoodie.mergehandle.use.temp.folder"; + private static final String DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "false"; private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism"; private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM; @@ -122,6 +124,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER)); } + public boolean shouldUseTempFolderForMergeHandle() { + return Boolean.parseBoolean(props.getProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER)); + } + public int getFinalizeWriteParallelism() { return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM)); } @@ -402,6 +408,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withUseTempFolderMergeHandle(boolean shouldUseTempFolderMergeHandle) { + props.setProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderMergeHandle)); + return this; + } + public Builder withFinalizeWriteParallelism(int parallelism) { props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism)); return this; @@ -432,6 +443,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS); setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER), HOODIE_COPYONWRITE_USE_TEMP_FOLDER, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER); + setDefaultOnCondition(props, !props.containsKey(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER), + HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER); setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index e6ce31468..35706c630 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -52,6 +52,7 @@ public class HoodieMergeHandle extends HoodieIOHa private TableFileSystemView.ReadOptimizedView fileSystemView; private Path newFilePath; private Path oldFilePath; + private Path tempPath = null; private long recordsWritten = 0; private long recordsDeleted = 0; private long updatedRecordsWritten = 0; @@ -100,6 +101,9 @@ public class HoodieMergeHandle extends HoodieIOHa String relativePath = new Path(record.getPartitionPath() + "/" + FSUtils .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); + if (config.shouldUseTempFolderForCopyOnWrite() && config.shouldUseTempFolderForMergeHandle()) { + this.tempPath = makeTempPath(record.getPartitionPath(), TaskContext.getPartitionId(), fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); + } // handle cases of partial failures, for update task if (fs.exists(newFilePath)) { @@ -107,12 +111,12 @@ public class HoodieMergeHandle extends HoodieIOHa } logger.info(String.format("Merging new data into oldPath %s, as newPath %s", - oldFilePath.toString(), newFilePath.toString())); + oldFilePath.toString(), getNewFilePath().toString())); // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(record.getPartitionPath()); writeStatus.getStat().setFileId(fileId); - writeStatus.getStat().setPath(relativePath); + writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath); } keyToNewRecords.put(record.getRecordKey(), record); // update the new location of the record, so we know where to find it next @@ -120,7 +124,7 @@ public class HoodieMergeHandle extends HoodieIOHa } // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema); + .getStorageWriter(commitTime, getNewFilePath(), hoodieTable, config, schema); } catch (Exception e) { logger.error("Error in update task at commit " + commitTime, e); @@ -186,18 +190,18 @@ public class HoodieMergeHandle extends HoodieIOHa if (copyOldRecord) { // this should work as it is, since this is an existing record String errMsg = "Failed to merge old record into new file for key " + key + " from old file " - + getOldFilePath() + " to new file " + newFilePath; + + getOldFilePath() + " to new file " + getNewFilePath(); try { storageWriter.writeAvro(key, oldRecord); } catch (ClassCastException e) { logger.error( "Schema mismatch when rewriting old record " + oldRecord + " from file " - + getOldFilePath() + " to file " + newFilePath + " with schema " + schema + + getOldFilePath() + " to file " + getNewFilePath() + " with schema " + schema .toString(true)); throw new HoodieUpsertException(errMsg, e); } catch (IOException e) { logger.error("Failed to merge old record into new file for key " + key + " from old file " - + getOldFilePath() + " to new file " + newFilePath, e); + + getOldFilePath() + " to new file " + getNewFilePath(), e); throw new HoodieUpsertException(errMsg, e); } recordsWritten++; @@ -219,7 +223,7 @@ public class HoodieMergeHandle extends HoodieIOHa storageWriter.close(); } - writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, newFilePath)); + writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, getNewFilePath())); writeStatus.getStat().setNumWrites(recordsWritten); writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); @@ -233,6 +237,14 @@ public class HoodieMergeHandle extends HoodieIOHa return oldFilePath; } + private Path getNewFilePath() { + // Use tempPath for storage writer if possible + if (this.tempPath != null) { + return this.tempPath; + } + return this.newFilePath; + } + public WriteStatus getWriteStatus() { return writeStatus; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 9cab45bc7..1e507c645 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -255,9 +255,4 @@ public class HoodieMergeOnReadTable extends // do nothing for MOR tables return Optional.empty(); } - - @Override - public void cleanTemporaryDataFiles(JavaSparkContext jsc) { - // do nothing for MOR tables - } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 286dcb7ba..a1822bdfc 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -306,6 +306,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { public void testUpsertsWithFinalizeWrite() throws Exception { HoodieWriteConfig cfg = getConfigBuilder() .withUseTempFolderCopyOnWrite(true) + .withUseTempFolderMergeHandle(true) .build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);