From 6aec9d754f4a091136c2dba9643b27ae66db255b Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Sat, 16 Jul 2022 20:46:23 +0800 Subject: [PATCH] [HUDI-4408] Reuse old rollover file as base file for flink merge handle (#6120) --- .../org/apache/hudi/io/FlinkMergeHandle.java | 22 +++++++++++++++++-- .../apache/hudi/sink/StreamWriteFunction.java | 12 +--------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 99f111c82..69121a9a0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -143,7 +143,13 @@ public class FlinkMergeHandle break; } - rolloverPaths.add(newFilePath); + // Override the old file name. + // In rare cases, when a checkpoint was aborted and the instant time + // is reused, the merge handle generates a new file name + // with the reused instant time of the last checkpoint, which is a duplicate; + // reuse the file with the same name as the new base file to avoid data loss. + oldFilePath = newFilePath; + rolloverPaths.add(oldFilePath); newFileName = newFileNameWithRollover(rollNumber++); newFilePath = makeNewFilePath(partitionPath, newFileName); LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath); @@ -161,6 +167,12 @@ public class FlinkMergeHandle this.fileId, hoodieTable.getBaseFileExtension()); } + @Override + protected void setWriteStatusPath() { + // If there was a rollover, the path should be set to the initial new file path. 
+ writeStatus.getStat().setPath(new Path(config.getBasePath()), getWritePath()); + } + @Override public List close() { try { @@ -193,6 +205,12 @@ public class FlinkMergeHandle throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e); } } + final Path desiredPath = rolloverPaths.get(0); + try { + fs.rename(newFilePath, desiredPath); + } catch (IOException e) { + throw new HoodieIOException("Error when rename the temporary roll file: " + newFilePath + " to: " + desiredPath, e); + } } @Override @@ -216,6 +234,6 @@ public class FlinkMergeHandle @Override public Path getWritePath() { - return newFilePath; + return rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath; } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index bbaba0414..2748af529 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -408,16 +408,6 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); } - private void cleanWriteHandles() { - if (freshInstant(currentInstant)) { - // In rare cases, when a checkpoint was aborted and the instant time - // is reused, the merge handle generates a new file name - // with the reused instant time of last checkpoint, the write handles - // should be kept and reused in case data loss. 
- this.writeClient.cleanHandles(); - } - } - @SuppressWarnings("unchecked, rawtypes") private boolean flushBucket(DataBucket bucket) { String instant = instantToWrite(true); @@ -489,7 +479,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction { this.eventGateway.sendEventToCoordinator(event); this.buckets.clear(); this.tracer.reset(); - cleanWriteHandles(); + this.writeClient.cleanHandles(); this.writeStatuses.addAll(writeStatus); // blocks flushing until the coordinator starts a new instant this.confirming = true;