[HUDI-1986] Skip creating marker files for flink merge handle (#3047)
This commit is contained in:
@@ -94,6 +94,14 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
final String lastDataFileName = FSUtils.makeDataFileName(instantTime,
|
final String lastDataFileName = FSUtils.makeDataFileName(instantTime,
|
||||||
lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension());
|
lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension());
|
||||||
final Path path = makeNewFilePath(partitionPath, lastDataFileName);
|
final Path path = makeNewFilePath(partitionPath, lastDataFileName);
|
||||||
|
if (path.equals(oldFilePath)) {
|
||||||
|
// In some rare cases, the old attempt file is used as the old base file to merge
|
||||||
|
// because the flink index eagerly records that.
|
||||||
|
//
|
||||||
|
// The merge handle has the 'UPSERT' semantics so there is no need to roll over
|
||||||
|
// and the file can still be used as the merge base file.
|
||||||
|
return;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
if (fs.exists(path)) {
|
if (fs.exists(path)) {
|
||||||
LOG.info("Deleting invalid MERGE base file due to task retry: " + lastDataFileName);
|
LOG.info("Deleting invalid MERGE base file due to task retry: " + lastDataFileName);
|
||||||
@@ -104,6 +112,13 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createMarkerFile(String partitionPath, String dataFileName) {
|
||||||
|
// no need to create marker file for flink merge handle,
|
||||||
|
// the flink write handle does not rely on MARKER files for
|
||||||
|
// corrupt files cleaning, see HoodieFlinkCopyOnWriteTable#getInvalidDataPaths for details.
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
|
protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
|
||||||
// If the data file already exists, it means the write task has written the merge data bucket multiple times
|
// If the data file already exists, it means the write task has written the merge data bucket multiple times
|
||||||
@@ -119,6 +134,13 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
try {
|
try {
|
||||||
int rollNumber = 0;
|
int rollNumber = 0;
|
||||||
while (fs.exists(newFilePath)) {
|
while (fs.exists(newFilePath)) {
|
||||||
|
// in case there is an empty file left behind by a task failover attempt.
|
||||||
|
if (fs.getFileStatus(newFilePath).getLen() <= 0) {
|
||||||
|
fs.delete(newFilePath, false);
|
||||||
|
LOG.warn("Delete empty write file for MERGE bucket: " + newFilePath);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
oldFilePath = newFilePath; // override the old file name
|
oldFilePath = newFilePath; // override the old file name
|
||||||
rolloverPaths.add(oldFilePath);
|
rolloverPaths.add(oldFilePath);
|
||||||
newFileName = newFileNameWithRollover(rollNumber++);
|
newFileName = newFileNameWithRollover(rollNumber++);
|
||||||
|
|||||||
Reference in New Issue
Block a user