diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 6ce23a62d..ddde0b964 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -419,9 +419,9 @@ public class HoodieWriteClient implements Seriali HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); List> stats = writeStatuses - .mapToPair((PairFunction) writeStatus -> - new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat())) - .collect(); + .mapToPair((PairFunction) writeStatus -> + new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat())) + .collect(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); for (Tuple2 stat : stats) { @@ -434,14 +434,14 @@ public class HoodieWriteClient implements Seriali if (finalizeCtx != null && result.isPresent()) { Optional durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop())); durationInMs.ifPresent(duration -> { - logger.info("Finalize write elapsed time (Seconds): " + duration / 1000); + logger.info("Finalize write elapsed time (milliseconds): " + duration); metrics.updateFinalizeWriteMetrics(duration, result.get()); } ); } // Clean temp files - cleanTemporaryDataFiles(); + table.cleanTemporaryDataFiles(jsc); // add in extra metadata if (extraMetadata.isPresent()) { @@ -699,7 +699,7 @@ public class HoodieWriteClient implements Seriali }); // clean data files in temporary folder - cleanTemporaryDataFiles(); + table.cleanTemporaryDataFiles(jsc); try { if (commitTimeline.empty() && inflightTimeline.empty()) { @@ -763,35 +763,6 @@ public class HoodieWriteClient implements Seriali } } - private void cleanTemporaryDataFiles() { - if (!config.shouldFinalizeWrite()) { - return; - } - - final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME); - try { - if (!fs.exists(temporaryFolder)) { - return; - } - List fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder)); - List> results = jsc.parallelize(fileStatusesList, config.getFinalizeParallelism()) - .map(fileStatus -> { - FileSystem fs1 = FSUtils.getFs(); - boolean success = fs1.delete(fileStatus.getPath(), false); - logger.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + success); - return new Tuple2<>(fileStatus.getPath().toString(), success); - }).collect(); - - for (Tuple2 result : results) { - if (!result._2()) { - logger.info("Failed to delete file: " + result._1()); - throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1()); - } - } - } catch (IOException e) { - throw new HoodieIOException("Failed to clean data files in temporary folder: " + temporaryFolder); - } - } /** * Releases any resources used by the client. */ @@ -877,18 +848,7 @@ public class HoodieWriteClient implements Seriali String commitActionType = table.getCommitActionType(); activeTimeline.createInflight( new HoodieInstant(true, commitActionType, commitTime)); - - // create temporary folder if needed - if (config.shouldFinalizeWrite()) { - final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME); - try { - if (!fs.exists(temporaryFolder)) { - fs.mkdirs(temporaryFolder); - } - } catch (IOException e) { - throw new HoodieIOException("Failed to create temporary folder: " + temporaryFolder); - } - } + table.initializeFinalizeWrite(); } /** diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index 4c9d1fb2c..0677f4500 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -58,10 +58,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false"; private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class"; private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); - private static final String HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "hoodie.finalize.write.before.commit"; - private static final String DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "false"; - private static final String FINALIZE_PARALLELISM = "hoodie.finalize.parallelism"; - private static final String DEFAULT_FINALIZE_PARALLELISM = "5"; + private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "hoodie.copyonwrite.use.temp.folder"; + private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "false"; + private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism"; + private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = "5"; private HoodieWriteConfig(Properties props) { super(props); @@ -118,12 +118,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP); } - public boolean shouldFinalizeWrite() { - return Boolean.parseBoolean(props.getProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT)); + public boolean shouldUseTempFolderForCopyOnWrite() { + return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER)); } - public int getFinalizeParallelism() { - return Integer.parseInt(props.getProperty(FINALIZE_PARALLELISM)); + public int getFinalizeWriteParallelism() { + return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM)); } /** @@ -397,13 +397,13 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } - public Builder withFinalizeWrite(boolean shouldFinalizeWrite) { - props.setProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, String.valueOf(shouldFinalizeWrite)); + public Builder withUseTempFolderCopyOnWrite(boolean shouldUseTempFolderCopyOnWrite) { + props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderCopyOnWrite)); return this; } - public Builder withFinalizeParallelism(int parallelism) { - props.setProperty(FINALIZE_PARALLELISM, String.valueOf(parallelism)); + public Builder withFinalizeWriteParallelism(int parallelism) { + props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism)); return this; } @@ -430,10 +430,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING); setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP), HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS); - setDefaultOnCondition(props, !props.containsKey(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT), - HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT); - setDefaultOnCondition(props, !props.containsKey(FINALIZE_PARALLELISM), - FINALIZE_PARALLELISM, DEFAULT_FINALIZE_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER), + HOODIE_COPYONWRITE_USE_TEMP_FOLDER, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER); + setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), + FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 18f5be319..d4bb198ae 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -45,7 +45,7 @@ public class HoodieCreateHandle extends HoodieIOH private final WriteStatus status; private final HoodieStorageWriter storageWriter; private final Path path; - private final Path tempPath; + private Path tempPath = null; private long recordsWritten = 0; private long recordsDeleted = 0; @@ -58,10 +58,8 @@ public class HoodieCreateHandle extends HoodieIOH final int sparkPartitionId = TaskContext.getPartitionId(); this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId()); - if (config.shouldFinalizeWrite()) { + if (config.shouldUseTempFolderForCopyOnWrite()) { this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); - } else { - this.tempPath = null; } try { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 699d772d0..c03f75f78 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -575,14 +576,33 @@ public class HoodieCopyOnWriteTable extends Hoodi return stats; } + @Override + public void initializeFinalizeWrite() { + if (!config.shouldUseTempFolderForCopyOnWrite()) { + return; + } + + // create temporary folder if needed + final FileSystem fs = FSUtils.getFs(); + final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME); + try { + if (!fs.exists(temporaryFolder)) { + fs.mkdirs(temporaryFolder); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to create temporary folder: " + temporaryFolder); + } + } + @Override @SuppressWarnings("unchecked") public Optional finalizeWrite(JavaSparkContext jsc, List writeStatuses) { - if (!config.shouldFinalizeWrite()) { + if (!config.shouldUseTempFolderForCopyOnWrite()) { return Optional.empty(); } - List> results = jsc.parallelize(writeStatuses, config.getFinalizeParallelism()) + // This is to rename each data file from temporary path to its final location + List> results = jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism()) .map(writeStatus -> { Tuple2 writeStatTuple2 = (Tuple2) writeStatus; HoodieWriteStat writeStat = writeStatTuple2._2(); @@ -610,6 +630,44 @@ public class HoodieCopyOnWriteTable extends Hoodi return Optional.of(results.size()); } + @Override + public void cleanTemporaryDataFiles(JavaSparkContext jsc) { + if (!config.shouldUseTempFolderForCopyOnWrite()) { + return; + } + + final FileSystem fs = FSUtils.getFs(); + final Path temporaryFolder = new Path(config.getBasePath(), + HoodieTableMetaClient.TEMPFOLDER_NAME); + try { + if (!fs.exists(temporaryFolder)) { + logger.info("Temporary folder does not exist: " + temporaryFolder); + return; + } + List fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder)); + List> results = jsc + .parallelize(fileStatusesList, config.getFinalizeWriteParallelism()) + .map(fileStatus -> { + FileSystem fs1 = FSUtils.getFs(); + boolean success = fs1.delete(fileStatus.getPath(), false); + logger.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + + success); + return new Tuple2<>(fileStatus.getPath().toString(), success); + }).collect(); + + for (Tuple2 result : results) { + if (!result._2()) { + logger.info("Failed to delete file: " + result._1()); + throw new HoodieIOException( + "Failed to delete file in temporary folder: " + result._1()); + } + } + } catch (IOException e) { + throw new HoodieIOException( + "Failed to clean data files in temporary folder: " + temporaryFolder); + } + } + private static class PartitionCleanStat implements Serializable { private final String partitionPath; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 1e507c645..e10dde669 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -250,9 +250,19 @@ public class HoodieMergeOnReadTable extends return allRollbackStats; } + @Override + public void initializeFinalizeWrite() { + // do nothing for MOR tables + } + @Override public Optional finalizeWrite(JavaSparkContext jsc, List writeStatuses) { // do nothing for MOR tables return Optional.empty(); } + + @Override + public void cleanTemporaryDataFiles(JavaSparkContext jsc) { + // do nothing for MOR tables + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index c3b9d1f70..ca9f37762 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -273,6 +273,11 @@ public abstract class HoodieTable implements Seri public abstract List rollback(JavaSparkContext jsc, List commits) throws IOException; + /** + * Initialize resources needed for finalize write. + */ + public abstract void initializeFinalizeWrite(); + /** * Finalize the written data files * @@ -280,4 +285,9 @@ public abstract class HoodieTable implements Seri * @return number of files finalized */ public abstract Optional finalizeWrite(JavaSparkContext jsc, List> writeStatuses); + + /** + * Clean temporary data files after data files are finalized or commit is rolled back. + */ + public abstract void cleanTemporaryDataFiles(JavaSparkContext jsc); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 10773db0f..286dcb7ba 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -305,7 +305,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { @Test public void testUpsertsWithFinalizeWrite() throws Exception { HoodieWriteConfig cfg = getConfigBuilder() - .withFinalizeWrite(true) + .withUseTempFolderCopyOnWrite(true) .build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);