[HUDI-4408] Reuse old rollover file as base file for flink merge handle (#6120)
This commit is contained in:
@@ -143,7 +143,13 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
rolloverPaths.add(newFilePath);
|
// Override the old file name,
|
||||||
|
// In rare cases, when a checkpoint was aborted and the instant time
|
||||||
|
// is reused, the merge handle generates a new file name
|
||||||
|
// with the reused instant time of last checkpoint, which is duplicate,
|
||||||
|
// use the same name file as new base file in case data loss.
|
||||||
|
oldFilePath = newFilePath;
|
||||||
|
rolloverPaths.add(oldFilePath);
|
||||||
newFileName = newFileNameWithRollover(rollNumber++);
|
newFileName = newFileNameWithRollover(rollNumber++);
|
||||||
newFilePath = makeNewFilePath(partitionPath, newFileName);
|
newFilePath = makeNewFilePath(partitionPath, newFileName);
|
||||||
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
|
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
|
||||||
@@ -161,6 +167,12 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
this.fileId, hoodieTable.getBaseFileExtension());
|
this.fileId, hoodieTable.getBaseFileExtension());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setWriteStatusPath() {
|
||||||
|
// if there was rollover, should set up the path as the initial new file path.
|
||||||
|
writeStatus.getStat().setPath(new Path(config.getBasePath()), getWritePath());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<WriteStatus> close() {
|
public List<WriteStatus> close() {
|
||||||
try {
|
try {
|
||||||
@@ -193,6 +205,12 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e);
|
throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
final Path desiredPath = rolloverPaths.get(0);
|
||||||
|
try {
|
||||||
|
fs.rename(newFilePath, desiredPath);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Error when rename the temporary roll file: " + newFilePath + " to: " + desiredPath, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -216,6 +234,6 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path getWritePath() {
|
public Path getWritePath() {
|
||||||
return newFilePath;
|
return rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -408,16 +408,6 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
|
|||||||
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
|
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanWriteHandles() {
|
|
||||||
if (freshInstant(currentInstant)) {
|
|
||||||
// In rare cases, when a checkpoint was aborted and the instant time
|
|
||||||
// is reused, the merge handle generates a new file name
|
|
||||||
// with the reused instant time of last checkpoint, the write handles
|
|
||||||
// should be kept and reused in case data loss.
|
|
||||||
this.writeClient.cleanHandles();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked, rawtypes")
|
@SuppressWarnings("unchecked, rawtypes")
|
||||||
private boolean flushBucket(DataBucket bucket) {
|
private boolean flushBucket(DataBucket bucket) {
|
||||||
String instant = instantToWrite(true);
|
String instant = instantToWrite(true);
|
||||||
@@ -489,7 +479,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
|
|||||||
this.eventGateway.sendEventToCoordinator(event);
|
this.eventGateway.sendEventToCoordinator(event);
|
||||||
this.buckets.clear();
|
this.buckets.clear();
|
||||||
this.tracer.reset();
|
this.tracer.reset();
|
||||||
cleanWriteHandles();
|
this.writeClient.cleanHandles();
|
||||||
this.writeStatuses.addAll(writeStatus);
|
this.writeStatuses.addAll(writeStatus);
|
||||||
// blocks flushing until the coordinator starts a new instant
|
// blocks flushing until the coordinator starts a new instant
|
||||||
this.confirming = true;
|
this.confirming = true;
|
||||||
|
|||||||
Reference in New Issue
Block a user