1
0

Incorporating code review feedback for finalizeWrite for COW #4

This commit is contained in:
Jian Xu
2018-01-30 11:18:00 -08:00
committed by vinoth chandar
parent 3736243fb3
commit 15e669c60c
6 changed files with 98 additions and 126 deletions

View File

@@ -58,10 +58,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "hoodie.copyonwrite.use.temp.folder";
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "false";
private static final String HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "hoodie.mergehandle.use.temp.folder";
private static final String DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "false";
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "hoodie.copyonwrite.use.temp.folder.for.create";
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "false";
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "hoodie.copyonwrite.use.temp.folder.for.merge";
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
@@ -120,12 +120,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
}
public boolean shouldUseTempFolderForCopyOnWrite() {
return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER));
public boolean shouldUseTempFolderForCopyOnWriteForCreate() {
return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE));
}
public boolean shouldUseTempFolderForMergeHandle() {
return Boolean.parseBoolean(props.getProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER));
public boolean shouldUseTempFolderForCopyOnWriteForMerge() {
return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE));
}
public boolean shouldUseTempFolderForCopyOnWrite() {
return shouldUseTempFolderForCopyOnWriteForCreate() ||
shouldUseTempFolderForCopyOnWriteForMerge();
}
public int getFinalizeWriteParallelism() {
@@ -403,13 +408,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withUseTempFolderCopyOnWrite(boolean shouldUseTempFolderCopyOnWrite) {
props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderCopyOnWrite));
public Builder withUseTempFolderCopyOnWriteForCreate(
boolean shouldUseTempFolderCopyOnWriteForCreate) {
props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE, String.valueOf
(shouldUseTempFolderCopyOnWriteForCreate));
return this;
}
public Builder withUseTempFolderMergeHandle(boolean shouldUseTempFolderMergeHandle) {
props.setProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderMergeHandle));
public Builder withUseTempFolderCopyOnWriteForMerge(
boolean shouldUseTempFolderCopyOnWriteForMerge) {
props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE, String.valueOf
(shouldUseTempFolderCopyOnWriteForMerge));
return this;
}
@@ -441,10 +450,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER);
setDefaultOnCondition(props, !props.containsKey(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER),
HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);

View File

@@ -58,8 +58,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
final int sparkPartitionId = TaskContext.getPartitionId();
this.path = makeNewPath(partitionPath, sparkPartitionId, status.getFileId());
if (config.shouldUseTempFolderForCopyOnWrite()) {
this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(), TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
if (config.shouldUseTempFolderForCopyOnWriteForCreate()) {
this.tempPath = makeTempPath(partitionPath, sparkPartitionId, status.getFileId(),
TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
}
try {
@@ -144,9 +145,6 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
private Path getStorageWriterPath() {
// Use tempPath for storage writer if possible
if (this.tempPath != null) {
return this.tempPath;
}
return this.path;
return (this.tempPath == null) ? this.path : this.tempPath;
}
}

View File

@@ -101,7 +101,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
String relativePath = new Path(record.getPartitionPath() + "/" + FSUtils
.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
if (config.shouldUseTempFolderForCopyOnWrite() && config.shouldUseTempFolderForMergeHandle()) {
if (config.shouldUseTempFolderForCopyOnWriteForMerge()) {
this.tempPath = makeTempPath(record.getPartitionPath(), TaskContext.getPartitionId(), fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
}
@@ -111,7 +111,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
}
logger.info(String.format("Merging new data into oldPath %s, as newPath %s",
oldFilePath.toString(), getNewFilePath().toString()));
oldFilePath.toString(), getStorageWriterPath().toString()));
// file name is same for all records, in this bunch
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(record.getPartitionPath());
@@ -124,7 +124,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
}
// Create the writer for writing the new version file
storageWriter = HoodieStorageWriterFactory
.getStorageWriter(commitTime, getNewFilePath(), hoodieTable, config, schema);
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
} catch (Exception e) {
logger.error("Error in update task at commit " + commitTime, e);
@@ -190,18 +190,18 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
if (copyOldRecord) {
// this should work as it is, since this is an existing record
String errMsg = "Failed to merge old record into new file for key " + key + " from old file "
+ getOldFilePath() + " to new file " + getNewFilePath();
+ getOldFilePath() + " to new file " + getStorageWriterPath();
try {
storageWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
logger.error(
"Schema mismatch when rewriting old record " + oldRecord + " from file "
+ getOldFilePath() + " to file " + getNewFilePath() + " with schema " + schema
+ getOldFilePath() + " to file " + getStorageWriterPath() + " with schema " + schema
.toString(true));
throw new HoodieUpsertException(errMsg, e);
} catch (IOException e) {
logger.error("Failed to merge old record into new file for key " + key + " from old file "
+ getOldFilePath() + " to new file " + getNewFilePath(), e);
+ getOldFilePath() + " to new file " + getStorageWriterPath(), e);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
@@ -223,7 +223,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
storageWriter.close();
}
writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, getNewFilePath()));
writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
writeStatus.getStat().setNumWrites(recordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
@@ -237,12 +237,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
return oldFilePath;
}
private Path getNewFilePath() {
private Path getStorageWriterPath() {
// Use tempPath for storage writer if possible
if (this.tempPath != null) {
return this.tempPath;
}
return this.newFilePath;
return (this.tempPath == null) ? this.newFilePath : this.tempPath;
}
public WriteStatus getWriteStatus() {

View File

@@ -579,6 +579,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return stats;
}
/**
* Finalize the written data files
*
* @param writeStatuses List of WriteStatus
* @return number of files finalized
*/
@Override
@SuppressWarnings("unchecked")
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
@@ -618,6 +624,10 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return Optional.of(results.size());
}
/**
* Clean temporary data files that are produced from previous failed commit or retried spark
* stages.
*/
private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
if (!config.shouldUseTempFolderForCopyOnWrite()) {
return;

View File

@@ -280,5 +280,4 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* @return number of files finalized
*/
public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc, List<Tuple2<String, HoodieWriteStat>> writeStatuses);
}