1
0

[HUDI-4393] Add marker file for target file when flink merge handle rolls over (#6103)

This commit is contained in:
Danny Chan
2022-07-14 16:00:08 +08:00
committed by GitHub
parent aaccc63ad5
commit 05606708fa
3 changed files with 8 additions and 24 deletions

View File

@@ -655,7 +655,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     invalidDataPaths.removeAll(validDataPaths);
     if (!invalidDataPaths.isEmpty()) {
-      LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
+      LOG.info("Removing duplicate data files created due to task retries before committing. Paths=" + invalidDataPaths);
       Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
           .map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(), new Path(basePath, dp).toString()))
           .collect(Collectors.groupingBy(Pair::getKey));

View File

@@ -167,7 +167,7 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
     try {
       fs.rename(newFilePath, oldFilePath);
     } catch (IOException e) {
-      throw new HoodieIOException("Error while renaming the temporary roll file: "
+      throw new HoodieIOException("Error while renaming the temporary rollover file: "
           + newFilePath + " to old base file name: " + oldFilePath, e);
     }
   }

View File

@@ -43,7 +43,7 @@ import java.util.List;
 /**
  * A {@link HoodieMergeHandle} that supports MERGE write incrementally(small data buffers).
  *
- * <p>For a new data buffer, it initialize and set up the next file path to write,
+ * <p>For a new data buffer, it initializes and set up the next file path to write,
  * and closes the file path when the data buffer write finish. When next data buffer
  * write starts, it rolls over to another new file. If all the data buffers write finish
  * for a checkpoint round, it renames the last new file path as the desired file name
@@ -143,8 +143,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
         break;
       }
-      oldFilePath = newFilePath; // override the old file name
-      rolloverPaths.add(oldFilePath);
+      rolloverPaths.add(newFilePath);
       newFileName = newFileNameWithRollover(rollNumber++);
       newFilePath = makeNewFilePath(partitionPath, newFileName);
       LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);
@@ -162,13 +161,6 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
         this.fileId, hoodieTable.getBaseFileExtension());
   }
 
-  @Override
-  protected void setWriteStatusPath() {
-    // if there was rollover, should set up the path as the initial new file path.
-    Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
-    writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
-  }
-
   @Override
   public List<WriteStatus> close() {
     try {
@@ -188,27 +180,19 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
   public void finalizeWrite() {
     // The file visibility should be kept by the configured ConsistencyGuard instance.
-    rolloverPaths.add(newFilePath);
-    if (rolloverPaths.size() == 1) {
+    if (rolloverPaths.size() == 0) {
       // only one flush action, no need to roll over
       return;
     }
-    for (int i = 0; i < rolloverPaths.size() - 1; i++) {
-      Path path = rolloverPaths.get(i);
+    for (Path path : rolloverPaths) {
       try {
         fs.delete(path, false);
+        LOG.info("Delete the rollover data file: " + path + " success!");
       } catch (IOException e) {
-        throw new HoodieIOException("Error when clean the temporary roll file: " + path, e);
+        throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e);
       }
     }
-    final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
-    final Path desiredPath = rolloverPaths.get(0);
-    try {
-      fs.rename(lastPath, desiredPath);
-    } catch (IOException e) {
-      throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e);
-    }
   }
 
   @Override