1
0

Add finalizeWrite support for HoodieMergeHandle

This commit is contained in:
Jian Xu
2018-01-18 11:29:10 -08:00
committed by vinoth chandar
parent acae6586f3
commit 363e35bb0f
4 changed files with 33 additions and 12 deletions

View File

@@ -60,6 +60,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "hoodie.copyonwrite.use.temp.folder";
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER = "false";
private static final String HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "hoodie.mergehandle.use.temp.folder";
private static final String DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
@@ -122,6 +124,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER));
}
public boolean shouldUseTempFolderForMergeHandle() {
return Boolean.parseBoolean(props.getProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER));
}
public int getFinalizeWriteParallelism() {
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}
@@ -402,6 +408,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withUseTempFolderMergeHandle(boolean shouldUseTempFolderMergeHandle) {
props.setProperty(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, String.valueOf(shouldUseTempFolderMergeHandle));
return this;
}
public Builder withFinalizeWriteParallelism(int parallelism) {
props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism));
return this;
@@ -432,6 +443,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER, DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER);
setDefaultOnCondition(props, !props.containsKey(HOODIE_MERGEHANDLE_USE_TEMP_FOLDER),
HOODIE_MERGEHANDLE_USE_TEMP_FOLDER, DEFAULT_HOODIE_MERGEHANDLE_USE_TEMP_FOLDER);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);

View File

@@ -52,6 +52,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
private TableFileSystemView.ReadOptimizedView fileSystemView;
private Path newFilePath;
private Path oldFilePath;
private Path tempPath = null;
private long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
@@ -100,6 +101,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
String relativePath = new Path(record.getPartitionPath() + "/" + FSUtils
.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
if (config.shouldUseTempFolderForCopyOnWrite() && config.shouldUseTempFolderForMergeHandle()) {
this.tempPath = makeTempPath(record.getPartitionPath(), TaskContext.getPartitionId(), fileId, TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
}
// handle cases of partial failures, for update task
if (fs.exists(newFilePath)) {
@@ -107,12 +111,12 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
}
logger.info(String.format("Merging new data into oldPath %s, as newPath %s",
oldFilePath.toString(), newFilePath.toString()));
oldFilePath.toString(), getNewFilePath().toString()));
// file name is same for all records, in this bunch
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(record.getPartitionPath());
writeStatus.getStat().setFileId(fileId);
writeStatus.getStat().setPath(relativePath);
writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
}
keyToNewRecords.put(record.getRecordKey(), record);
// update the new location of the record, so we know where to find it next
@@ -120,7 +124,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
}
// Create the writer for writing the new version file
storageWriter = HoodieStorageWriterFactory
.getStorageWriter(commitTime, newFilePath, hoodieTable, config, schema);
.getStorageWriter(commitTime, getNewFilePath(), hoodieTable, config, schema);
} catch (Exception e) {
logger.error("Error in update task at commit " + commitTime, e);
@@ -186,18 +190,18 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
if (copyOldRecord) {
// this should work as it is, since this is an existing record
String errMsg = "Failed to merge old record into new file for key " + key + " from old file "
+ getOldFilePath() + " to new file " + newFilePath;
+ getOldFilePath() + " to new file " + getNewFilePath();
try {
storageWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
logger.error(
"Schema mismatch when rewriting old record " + oldRecord + " from file "
+ getOldFilePath() + " to file " + newFilePath + " with schema " + schema
+ getOldFilePath() + " to file " + getNewFilePath() + " with schema " + schema
.toString(true));
throw new HoodieUpsertException(errMsg, e);
} catch (IOException e) {
logger.error("Failed to merge old record into new file for key " + key + " from old file "
+ getOldFilePath() + " to new file " + newFilePath, e);
+ getOldFilePath() + " to new file " + getNewFilePath(), e);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
@@ -219,7 +223,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
storageWriter.close();
}
writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, newFilePath));
writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, getNewFilePath()));
writeStatus.getStat().setNumWrites(recordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
@@ -233,6 +237,14 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
return oldFilePath;
}
private Path getNewFilePath() {
// Use tempPath for storage writer if possible
if (this.tempPath != null) {
return this.tempPath;
}
return this.newFilePath;
}
public WriteStatus getWriteStatus() {
return writeStatus;
}

View File

@@ -255,9 +255,4 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
// do nothing for MOR tables
return Optional.empty();
}
@Override
public void cleanTemporaryDataFiles(JavaSparkContext jsc) {
// do nothing for MOR tables
}
}