Incorporating code review feedback for finalizeWrite for COW
This commit is contained in:
@@ -419,9 +419,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
||||||
|
|
||||||
List<Tuple2<String, HoodieWriteStat>> stats = writeStatuses
|
List<Tuple2<String, HoodieWriteStat>> stats = writeStatuses
|
||||||
.mapToPair((PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus ->
|
.mapToPair((PairFunction<WriteStatus, String, HoodieWriteStat>) writeStatus ->
|
||||||
new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()))
|
new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat()))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
|
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
|
||||||
for (Tuple2<String, HoodieWriteStat> stat : stats) {
|
for (Tuple2<String, HoodieWriteStat> stat : stats) {
|
||||||
@@ -434,14 +434,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
if (finalizeCtx != null && result.isPresent()) {
|
if (finalizeCtx != null && result.isPresent()) {
|
||||||
Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
|
Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
|
||||||
durationInMs.ifPresent(duration -> {
|
durationInMs.ifPresent(duration -> {
|
||||||
logger.info("Finalize write elapsed time (Seconds): " + duration / 1000);
|
logger.info("Finalize write elapsed time (milliseconds): " + duration);
|
||||||
metrics.updateFinalizeWriteMetrics(duration, result.get());
|
metrics.updateFinalizeWriteMetrics(duration, result.get());
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean temp files
|
// Clean temp files
|
||||||
cleanTemporaryDataFiles();
|
table.cleanTemporaryDataFiles(jsc);
|
||||||
|
|
||||||
// add in extra metadata
|
// add in extra metadata
|
||||||
if (extraMetadata.isPresent()) {
|
if (extraMetadata.isPresent()) {
|
||||||
@@ -699,7 +699,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
});
|
});
|
||||||
|
|
||||||
// clean data files in temporary folder
|
// clean data files in temporary folder
|
||||||
cleanTemporaryDataFiles();
|
table.cleanTemporaryDataFiles(jsc);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (commitTimeline.empty() && inflightTimeline.empty()) {
|
if (commitTimeline.empty() && inflightTimeline.empty()) {
|
||||||
@@ -763,35 +763,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanTemporaryDataFiles() {
|
|
||||||
if (!config.shouldFinalizeWrite()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
|
|
||||||
try {
|
|
||||||
if (!fs.exists(temporaryFolder)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
|
|
||||||
List<Tuple2<String, Boolean>> results = jsc.parallelize(fileStatusesList, config.getFinalizeParallelism())
|
|
||||||
.map(fileStatus -> {
|
|
||||||
FileSystem fs1 = FSUtils.getFs();
|
|
||||||
boolean success = fs1.delete(fileStatus.getPath(), false);
|
|
||||||
logger.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + success);
|
|
||||||
return new Tuple2<>(fileStatus.getPath().toString(), success);
|
|
||||||
}).collect();
|
|
||||||
|
|
||||||
for (Tuple2<String, Boolean> result : results) {
|
|
||||||
if (!result._2()) {
|
|
||||||
logger.info("Failed to delete file: " + result._1());
|
|
||||||
throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Failed to clean data files in temporary folder: " + temporaryFolder);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* Releases any resources used by the client.
|
* Releases any resources used by the client.
|
||||||
*/
|
*/
|
||||||
@@ -877,18 +848,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
String commitActionType = table.getCommitActionType();
|
String commitActionType = table.getCommitActionType();
|
||||||
activeTimeline.createInflight(
|
activeTimeline.createInflight(
|
||||||
new HoodieInstant(true, commitActionType, commitTime));
|
new HoodieInstant(true, commitActionType, commitTime));
|
||||||
|
table.initializeFinalizeWrite();
|
||||||
// create temporary folder if needed
|
|
||||||
if (config.shouldFinalizeWrite()) {
|
|
||||||
final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
|
|
||||||
try {
|
|
||||||
if (!fs.exists(temporaryFolder)) {
|
|
||||||
fs.mkdirs(temporaryFolder);
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Failed to create temporary folder: " + temporaryFolder);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -58,10 +58,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
|
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
|
||||||
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
|
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
|
||||||
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
|
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
|
||||||
private static final String HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "hoodie.finalize.write.before.commit";
|
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "hoodie.copyonwrite.use.temp.folder";
|
||||||
private static final String DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT = "false";
|
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "false";
|
||||||
private static final String FINALIZE_PARALLELISM = "hoodie.finalize.parallelism";
|
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
|
||||||
private static final String DEFAULT_FINALIZE_PARALLELISM = "5";
|
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = "5";
|
||||||
|
|
||||||
private HoodieWriteConfig(Properties props) {
|
private HoodieWriteConfig(Properties props) {
|
||||||
super(props);
|
super(props);
|
||||||
@@ -118,12 +118,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
|
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean shouldFinalizeWrite() {
|
public boolean shouldUseTempFolderForCopyOnWrite() {
|
||||||
return Boolean.parseBoolean(props.getProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT));
|
return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER));
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getFinalizeParallelism() {
|
public int getFinalizeWriteParallelism() {
|
||||||
return Integer.parseInt(props.getProperty(FINALIZE_PARALLELISM));
|
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -397,13 +397,13 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder withFinalizeWrite(boolean shouldFinalizeWrite) {
|
public Builder withUseTempFolderCopyOnWrite(boolean shouldUseTempFolderCopyOnWrite) {
|
||||||
props.setProperty(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, String.valueOf(shouldFinalizeWrite));
|
props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderCopyOnWrite));
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder withFinalizeParallelism(int parallelism) {
|
public Builder withFinalizeWriteParallelism(int parallelism) {
|
||||||
props.setProperty(FINALIZE_PARALLELISM, String.valueOf(parallelism));
|
props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism));
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -430,10 +430,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
|
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
|
||||||
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
|
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
|
||||||
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
|
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
|
||||||
setDefaultOnCondition(props, !props.containsKey(HOODIE_FINALIZE_WRITE_BEFORE_COMMIT),
|
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER),
|
||||||
HOODIE_FINALIZE_WRITE_BEFORE_COMMIT, DEFAULT_HOODIE_FINALIZE_WRITE_BEFORE_COMMIT);
|
HOODIE_COPYONWRITE_USE_TEMP_FOLDER, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER);
|
||||||
setDefaultOnCondition(props, !props.containsKey(FINALIZE_PARALLELISM),
|
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
|
||||||
FINALIZE_PARALLELISM, DEFAULT_FINALIZE_PARALLELISM);
|
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
|
||||||
|
|
||||||
// Make sure the props is propagated
|
// Make sure the props is propagated
|
||||||
setDefaultOnCondition(props, !isIndexConfigSet,
|
setDefaultOnCondition(props, !isIndexConfigSet,
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
private final WriteStatus status;
|
private final WriteStatus status;
|
||||||
private final HoodieStorageWriter<IndexedRecord> storageWriter;
|
private final HoodieStorageWriter<IndexedRecord> storageWriter;
|
||||||
private final Path path;
|
private final Path path;
|
||||||
private final Path tempPath;
|
private Path tempPath = null;
|
||||||
private long recordsWritten = 0;
|
private long recordsWritten = 0;
|
||||||
private long recordsDeleted = 0;
|
private long recordsDeleted = 0;
|
||||||
|
|
||||||
@@ -58,10 +58,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
|
|
||||||
final int sparkPartitionId = TaskContext.getPartitionId();
|
final int sparkPartitionId = TaskContext.getPartitionId();
|
||||||
this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId());
|
this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId());
|
||||||
if (config.shouldFinalizeWrite()) {
|
if (config.shouldUseTempFolderForCopyOnWrite()) {
|
||||||
this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
|
this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
|
||||||
} else {
|
|
||||||
this.tempPath = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ import java.io.IOException;
|
|||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
@@ -575,14 +576,33 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initializeFinalizeWrite() {
|
||||||
|
if (!config.shouldUseTempFolderForCopyOnWrite()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// create temporary folder if needed
|
||||||
|
final FileSystem fs = FSUtils.getFs();
|
||||||
|
final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
|
||||||
|
try {
|
||||||
|
if (!fs.exists(temporaryFolder)) {
|
||||||
|
fs.mkdirs(temporaryFolder);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Failed to create temporary folder: " + temporaryFolder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
||||||
if (!config.shouldFinalizeWrite()) {
|
if (!config.shouldUseTempFolderForCopyOnWrite()) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Tuple2<String, Boolean>> results = jsc.parallelize(writeStatuses, config.getFinalizeParallelism())
|
// This is to rename each data file from temporary path to its final location
|
||||||
|
List<Tuple2<String, Boolean>> results = jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism())
|
||||||
.map(writeStatus -> {
|
.map(writeStatus -> {
|
||||||
Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>) writeStatus;
|
Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>) writeStatus;
|
||||||
HoodieWriteStat writeStat = writeStatTuple2._2();
|
HoodieWriteStat writeStat = writeStatTuple2._2();
|
||||||
@@ -610,6 +630,44 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
return Optional.of(results.size());
|
return Optional.of(results.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanTemporaryDataFiles(JavaSparkContext jsc) {
|
||||||
|
if (!config.shouldUseTempFolderForCopyOnWrite()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final FileSystem fs = FSUtils.getFs();
|
||||||
|
final Path temporaryFolder = new Path(config.getBasePath(),
|
||||||
|
HoodieTableMetaClient.TEMPFOLDER_NAME);
|
||||||
|
try {
|
||||||
|
if (!fs.exists(temporaryFolder)) {
|
||||||
|
logger.info("Temporary folder does not exist: " + temporaryFolder);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
|
||||||
|
List<Tuple2<String, Boolean>> results = jsc
|
||||||
|
.parallelize(fileStatusesList, config.getFinalizeWriteParallelism())
|
||||||
|
.map(fileStatus -> {
|
||||||
|
FileSystem fs1 = FSUtils.getFs();
|
||||||
|
boolean success = fs1.delete(fileStatus.getPath(), false);
|
||||||
|
logger.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t"
|
||||||
|
+ success);
|
||||||
|
return new Tuple2<>(fileStatus.getPath().toString(), success);
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
for (Tuple2<String, Boolean> result : results) {
|
||||||
|
if (!result._2()) {
|
||||||
|
logger.info("Failed to delete file: " + result._1());
|
||||||
|
throw new HoodieIOException(
|
||||||
|
"Failed to delete file in temporary folder: " + result._1());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException(
|
||||||
|
"Failed to clean data files in temporary folder: " + temporaryFolder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class PartitionCleanStat implements Serializable {
|
private static class PartitionCleanStat implements Serializable {
|
||||||
|
|
||||||
private final String partitionPath;
|
private final String partitionPath;
|
||||||
|
|||||||
@@ -250,9 +250,19 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
return allRollbackStats;
|
return allRollbackStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initializeFinalizeWrite() {
|
||||||
|
// do nothing for MOR tables
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
||||||
// do nothing for MOR tables
|
// do nothing for MOR tables
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cleanTemporaryDataFiles(JavaSparkContext jsc) {
|
||||||
|
// do nothing for MOR tables
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -273,6 +273,11 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
|||||||
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
|
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize resources needed for finalize write.
|
||||||
|
*/
|
||||||
|
public abstract void initializeFinalizeWrite();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finalize the written data files
|
* Finalize the written data files
|
||||||
*
|
*
|
||||||
@@ -280,4 +285,9 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
|||||||
* @return number of files finalized
|
* @return number of files finalized
|
||||||
*/
|
*/
|
||||||
public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc, List<Tuple2<String, HoodieWriteStat>> writeStatuses);
|
public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc, List<Tuple2<String, HoodieWriteStat>> writeStatuses);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clean temporary data files after data files are finalized or commit is rolled back.
|
||||||
|
*/
|
||||||
|
public abstract void cleanTemporaryDataFiles(JavaSparkContext jsc);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -305,7 +305,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
|
|||||||
@Test
|
@Test
|
||||||
public void testUpsertsWithFinalizeWrite() throws Exception {
|
public void testUpsertsWithFinalizeWrite() throws Exception {
|
||||||
HoodieWriteConfig cfg = getConfigBuilder()
|
HoodieWriteConfig cfg = getConfigBuilder()
|
||||||
.withFinalizeWrite(true)
|
.withUseTempFolderCopyOnWrite(true)
|
||||||
.build();
|
.build();
|
||||||
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||||
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
|
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
|
||||||
|
|||||||
Reference in New Issue
Block a user