diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 4f19ee679..b6d5f766b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -58,10 +58,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false"; private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class"; private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); - private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "hoodie.copyonwrite.use.temp.folder"; - private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "false"; - private static final String HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "hoodie.mergehandle.use.temp.folder"; - private static final String DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "false"; + private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "hoodie.copyonwrite.use.temp.folder.for.create"; + private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "false"; + private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "hoodie.copyonwrite.use.temp.folder.for.merge"; + private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false"; private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism"; private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM; @@ -120,12 +120,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP); } - public boolean shouldUseTempFolderForCopyOnWrite() { - return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER)); + public boolean shouldUseTempFolderForCopyOnWriteForCreate() { + return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE)); } - public boolean shouldUseTempFolderForMergeHandle() { - return Boolean.parseBoolean(props.getProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER)); + public boolean shouldUseTempFolderForCopyOnWriteForMerge() { + return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE)); + } + + public boolean shouldUseTempFolderForCopyOnWrite() { + return shouldUseTempFolderForCopyOnWriteForCreate() || + shouldUseTempFolderForCopyOnWriteForMerge(); } public int getFinalizeWriteParallelism() { @@ -403,13 +408,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } - public Builder withUseTempFolderCopyOnWrite(boolean shouldUseTempFolderCopyOnWrite) { - props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderCopyOnWrite)); + public Builder withUseTempFolderCopyOnWriteForCreate( + boolean shouldUseTempFolderCopyOnWriteForCreate) { + props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE, String.valueOf + (shouldUseTempFolderCopyOnWriteForCreate)); return this; } - public Builder withUseTempFolderMergeHandle(boolean shouldUseTempFolderMergeHandle) { - props.setProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderMergeHandle)); + public Builder withUseTempFolderCopyOnWriteForMerge( + boolean shouldUseTempFolderCopyOnWriteForMerge) { + props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE, String.valueOf + (shouldUseTempFolderCopyOnWriteForMerge)); return this; } @@ -441,10 +450,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING); setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP), HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS); - setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER), - HOODIE_COPYONWRITE_USE_TEMP_FOLDER, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER); - setDefaultOnCondition(props, !props.containsKey(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER), - HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER); + setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE), + HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE); + setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE), + HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE); setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 8d933d40c..1fa52de25 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -58,8 +58,9 @@ public class HoodieCreateHandle extends HoodieIOH final int sparkPartitionId = TaskContext.getPartitionId(); this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId()); - if (config.shouldUseTempFolderForCopyOnWrite()) { - this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); + if (config.shouldUseTempFolderForCopyOnWriteForCreate()) { + this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), + TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); } try { @@ -144,9 +145,6 @@ public class HoodieCreateHandle extends HoodieIOH private Path getStorageWriterPath() { // Use tempPath for storage writer if possible - if (this.tempPath != null) { - return this.tempPath; - } - return this.path; + return (this.tempPath == null) ? this.path : this.tempPath; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 35706c630..7500abf3a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -101,7 +101,7 @@ public class HoodieMergeHandle extends HoodieIOHa String relativePath = new Path(record.getPartitionPath() + "/" + FSUtils .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); - if (config.shouldUseTempFolderForCopyOnWrite() && config.shouldUseTempFolderForMergeHandle()) { + if (config.shouldUseTempFolderForCopyOnWriteForMerge()) { this.tempPath = makeTempPath(record.getPartitionPath(), TaskContext.getPartitionId(), fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); } @@ -111,7 +111,7 @@ public class HoodieMergeHandle extends HoodieIOHa } logger.info(String.format("Merging new data into oldPath %s, as newPath %s", - oldFilePath.toString(), getNewFilePath().toString())); + oldFilePath.toString(), getStorageWriterPath().toString())); // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(record.getPartitionPath()); @@ -124,7 +124,7 @@ public class HoodieMergeHandle extends HoodieIOHa } // Create the writer for writing the new version file storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, getNewFilePath(), hoodieTable, config, schema); + .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); } catch (Exception e) { logger.error("Error in update task at commit " + commitTime, e); @@ -190,18 +190,18 @@ public class HoodieMergeHandle extends HoodieIOHa if (copyOldRecord) { // this should work as it is, since this is an existing record String errMsg = "Failed to merge old record into new file for key " + key + " from old file " - + getOldFilePath() + " to new file " + getNewFilePath(); + + getOldFilePath() + " to new file " + getStorageWriterPath(); try { storageWriter.writeAvro(key, oldRecord); } catch (ClassCastException e) { logger.error( "Schema mismatch when rewriting old record " + oldRecord + " from file " - + getOldFilePath() + " to file " + getNewFilePath() + " with schema " + schema + + getOldFilePath() + " to file " + getStorageWriterPath() + " with schema " + schema .toString(true)); throw new HoodieUpsertException(errMsg, e); } catch (IOException e) { logger.error("Failed to merge old record into new file for key " + key + " from old file " - + getOldFilePath() + " to new file " + getNewFilePath(), e); + + getOldFilePath() + " to new file " + getStorageWriterPath(), e); throw new HoodieUpsertException(errMsg, e); } recordsWritten++; @@ -223,7 +223,7 @@ public class HoodieMergeHandle extends HoodieIOHa storageWriter.close(); } - writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, getNewFilePath())); + writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath())); writeStatus.getStat().setNumWrites(recordsWritten); writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); @@ -237,12 +237,9 @@ public class HoodieMergeHandle extends HoodieIOHa return oldFilePath; } - private Path getNewFilePath() { + private Path getStorageWriterPath() { // Use tempPath for storage writer if possible - if (this.tempPath != null) { - return this.tempPath; - } - return this.newFilePath; + return (this.tempPath == null) ? this.newFilePath : this.tempPath; } public WriteStatus getWriteStatus() { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index f13336f0e..cb706a389 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -579,6 +579,12 @@ public class HoodieCopyOnWriteTable extends Hoodi return stats; } + /** + * Finalize the written data files + * + * @param writeStatuses List of WriteStatus + * @return number of files finalized + */ @Override @SuppressWarnings("unchecked") public Optional finalizeWrite(JavaSparkContext jsc, List writeStatuses) { @@ -618,6 +624,10 @@ public class HoodieCopyOnWriteTable extends Hoodi return Optional.of(results.size()); } + /** + * Clean temporary data files that are produced from previous failed commit or retried spark + * stages. + */ private void cleanTemporaryDataFiles(JavaSparkContext jsc) { if (!config.shouldUseTempFolderForCopyOnWrite()) { return; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 06eec8e45..c3b9d1f70 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -280,5 +280,4 @@ public abstract class HoodieTable implements Seri * @return number of files finalized */ public abstract Optional finalizeWrite(JavaSparkContext jsc, List> writeStatuses); - } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 4c158e27d..5caeec23b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -48,6 +48,7 @@ import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.table.HoodieCopyOnWriteTable; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.FileInputStream; @@ -55,6 +56,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -211,9 +213,12 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { @Test public void testUpserts() throws Exception { - HoodieWriteConfig cfg = getConfig(); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); - HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); + testUpsertsInternal(getConfig()); + } + + private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig) throws Exception { + HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig); + HoodieIndex index = HoodieIndex.createIndex(hoodieWriteConfig, jsc); /** * Write 1 (only inserts) @@ -304,95 +309,11 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { @Test public void testUpsertsWithFinalizeWrite() throws Exception { - HoodieWriteConfig cfg = getConfigBuilder() - .withUseTempFolderCopyOnWrite(true) - .withUseTempFolderMergeHandle(true) + HoodieWriteConfig hoodieWriteConfig = getConfigBuilder() + .withUseTempFolderCopyOnWriteForCreate(true) + .withUseTempFolderCopyOnWriteForMerge(true) .build(); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); - HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); - - /** - * Write 1 (only inserts) - */ - String newCommitTime = "001"; - client.startCommitWithTime(newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 200); - JavaRDD writeRecords = jsc.parallelize(records, 1); - - List statuses = client.upsert(writeRecords, newCommitTime).collect(); - assertNoWriteErrors(statuses); - - // check the partition metadata is written out - assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs); - - // verify that there is a commit - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); - - assertEquals("Expecting a single commit.", 1, - timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); - assertEquals("Latest commit should be 001", newCommitTime, - timeline.lastInstant().get().getTimestamp()); - assertEquals("Must contain 200 records", - records.size(), - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); - // Should have 100 records in table (check using Index), all in locations marked at commit - HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); - - List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table) - .collect(); - checkTaggedRecords(taggedRecords, "001"); - - /** - * Write 2 (updates) - */ - newCommitTime = "004"; - client.startCommitWithTime(newCommitTime); - - records = dataGen.generateUpdates(newCommitTime, 100); - LinkedHashMap recordsMap = new LinkedHashMap<>(); - for (HoodieRecord rec : records) { - if (!recordsMap.containsKey(rec.getKey())) { - recordsMap.put(rec.getKey(), rec); - } - } - List dedupedRecords = new ArrayList<>(recordsMap.values()); - - statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - // verify there are now 2 commits - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); - assertEquals("Expecting two commits.", - timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), 2); - assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), - newCommitTime); - - metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); - table = HoodieTable.getHoodieTable(metaClient, getConfig()); - - // Index should be able to locate all updates in correct locations. - taggedRecords = index.tagLocation(jsc.parallelize(dedupedRecords, 1), table).collect(); - checkTaggedRecords(taggedRecords, "004"); - - // Check the entire dataset has 100 records still - String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; - for (int i = 0; i < fullPartitionPaths.length; i++) { - fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); - } - assertEquals("Must contain 200 records", - 200, - HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths).count()); - - // Check that the incremental consumption from time 000 - assertEquals("Incremental consumption from time 002, should give all records in commit 004", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "002").count()); - assertEquals("Incremental consumption from time 001, should give all records in commit 004", - HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), - HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "001").count()); + testUpsertsInternal(hoodieWriteConfig); } @Test @@ -1575,6 +1496,32 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { .count() == 3); } + @Test + public void testCleanTemporaryDataFiles() throws IOException { + HoodieTestUtils.createCommitFiles(basePath, "000"); + List tempFiles = createTempFiles("000", 10); + assertEquals("Some temp files are created.",10, tempFiles.size()); + assertEquals("Some temp files are created.",tempFiles.size(), getTotalTempFiles()); + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withUseTempFolderCopyOnWriteForCreate(false) + .withUseTempFolderCopyOnWriteForMerge(false).build(); + HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), + config.getBasePath(), true), + config); + table.rollback(jsc, Collections.emptyList()); + assertEquals("Some temp files are created.",tempFiles.size(), getTotalTempFiles()); + + config = HoodieWriteConfig.newBuilder().withPath(basePath) + .withUseTempFolderCopyOnWriteForCreate(true) + .withUseTempFolderCopyOnWriteForMerge(false).build(); + table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), + config.getBasePath(), true), + config); + table.rollback(jsc, Collections.emptyList()); + assertEquals("All temp files are deleted.",0, getTotalTempFiles()); + } + public void testCommitWritesRelativePaths() throws Exception { HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); @@ -1642,6 +1589,18 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { return files; } + private List createTempFiles(String commitTime, int numFiles) throws IOException { + List files = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + files.add(HoodieTestUtils.createNewDataFile(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, commitTime)); + } + return files; + } + + private int getTotalTempFiles() throws IOException { + return fs.listStatus(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)).length; + } + @After public void clean() { if (basePath != null) {