Commit: Add FinalizeWrite in HoodieCreateHandle for COW tables

This commit is contained in:
Jian Xu
2017-11-29 16:59:28 -08:00
committed by vinoth chandar
parent e10100fe32
commit c874248f23
13 changed files with 318 additions and 7 deletions

View File

@@ -57,6 +57,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -64,7 +65,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -416,14 +419,30 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
List<Tuple2<String, HoodieWriteStat>> stats = writeStatuses
.mapToPair((PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus ->
new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()))
.collect();
.mapToPair((PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus ->
new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()))
.collect();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
for (Tuple2<String, HoodieWriteStat> stat : stats) {
metadata.addWriteStat(stat._1(), stat._2());
}
// Finalize write
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
final Optional<Integer> result = table.finalizeWrite(jsc, stats);
if (finalizeCtx != null && result.isPresent()) {
Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
durationInMs.ifPresent(duration -> {
logger.info("Finalize write elapsed time (Seconds): " + duration / 1000);
metrics.updateFinalizeWriteMetrics(duration, result.get());
}
);
}
// Clean temp files
cleanTemporaryDataFiles();
// add in extra metadata
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach((k, v) -> metadata.addMetadata(k, v));
@@ -679,6 +698,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
});
// clean data files in temporary folder
cleanTemporaryDataFiles();
try {
if (commitTimeline.empty() && inflightTimeline.empty()) {
// nothing to rollback
@@ -741,6 +763,35 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
/**
 * Deletes every data file left under the temporary staging folder (.hoodie/.temp),
 * in parallel across the cluster. No-op when finalize-write is disabled or the
 * folder does not exist. Throws HoodieIOException if any single delete fails.
 */
private void cleanTemporaryDataFiles() {
  if (!config.shouldFinalizeWrite()) {
    return;
  }
  final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
  try {
    if (!fs.exists(temporaryFolder)) {
      return;
    }
    List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
    List<Tuple2<String, Boolean>> results = jsc.parallelize(fileStatusesList, config.getFinalizeParallelism())
        .map(fileStatus -> {
          // Obtain a FileSystem handle inside the Spark task; the driver's `fs` is not shipped.
          FileSystem fs1 = FSUtils.getFs();
          boolean success = fs1.delete(fileStatus.getPath(), false);
          logger.info("Deleting file in temporary folder " + fileStatus.getPath() + "\t" + success);
          return new Tuple2<>(fileStatus.getPath().toString(), success);
        }).collect();
    // Fail the whole cleanup if any single delete reported failure.
    for (Tuple2<String, Boolean> result : results) {
      if (!result._2()) {
        logger.error("Failed to delete file: " + result._1());
        throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1());
      }
    }
  } catch (IOException e) {
    // Preserve the underlying cause rather than discarding it.
    throw new HoodieIOException("Failed to clean data files in temporary folder: " + temporaryFolder, e);
  }
}
/**
* Releases any resources used by the client.
*/
@@ -826,6 +877,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
String commitActionType = table.getCommitActionType();
activeTimeline.createInflight(
new HoodieInstant(true, commitActionType, commitTime));
// create temporary folder if needed
if (config.shouldFinalizeWrite()) {
final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
try {
if (!fs.exists(temporaryFolder)) {
fs.mkdirs(temporaryFolder);
}
} catch (IOException e) {
throw new HoodieIOException("Failed to create temporary folder: " + temporaryFolder);
}
}
}
/**

View File

@@ -58,6 +58,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
private static final String HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "hoodie.finalize.write.before.commit";
private static final String DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "false";
private static final String FINALIZE_PARALLELISM = "hoodie.finalize.parallelism";
private static final String DEFAULT_FINALIZE_PARALLELISM = "5";
private HoodieWriteConfig(Properties props) {
super(props);
@@ -114,6 +118,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
}
/**
 * Whether writes should be staged in a temporary folder and finalized (renamed into
 * place) before commit. Controlled by {@code hoodie.finalize.write.before.commit};
 * defaults to false.
 */
public boolean shouldFinalizeWrite() {
return Boolean.parseBoolean(props.getProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT));
}
/**
 * Spark parallelism used for the finalize rename/cleanup jobs.
 * Controlled by {@code hoodie.finalize.parallelism}; defaults to 5.
 */
public int getFinalizeParallelism() {
return Integer.parseInt(props.getProperty(FINALIZE_PARALLELISM));
}
/**
* compaction properties
**/
@@ -385,6 +397,16 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Enables or disables finalize-write (staging data files in a temp folder and
 * renaming them into place before commit).
 *
 * @param shouldFinalizeWrite true to enable finalize-write
 * @return this builder, for chaining
 */
public Builder withFinalizeWrite(boolean shouldFinalizeWrite) {
props.setProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, String.valueOf(shouldFinalizeWrite));
return this;
}
/**
 * Sets the Spark parallelism used for finalize rename/cleanup jobs.
 *
 * @param parallelism number of parallel tasks
 * @return this builder, for chaining
 */
public Builder withFinalizeParallelism(int parallelism) {
props.setProperty(FINALIZE_PARALLELISM, String.valueOf(parallelism));
return this;
}
public HoodieWriteConfig build() {
HoodieWriteConfig config = new HoodieWriteConfig(props);
// Check for mandatory properties
@@ -408,6 +430,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT),
HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_PARALLELISM),
FINALIZE_PARALLELISM, DEFAULT_FINALIZE_PARALLELISM);
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,

View File

@@ -45,6 +45,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
private final WriteStatus status;
private final HoodieStorageWriter<IndexedRecord> storageWriter;
private final Path path;
private final Path tempPath;
private long recordsWritten = 0;
private long recordsDeleted = 0;
@@ -55,7 +56,14 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
status.setFileId(UUID.randomUUID().toString());
status.setPartitionPath(partitionPath);
this.path = makeNewPath(partitionPath, TaskContext.getPartitionId(), status.getFileId());
final int sparkPartitionId = TaskContext.getPartitionId();
this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId());
if (config.shouldFinalizeWrite()) {
this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
} else {
this.tempPath = null;
}
try {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs,
commitTime,
@@ -64,10 +72,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
partitionMetadata.trySave(TaskContext.getPartitionId());
this.storageWriter =
HoodieStorageWriterFactory
.getStorageWriter(commitTime, path, hoodieTable, config, schema);
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
} catch (IOException e) {
throw new HoodieInsertException(
"Failed to initialize HoodieStorageWriter for path " + path, e);
"Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e);
}
logger.info("New InsertHandle for partition :" + partitionPath);
}
@@ -126,7 +134,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
stat.setFileId(status.getFileId());
String relativePath = path.toString().replace(new Path(config.getBasePath()) + "/", "");
stat.setPath(relativePath);
stat.setTotalWriteBytes(FSUtils.getFileSize(fs, path));
if (tempPath != null) {
stat.setTempPath(tempPath.toString().replace(new Path(config.getBasePath()) + "/", ""));
}
stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
stat.setTotalWriteErrors(status.getFailedRecords().size());
status.setStat(stat);
@@ -136,4 +147,12 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
e);
}
}
/**
 * Returns the path the storage writer should write to: the temporary staging
 * path when finalize-write is enabled, otherwise the final data-file path.
 */
private Path getStorageWriterPath() {
  return (this.tempPath != null) ? this.tempPath : this.path;
}
}

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.io;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
@@ -65,6 +66,12 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
}
/**
 * Builds the path for a staged data file under the table's temporary folder.
 * The partition path, Spark stage id and task attempt id are encoded into the
 * file name (see FSUtils.makeTempDataFileName) so concurrent attempts do not collide.
 *
 * @param partitionPath partition the record belongs to
 * @param taskPartitionId Spark partition id of the writing task
 * @param fileName file id portion of the data file name
 * @param stageId Spark stage id
 * @param taskAttemptId Spark task attempt id
 * @return fully qualified path inside the temp folder
 */
public Path makeTempPath(String partitionPath, int taskPartitionId, String fileName, int stageId, long taskAttemptId) {
Path path = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
return new Path(path.toString(),
FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId, taskAttemptId));
}
/**
* Deletes any new tmp files written during the current commit, into the partition
*/

View File

@@ -37,9 +37,11 @@ public class HoodieMetrics {
public String rollbackTimerName = null;
public String cleanTimerName = null;
public String commitTimerName = null;
public String finalizeTimerName = null;
private Timer rollbackTimer = null;
private Timer cleanTimer = null;
private Timer commitTimer = null;
private Timer finalizeTimer = null;
public HoodieMetrics(HoodieWriteConfig config, String tableName) {
this.config = config;
@@ -49,6 +51,7 @@ public class HoodieMetrics {
this.rollbackTimerName = getMetricsName("timer", "rollback");
this.cleanTimerName = getMetricsName("timer", "clean");
this.commitTimerName = getMetricsName("timer", "commit");
this.finalizeTimerName = getMetricsName("timer", "finalize");
}
}
@@ -77,6 +80,13 @@ public class HoodieMetrics {
return commitTimer == null ? null : commitTimer.time();
}
/**
 * Starts and returns a timing context for the finalize-write phase, or null
 * when metrics are disabled. The underlying timer is created lazily on first use.
 */
public Timer.Context getFinalizeCtx() {
  if (finalizeTimer == null && config.isMetricsOn()) {
    finalizeTimer = createTimer(finalizeTimerName);
  }
  return (finalizeTimer != null) ? finalizeTimer.time() : null;
}
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs,
HoodieCommitMetadata metadata) {
if (config.isMetricsOn()) {
@@ -119,6 +129,15 @@ public class HoodieMetrics {
}
}
/**
 * Publishes finalize-write gauges (duration in ms and number of files finalized).
 * No-op when metrics are disabled.
 *
 * @param durationInMs wall-clock duration of the finalize phase, in milliseconds
 * @param numFilesFinalized count of data files renamed into place
 */
public void updateFinalizeWriteMetrics(long durationInMs, int numFilesFinalized) {
if (config.isMetricsOn()) {
logger.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)",
durationInMs, numFilesFinalized));
registerGauge(getMetricsName("finalize", "duration"), durationInMs);
registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
}
}
@VisibleForTesting
String getMetricsName(String action, String metric) {
return config == null ? null :

View File

@@ -27,6 +27,7 @@ import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
@@ -574,6 +575,41 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return stats;
}
/**
 * Finalizes a write on a copy-on-write table by renaming each staged temporary
 * file to its final data-file path, in parallel. No-op (empty Optional) when
 * finalize-write is disabled. Stats without a temp path are counted but untouched.
 *
 * @param jsc Spark context used to parallelize the renames
 * @param writeStatuses list of (partitionPath, HoodieWriteStat) tuples
 * @return number of write stats processed
 */
@Override
@SuppressWarnings("unchecked")
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
  if (!config.shouldFinalizeWrite()) {
    return Optional.empty();
  }
  List<Tuple2<String, Boolean>> results = jsc.parallelize(writeStatuses, config.getFinalizeParallelism())
      .map(writeStatus -> {
        Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>) writeStatus;
        HoodieWriteStat writeStat = writeStatTuple2._2();
        // Obtain a FileSystem handle inside the Spark task.
        final FileSystem fs = FSUtils.getFs();
        final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
        if (writeStat.getTempPath() != null) {
          final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
          boolean success;
          try {
            logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
            success = fs.rename(tempPath, finalPath);
          } catch (IOException e) {
            // Preserve the underlying cause rather than discarding it.
            throw new HoodieIOException("Failed to rename file: " + tempPath + " to " + finalPath, e);
          }
          if (!success) {
            // FileSystem.rename reports failure via its boolean return, not an exception.
            throw new HoodieIOException("Failed to rename file: " + tempPath + " to " + finalPath);
          }
        }
        return new Tuple2<>(writeStat.getPath(), true);
      }).collect();
  return Optional.of(results.size());
}
private static class PartitionCleanStat implements Serializable {
private final String partitionPath;

View File

@@ -250,4 +250,9 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
return allRollbackStats;
}
/**
 * Finalize is a no-op for merge-on-read tables; only copy-on-write tables
 * stage data files in a temporary folder.
 *
 * @return always Optional.empty()
 */
@Override
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
// do nothing for MOR tables
return Optional.empty();
}
}

View File

@@ -23,6 +23,7 @@ import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
@@ -46,6 +47,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
* Abstract implementation of a HoodieTable
@@ -270,4 +272,12 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
*/
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
throws IOException;
/**
* Finalize the written data files
*
* @param writeStatuses List of WriteStatus
* @return number of files finalized
*/
public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc, List<Tuple2<String, HoodieWriteStat>> writeStatuses);
}

View File

@@ -302,6 +302,100 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "001").count());
}
/**
 * End-to-end upsert test with finalize-write enabled: inserts 200 records in
 * commit 001, upserts 100 updates in commit 004, and verifies commits, index
 * tagging, record counts and incremental reads all behave as without finalize.
 */
@Test
public void testUpsertsWithFinalizeWrite() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder()
.withFinalizeWrite(true)
.build();
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
FileSystem fs = FSUtils.getFs();
/**
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
// check the partition metadata is written out
assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs);
// verify that there is a commit
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
HoodieTimeline timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath())
.getCommitTimeline();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
assertEquals("Latest commit should be 001", newCommitTime,
timeline.lastInstant().get().getTimestamp());
assertEquals("Must contain 200 records",
records.size(),
HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
// Should have 200 records in table (check using Index), all in locations marked at commit 001
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table)
.collect();
checkTaggedRecords(taggedRecords, "001");
/**
* Write 2 (updates)
*/
newCommitTime = "004";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
LinkedHashMap<HoodieKey, HoodieRecord> recordsMap = new LinkedHashMap<>();
for (HoodieRecord rec : records) {
if (!recordsMap.containsKey(rec.getKey())) {
recordsMap.put(rec.getKey(), rec);
}
}
List<HoodieRecord> dedupedRecords = new ArrayList<>(recordsMap.values());
// NOTE(review): the raw (possibly duplicated) `records` list is upserted here;
// `dedupedRecords` is only used for the index tagging check below — confirm intended.
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
// verify there are now 2 commits
timeline = new HoodieActiveTimeline(fs, metaClient.getMetaPath()).getCommitTimeline();
assertEquals("Expecting two commits.",
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), 2);
assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(),
newCommitTime);
metaClient = new HoodieTableMetaClient(fs, basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig());
// Index should be able to locate all updates in correct locations.
taggedRecords = index.tagLocation(jsc.parallelize(dedupedRecords, 1), table).collect();
checkTaggedRecords(taggedRecords, "004");
// Check the entire dataset still has all 200 records
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
}
assertEquals("Must contain 200 records",
200,
HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths).count());
// Check incremental consumption: reading since 002 or 001 should both yield exactly commit 004's records
assertEquals("Incremental consumption from time 002, should give all records in commit 004",
HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "002").count());
assertEquals("Incremental consumption from time 001, should give all records in commit 004",
HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "001").count());
}
@Test
public void testDeletes() throws Exception {

View File

@@ -70,6 +70,12 @@ public class HoodieWriteStat implements Serializable {
*/
private long totalWriteErrors;
/**
* Relative path to the temporary file from the base path.
*/
@Nullable
private String tempPath;
/**
* Following properties are associated only with the result of a Compaction Operation
*/
@@ -198,11 +204,20 @@ public class HoodieWriteStat implements Serializable {
this.totalRecordsToBeUpdate = totalRecordsToBeUpdate;
}
/**
 * Sets the relative (to the table base path) temporary staging path of the data file.
 */
public void setTempPath(String tempPath) {
this.tempPath = tempPath;
}
/**
 * Returns the relative temporary staging path, or null if the file was not staged.
 */
public String getTempPath() {
return this.tempPath;
}
@Override
public String toString() {
return new StringBuilder()
.append("HoodieWriteStat {")
.append("path=" + path)
.append(", tempPath=" + tempPath)
.append(", prevCommit='" + prevCommit + '\'')
.append(", numWrites=" + numWrites)
.append(", numDeletes=" + numDeletes)

View File

@@ -50,6 +50,7 @@ public class HoodieTableMetaClient implements Serializable {
private final transient static Logger log = LogManager.getLogger(HoodieTableMetaClient.class);
public static String METAFOLDER_NAME = ".hoodie";
public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
private String basePath;
private transient FileSystem fs;

View File

@@ -94,6 +94,10 @@ public class FSUtils {
return String.format("%s_%d_%s.parquet", fileId, taskPartitionId, commitTime);
}
/**
 * Builds the file name for a staged temporary data file:
 * {@code <partition-with-slashes-as-dashes>_<fileId>_<taskPartitionId>_<commitTime>_<stageId>_<taskAttemptId>.parquet}.
 * Stage and task-attempt ids keep names from colliding across retried Spark attempts.
 */
public static String makeTempDataFileName(String partitionPath, String commitTime, int taskPartitionId, String fileId, int stageId, long taskAttemptId) {
  // Flatten the partition path so the whole name is a single path segment.
  String flatPartitionPath = partitionPath.replace("/", "-");
  return String.format("%s_%s_%d_%s_%d_%d.parquet",
      flatPartitionPath, fileId, taskPartitionId, commitTime, stageId, taskAttemptId);
}
public static String maskWithoutFileId(String commitTime, int taskPartitionId) {
return String.format("*_%s_%s.parquet", taskPartitionId, commitTime);
}

View File

@@ -43,6 +43,18 @@ public class TestFSUtils {
.equals(fileName + "_" + taskPartitionId + "_" + commitTime + ".parquet"));
}
/**
 * Verifies the temp data file name layout: flattened partition path, file id,
 * task partition id, commit time, stage id and task attempt id, joined by '_'.
 */
@Test
public void testMakeTempDataFileName() {
  String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
  String partitionPath = "2017/12/31";
  int taskPartitionId = Integer.MAX_VALUE;
  int stageId = Integer.MAX_VALUE;
  long taskAttemptId = Long.MAX_VALUE;
  String fileName = UUID.randomUUID().toString();
  String expected = String.join("_",
      partitionPath.replace("/", "-"),
      fileName,
      String.valueOf(taskPartitionId),
      commitTime,
      String.valueOf(stageId),
      String.valueOf(taskAttemptId)) + ".parquet";
  String actual = FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId, taskAttemptId);
  assertTrue(actual.equals(expected));
}
@Test
public void testMaskFileName() {
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());