From 0811bb38fb61c19cb2759b96c268682a47516efe Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 15 Jun 2022 14:23:23 +0800 Subject: [PATCH] [HUDI-4255] Make the flink merge and replace handle intermediate file visible (#5866) --- .../main/java/org/apache/hudi/io/HoodieMergeHandle.java | 6 ++++-- .../org/apache/hudi/io/FlinkMergeAndReplaceHandle.java | 4 +--- .../main/java/org/apache/hudi/io/FlinkMergeHandle.java | 8 +++++++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index a85df2a23..82c6de576 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -195,8 +195,10 @@ public class HoodieMergeHandle extends H writeStatus.getStat().setFileId(fileId); setWriteStatusPath(); - // Create Marker file - createMarkerFile(partitionPath, newFileName); + // Create Marker file, + // uses name of `newFilePath` instead of `newFileName` + // in case the sub-class may roll over the file handle name. + createMarkerFile(partitionPath, newFilePath.getName()); // Create the writer for writing the new version file fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java index 24da25b20..cf912f620 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java @@ -137,10 +137,8 @@ public class FlinkMergeAndReplaceHandle * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write. */ protected String newFileNameWithRollover(int rollNumber) { - // make the intermediate file as hidden - final String fileID = "." + this.fileId; return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber, - fileID, hoodieTable.getBaseFileExtension()); + this.fileId, hoodieTable.getBaseFileExtension()); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index e11177126..1bff89713 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -158,11 +158,17 @@ public class FlinkMergeHandle * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write. */ protected String newFileNameWithRollover(int rollNumber) { - // make the intermediate file as hidden return FSUtils.makeBaseFileName(instantTime, writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension()); } + @Override + protected void setWriteStatusPath() { + // if there was rollover, should set up the path as the initial new file path. + Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath; + writeStatus.getStat().setPath(new Path(config.getBasePath()), path); + } + @Override public List close() { try {