diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 870a6aae2..b896bfebb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -94,6 +94,14 @@ public class FlinkMergeHandle final String lastDataFileName = FSUtils.makeDataFileName(instantTime, lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension()); final Path path = makeNewFilePath(partitionPath, lastDataFileName); + if (path.equals(oldFilePath)) { + // In some rare cases, the old attempt file is used as the old base file to merge + // because the flink index eagerly records it. + // + // The merge handle has 'UPSERT' semantics, so there is no need to roll over + // and the file can still be used as the merge base file. + return; + } try { if (fs.exists(path)) { LOG.info("Deleting invalid MERGE base file due to task retry: " + lastDataFileName); @@ -104,6 +112,13 @@ public class FlinkMergeHandle } } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName) { + // No need to create a marker file for the flink merge handle: + // the flink write handle does not rely on MARKER files for + // cleaning up corrupt files, see HoodieFlinkCopyOnWriteTable#getInvalidDataPaths for details. + } + @Override protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) { // If the data file already exists, it means the write task write merge data bucket multiple times @@ -119,6 +134,13 @@ public class FlinkMergeHandle try { int rollNumber = 0; while (fs.exists(newFilePath)) { + // In case an empty file was left behind by a task failover attempt.
+ if (fs.getFileStatus(newFilePath).getLen() <= 0) { + fs.delete(newFilePath, false); + LOG.warn("Delete empty write file for MERGE bucket: " + newFilePath); + break; + } + oldFilePath = newFilePath; // override the old file name rolloverPaths.add(oldFilePath); newFileName = newFileNameWithRollover(rollNumber++);