[HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866)
This commit is contained in:
@@ -195,8 +195,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
||||
writeStatus.getStat().setFileId(fileId);
|
||||
setWriteStatusPath();
|
||||
|
||||
// Create Marker file
|
||||
createMarkerFile(partitionPath, newFileName);
|
||||
// Create Marker file,
|
||||
// uses name of `newFilePath` instead of `newFileName`
|
||||
// in case the sub-class may roll over the file handle name.
|
||||
createMarkerFile(partitionPath, newFilePath.getName());
|
||||
|
||||
// Create the writer for writing the new version file
|
||||
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
|
||||
|
||||
@@ -137,10 +137,8 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
|
||||
* Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
|
||||
*/
|
||||
protected String newFileNameWithRollover(int rollNumber) {
|
||||
// make the intermediate file as hidden
|
||||
final String fileID = "." + this.fileId;
|
||||
return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
|
||||
fileID, hoodieTable.getBaseFileExtension());
|
||||
this.fileId, hoodieTable.getBaseFileExtension());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -158,11 +158,17 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
||||
* Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
|
||||
*/
|
||||
protected String newFileNameWithRollover(int rollNumber) {
|
||||
// make the intermediate file as hidden
|
||||
return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber,
|
||||
this.fileId, hoodieTable.getBaseFileExtension());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setWriteStatusPath() {
|
||||
// if there was rollover, should set up the path as the initial new file path.
|
||||
Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
|
||||
writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> close() {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user