1
0

[HUDI-1804] Continue to write when Flink write task restart because of container killing (#2843)

The `FlinkMergeHandle` creates a marker file under the metadata path
each time it initializes. When a write task is restarted after its
container is killed, it tries to create the already-existing marker file
and reports an error.

To solve this problem, skip the marker creation and use the original data
file as the base file to merge.
This commit is contained in:
Danny Chan
2021-04-19 19:43:41 +08:00
committed by GitHub
parent f7b6b68063
commit dab5114f16
3 changed files with 99 additions and 27 deletions

View File

@@ -20,13 +20,14 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
@@ -35,7 +36,6 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches).
@@ -68,13 +68,23 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
taskContextSupplier);
}
/**
* Called by the compactor code path.
*/
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier);
/**
 * Overrides marker creation so that a restarted write task can recover:
 * if the marker already exists, the task was re-launched with the same
 * data file name, and the stale data file it left behind is removed
 * before writing anew.
 *
 * @param partitionPath partition path of the data file
 * @param dataFileName  name of the data file the marker tracks
 */
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
  MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
  if (markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType())) {
    return;
  }
  // Marker already exists: the write task was pulled up again with the
  // same data file name, so remove the legacy data file first.
  try {
    if (fs.exists(path)) {
      fs.delete(path, false);
      LOG.warn("Legacy data file: " + path + " delete success");
    }
  } catch (IOException e) {
    throw new HoodieException("Error while deleting legacy data file: " + path, e);
  }
}
/**
@@ -109,7 +119,7 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
}
@Override
protected long computeFileSizeInBytes() throws IOException {
protected long computeFileSizeInBytes() {
return fileWriter.getBytesWritten();
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -70,13 +71,16 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
/**
 * Records the rolled over file paths.
 *
 * <p>Fix: the diff rendering kept both the old {@code final} declaration
 * and the new non-final one, which is a duplicate-field error. The field
 * must be non-final because {@code #createMarkerFile} reassigns it when a
 * restarted task triggers a rollover.
 */
private List<Path> rolloverPaths;
/**
 * Creates a merge handle for Flink incremental (mini-batch) writes.
 *
 * <p>Fix: removed the leftover unconditional
 * {@code rolloverPaths = new ArrayList<>();} (a stale pre-commit line the
 * diff rendering kept) — it clobbered the list that {@code #createMarkerFile}
 * may have already initialized while the super constructor ran, defeating
 * the null guard below and losing the recorded rollover path.
 *
 * @param config              hoodie write config
 * @param instantTime         instant time of the write
 * @param hoodieTable         the hoodie table to write into
 * @param recordItr           iterator over the incoming records
 * @param partitionPath       partition path of the data file
 * @param fileId              file id the handle writes to
 * @param taskContextSupplier supplier of the engine task context
 */
public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                        Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                        TaskContextSupplier taskContextSupplier) {
  super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
  // #createMarkerFile (invoked during super construction) may have already
  // initialized rolloverPaths on task restart; only create it when still null.
  if (rolloverPaths == null) {
    rolloverPaths = new ArrayList<>();
  }
}
/**
@@ -104,6 +108,25 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
return false;
}
/**
 * Overrides marker creation to support task restart: when the marker for
 * the data file already exists, the task was pulled up again with the same
 * file name, so instead of failing we roll over — the current file path
 * (file1) becomes the merge base, and a freshly generated roll-over path
 * (file2) receives the incremental data set. The intermediate files are
 * cleaned when the task finalizes in {@code #finishWrite}.
 *
 * @param partitionPath partition path of the data file
 * @param dataFileName  name of the data file the marker tracks
 */
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
  boolean created = new MarkerFiles(hoodieTable, instantTime)
      .createIfNotExists(partitionPath, dataFileName, getIOType());
  if (created) {
    return;
  }
  // Restarted task: merge into a rolled-over file instead of the existing one.
  oldFilePath = newFilePath;
  rolloverPaths = new ArrayList<>();
  rolloverPaths.add(oldFilePath);
  newFilePath = makeNewFilePathWithRollover();
}
/**
*
* Rolls over the write handle to prepare for the next batch write.
@@ -132,11 +155,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
rolloverPaths.add(newFilePath);
oldFilePath = newFilePath;
// Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
String newFileName = generatesDataFileNameWithRollover();
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+ newFileName).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
newFilePath = makeNewFilePathWithRollover();
try {
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
@@ -148,6 +167,16 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
newFilePath.toString()));
}
/**
 * Builds the data file path for the next mini-batch write, using
 * fileId + "-" + rollNumber as the new fileId.
 *
 * @return the rolled-over file path resolved against the table base path
 */
private Path makeNewFilePathWithRollover() {
  final String fileName = generatesDataFileNameWithRollover();
  final String prefix = partitionPath.isEmpty() ? "" : partitionPath + "/";
  return new Path(config.getBasePath(), new Path(prefix + fileName).toString());
}
public void finishWrite() {
// The file visibility should be kept by the configured ConsistencyGuard instance.
rolloverPaths.add(newFilePath);