diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 6b0adf67f..6ce23a62d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -57,6 +57,7 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.text.ParseException; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -64,7 +65,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -416,14 +419,30 @@ public class HoodieWriteClient implements Seriali HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); List> stats = writeStatuses - .mapToPair((PairFunction) writeStatus -> - new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat())) - .collect(); + .mapToPair((PairFunction) writeStatus -> + new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat())) + .collect(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); for (Tuple2 stat : stats) { metadata.addWriteStat(stat._1(), stat._2()); } + + // Finalize write + final Timer.Context finalizeCtx = metrics.getFinalizeCtx(); + final Optional result = table.finalizeWrite(jsc, stats); + if (finalizeCtx != null && result.isPresent()) { + Optional durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop())); + durationInMs.ifPresent(duration -> { + logger.info("Finalize write elapsed time (Seconds): " + duration / 1000); + metrics.updateFinalizeWriteMetrics(duration, result.get()); + } + ); + } + + // Clean temp files + cleanTemporaryDataFiles(); + // add in extra metadata if (extraMetadata.isPresent()) { extraMetadata.get().forEach((k, v) -> metadata.addMetadata(k, v)); @@ -679,6 +698,9 @@ public class HoodieWriteClient implements Seriali } }); + // clean data files in temporary folder + cleanTemporaryDataFiles(); + try { if (commitTimeline.empty() && inflightTimeline.empty()) { // nothing to rollback @@ -741,6 +763,35 @@ public class HoodieWriteClient implements Seriali } } + private void cleanTemporaryDataFiles() { + if (!config.shouldFinalizeWrite()) { + return; + } + + final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME); + try { + if (!fs.exists(temporaryFolder)) { + return; + } + List fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder)); + List> results = jsc.parallelize(fileStatusesList, config.getFinalizeParallelism()) + .map(fileStatus -> { + FileSystem fs1 = FSUtils.getFs(); + boolean success = fs1.delete(fileStatus.getPath(), false); + logger.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + success); + return new Tuple2<>(fileStatus.getPath().toString(), success); + }).collect(); + + for (Tuple2 result : results) { + if (!result._2()) { + logger.info("Failed to delete file: " + result._1()); + throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1()); + } + } + } catch (IOException e) { + throw new HoodieIOException("Failed to clean data files in temporary folder: " + temporaryFolder); + } + } /** * Releases any resources used by the client. */ @@ -826,6 +877,18 @@ public class HoodieWriteClient implements Seriali String commitActionType = table.getCommitActionType(); activeTimeline.createInflight( new HoodieInstant(true, commitActionType, commitTime)); + + // create temporary folder if needed + if (config.shouldFinalizeWrite()) { + final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME); + try { + if (!fs.exists(temporaryFolder)) { + fs.mkdirs(temporaryFolder); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to create temporary folder: " + temporaryFolder); + } + } } /** diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java index b2efc8254..4c9d1fb2c 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java @@ -58,6 +58,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false"; private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class"; private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); + private static final String HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "hoodie.finalize.write.before.commit"; + private static final String DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "false"; + private static final String FINALIZE_PARALLELISM = "hoodie.finalize.parallelism"; + private static final String DEFAULT_FINALIZE_PARALLELISM = "5"; private HoodieWriteConfig(Properties props) { super(props); @@ -114,6 +118,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP); } + public boolean shouldFinalizeWrite() { + return Boolean.parseBoolean(props.getProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT)); + } + + public int getFinalizeParallelism() { + return Integer.parseInt(props.getProperty(FINALIZE_PARALLELISM)); + } + /** * compaction properties **/ @@ -385,6 +397,16 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withFinalizeWrite(boolean shouldFinalizeWrite) { + props.setProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, String.valueOf(shouldFinalizeWrite)); + return this; + } + + public Builder withFinalizeParallelism(int parallelism) { + props.setProperty(FINALIZE_PARALLELISM, String.valueOf(parallelism)); + return this; + } + public HoodieWriteConfig build() { HoodieWriteConfig config = new HoodieWriteConfig(props); // Check for mandatory properties @@ -408,6 +430,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING); setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP), HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS); + setDefaultOnCondition(props, !props.containsKey(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT), + HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT); + setDefaultOnCondition(props, !props.containsKey(FINALIZE_PARALLELISM), + FINALIZE_PARALLELISM, DEFAULT_FINALIZE_PARALLELISM); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 629869e18..18f5be319 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -45,6 +45,7 @@ public class HoodieCreateHandle extends HoodieIOH private final WriteStatus status; private final HoodieStorageWriter storageWriter; private final Path path; + private final Path tempPath; private long recordsWritten = 0; private long recordsDeleted = 0; @@ -55,7 +56,14 @@ public class HoodieCreateHandle extends HoodieIOH status.setFileId(UUID.randomUUID().toString()); status.setPartitionPath(partitionPath); - this.path = makeNewPath(partitionPath, TaskContext.getPartitionId(), status.getFileId()); + final int sparkPartitionId = TaskContext.getPartitionId(); + this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId()); + if (config.shouldFinalizeWrite()) { + this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId()); + } else { + this.tempPath = null; + } + try { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime, @@ -64,10 +72,10 @@ public class HoodieCreateHandle extends HoodieIOH partitionMetadata.trySave(TaskContext.getPartitionId()); this.storageWriter = HoodieStorageWriterFactory - .getStorageWriter(commitTime, path, hoodieTable, config, schema); + .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema); } catch (IOException e) { throw new HoodieInsertException( - "Failed to initialize HoodieStorageWriter for path " + path, e); + "Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e); } logger.info("New InsertHandle for partition :" + partitionPath); } @@ -126,7 +134,10 @@ public class HoodieCreateHandle extends HoodieIOH stat.setFileId(status.getFileId()); String relativePath = path.toString().replace(new Path(config.getBasePath()) + "/", ""); stat.setPath(relativePath); - stat.setTotalWriteBytes(FSUtils.getFileSize(fs, path)); + if (tempPath != null) { + stat.setTempPath(tempPath.toString().replace(new Path(config.getBasePath()) + "/", "")); + } + stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath())); stat.setTotalWriteErrors(status.getFailedRecords().size()); status.setStat(stat); @@ -136,4 +147,12 @@ public class HoodieCreateHandle extends HoodieIOH e); } } + + private Path getStorageWriterPath() { + // Use tempPath for storage writer if possible + if (this.tempPath != null) { + return this.tempPath; + } + return this.path; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index a02df5213..f207ea41a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -17,6 +17,7 @@ package com.uber.hoodie.io; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.util.FSUtils; @@ -65,6 +66,12 @@ public abstract class HoodieIOHandle { FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName)); } + public Path makeTempPath(String partitionPath, int taskPartitionId, String fileName, int stageId, long taskAttemptId) { + Path path = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME); + return new Path(path.toString(), + FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId, taskAttemptId)); + } + /** * Deletes any new tmp files written during the current commit, into the partition */ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java index f6c79bb10..fe275e159 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java @@ -37,9 +37,11 @@ public class HoodieMetrics { public String rollbackTimerName = null; public String cleanTimerName = null; public String commitTimerName = null; + public String finalizeTimerName = null; private Timer rollbackTimer = null; private Timer cleanTimer = null; private Timer commitTimer = null; + private Timer finalizeTimer = null; public HoodieMetrics(HoodieWriteConfig config, String tableName) { this.config = config; @@ -49,6 +51,7 @@ public class HoodieMetrics { this.rollbackTimerName = getMetricsName("timer", "rollback"); this.cleanTimerName = getMetricsName("timer", "clean"); this.commitTimerName = getMetricsName("timer", "commit"); + this.finalizeTimerName = getMetricsName("timer", "finalize"); } } @@ -77,6 +80,13 @@ public class HoodieMetrics { return commitTimer == null ? null : commitTimer.time(); } + public Timer.Context getFinalizeCtx() { + if (config.isMetricsOn() && finalizeTimer == null) { + finalizeTimer = createTimer(finalizeTimerName); + } + return finalizeTimer == null ? null : finalizeTimer.time(); + } + public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata) { if (config.isMetricsOn()) { @@ -119,6 +129,15 @@ public class HoodieMetrics { } } + public void updateFinalizeWriteMetrics(long durationInMs, int numFilesFinalized) { + if (config.isMetricsOn()) { + logger.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", + durationInMs, numFilesFinalized)); + registerGauge(getMetricsName("finalize", "duration"), durationInMs); + registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized); + } + } + @VisibleForTesting String getMetricsName(String action, String metric) { return config == null ? null : diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 3e4b1dc86..699d772d0 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -27,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; @@ -574,6 +575,41 @@ public class HoodieCopyOnWriteTable extends Hoodi return stats; } + @Override + @SuppressWarnings("unchecked") + public Optional finalizeWrite(JavaSparkContext jsc, List writeStatuses) { + if (!config.shouldFinalizeWrite()) { + return Optional.empty(); + } + + List> results = jsc.parallelize(writeStatuses, config.getFinalizeParallelism()) + .map(writeStatus -> { + Tuple2 writeStatTuple2 = (Tuple2) writeStatus; + HoodieWriteStat writeStat = writeStatTuple2._2(); + final FileSystem fs = FSUtils.getFs(); + final Path finalPath = new Path(config.getBasePath(), writeStat.getPath()); + + if (writeStat.getTempPath() != null) { + final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath()); + boolean success; + try { + logger.info("Renaming temporary file: " + tempPath + " to " + finalPath); + success = fs.rename(tempPath, finalPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to rename file: " + tempPath + " to " + finalPath); + } + + if (!success) { + throw new HoodieIOException("Failed to rename file: " + tempPath + " to " + finalPath); + } + } + + return new Tuple2<>(writeStat.getPath(), true); + }).collect(); + + return Optional.of(results.size()); + } + private static class PartitionCleanStat implements Serializable { private final String partitionPath; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 839aa1840..1e507c645 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -250,4 +250,9 @@ public class HoodieMergeOnReadTable extends return allRollbackStats; } + @Override + public Optional finalizeWrite(JavaSparkContext jsc, List writeStatuses) { + // do nothing for MOR tables + return Optional.empty(); + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 3140e34ee..c3b9d1f70 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -23,6 +23,7 @@ import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; @@ -46,6 +47,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; /** * Abstract implementation of a HoodieTable @@ -270,4 +272,12 @@ public abstract class HoodieTable implements Seri */ public abstract List rollback(JavaSparkContext jsc, List commits) throws IOException; + + /** + * Finalize the written data files + * + * @param writeStatuses List of WriteStatus + * @return number of files finalized + */ + public abstract Optional finalizeWrite(JavaSparkContext jsc, List> writeStatuses); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 184ea82e0..10773db0f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -302,6 +302,100 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "001").count()); } + @Test + public void testUpsertsWithFinalizeWrite() throws Exception { + HoodieWriteConfig cfg = getConfigBuilder() + .withFinalizeWrite(true) + .build(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); + FileSystem fs = FSUtils.getFs(); + + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + // check the partition metadata is written out + assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs); + + // verify that there is a commit + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()) + .getCommitTimeline(); + + assertEquals("Expecting a single commit.", 1, + timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); + assertEquals("Latest commit should be 001", newCommitTime, + timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain 200 records", + records.size(), + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + // Should have 100 records in table (check using Index), all in locations marked at commit + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + List taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table) + .collect(); + checkTaggedRecords(taggedRecords, "001"); + + /** + * Write 2 (updates) + */ + newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + + records = dataGen.generateUpdates(newCommitTime, 100); + LinkedHashMap recordsMap = new LinkedHashMap<>(); + for (HoodieRecord rec : records) { + if (!recordsMap.containsKey(rec.getKey())) { + recordsMap.put(rec.getKey(), rec); + } + } + List dedupedRecords = new ArrayList<>(recordsMap.values()); + + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + // verify there are now 2 commits + timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline(); + assertEquals("Expecting two commits.", + timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), 2); + assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(), + newCommitTime); + + metaClient = new HoodieTableMetaClient(fs, basePath); + table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + // Index should be able to locate all updates in correct locations. + taggedRecords = index.tagLocation(jsc.parallelize(dedupedRecords, 1), table).collect(); + checkTaggedRecords(taggedRecords, "004"); + + // Check the entire dataset has 100 records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain 200 records", + 200, + HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths).count()); + + // Check that the incremental consumption from time 000 + assertEquals("Incremental consumption from time 002, should give all records in commit 004", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "002").count()); + assertEquals("Incremental consumption from time 001, should give all records in commit 004", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "001").count()); + } + @Test public void testDeletes() throws Exception { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java index f1a58f740..c8f241062 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java @@ -70,6 +70,12 @@ public class HoodieWriteStat implements Serializable { */ private long totalWriteErrors; + /** + * Relative path to the temporary file from the base path. + */ + @Nullable + private String tempPath; + /** * Following properties are associated only with the result of a Compaction Operation */ @@ -198,11 +204,20 @@ public class HoodieWriteStat implements Serializable { this.totalRecordsToBeUpdate = totalRecordsToBeUpdate; } + public void setTempPath(String tempPath) { + this.tempPath = tempPath; + } + + public String getTempPath() { + return this.tempPath; + } + @Override public String toString() { return new StringBuilder() .append("HoodieWriteStat {") .append("path=" + path) + .append(", tempPath=" + tempPath) .append(", prevCommit='" + prevCommit + '\'') .append(", numWrites=" + numWrites) .append(", numDeletes=" + numDeletes) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index 841cf47ab..c776abf38 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -50,6 +50,7 @@ public class HoodieTableMetaClient implements Serializable { private final transient static Logger log = LogManager.getLogger(HoodieTableMetaClient.class); public static String METAFOLDER_NAME = ".hoodie"; + public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp"; private String basePath; private transient FileSystem fs; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index f0173f077..31fac7355 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -94,6 +94,10 @@ public class FSUtils { return String.format("%s_%d_%s.parquet", fileId, taskPartitionId, commitTime); } + public static String makeTempDataFileName(String partitionPath, String commitTime, int taskPartitionId, String fileId, int stageId, long taskAttemptId) { + return String.format("%s_%s_%d_%s_%d_%d.parquet", partitionPath.replace("/", "-"), fileId, taskPartitionId, commitTime, stageId, taskAttemptId); + } + public static String maskWithoutFileId(String commitTime, int taskPartitionId) { return String.format("*_%s_%s.parquet", taskPartitionId, commitTime); } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java index 43b0ebdf3..3e05158b2 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java @@ -43,6 +43,18 @@ public class TestFSUtils { .equals(fileName + "_" + taskPartitionId + "_" + commitTime + ".parquet")); } + @Test + public void testMakeTempDataFileName() { + String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); + String partitionPath = "2017/12/31"; + int taskPartitionId = Integer.MAX_VALUE; + int stageId = Integer.MAX_VALUE; + long taskAttemptId = Long.MAX_VALUE; + String fileName = UUID.randomUUID().toString(); + assertTrue(FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId, taskAttemptId) + .equals(partitionPath.replace("/", "-") + "_" + fileName + "_" + taskPartitionId + "_" + commitTime + "_" + stageId + "_" + taskAttemptId + ".parquet")); + } + @Test public void testMaskFileName() { String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());