HUDI-117 Close file handle before throwing an exception due to append failure.
Add test cases to handle/verify stage failure scenarios.
This commit is contained in:
committed by
n3nash
parent
c06ec8bfc7
commit
6f34be1b8d
@@ -72,6 +72,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
|||||||
this.replication = replication;
|
this.replication = replication;
|
||||||
this.logWriteToken = logWriteToken;
|
this.logWriteToken = logWriteToken;
|
||||||
this.rolloverLogWriteToken = rolloverLogWriteToken;
|
this.rolloverLogWriteToken = rolloverLogWriteToken;
|
||||||
|
addShutDownHook();
|
||||||
Path path = logFile.getPath();
|
Path path = logFile.getPath();
|
||||||
if (fs.exists(path)) {
|
if (fs.exists(path)) {
|
||||||
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
|
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
|
||||||
@@ -87,6 +88,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
|||||||
// may still happen if scheme is viewfs.
|
// may still happen if scheme is viewfs.
|
||||||
isAppendSupported = false;
|
isAppendSupported = false;
|
||||||
} else {
|
} else {
|
||||||
|
/*
|
||||||
|
* Before throwing an exception, close the outputstream,
|
||||||
|
* to ensure that the lease on the log file is released.
|
||||||
|
*/
|
||||||
|
close();
|
||||||
throw ioe;
|
throw ioe;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -221,6 +227,24 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
|||||||
return output.getPos();
|
return output.getPos();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the output stream when the JVM exits.
|
||||||
|
*/
|
||||||
|
private void addShutDownHook() {
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
if (output != null) {
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("unable to close output stream for log file " + logFile, e);
|
||||||
|
// fail silently for any sort of exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
|
private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
|
if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
|
||||||
@@ -256,7 +280,23 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
|||||||
throw new HoodieException(e);
|
throw new HoodieException(e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new HoodieIOException("Failed to open an append stream ", e);
|
// When fs.append() has failed and an exception is thrown, by closing the output stream
|
||||||
|
// we shall force hdfs to release the lease on the log file. When Spark retries this task (with
|
||||||
|
// new attemptId, say taskId.1) it will be able to acquire lease on the log file (as output stream was
|
||||||
|
// closed properly by taskId.0).
|
||||||
|
//
|
||||||
|
// If close() call were to fail throwing an exception, our best bet is to rollover to a new log file.
|
||||||
|
try {
|
||||||
|
close();
|
||||||
|
// output stream has been successfully closed and lease on the log file has been released,
|
||||||
|
// before throwing an exception for the append failure.
|
||||||
|
throw new HoodieIOException("Failed to append to the output stream ", e);
|
||||||
|
} catch (Exception ce) {
|
||||||
|
LOG.warn("Failed to close the output stream for " + fs.getClass().getName() + " on path " + path
|
||||||
|
+ ". Rolling over to a new log file.");
|
||||||
|
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
|
||||||
|
createNewFile();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1113,6 +1113,99 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
|||||||
assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
|
assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* During a spark stage failure, when the stage is retried, tasks that are part of the previous attempt
|
||||||
|
* of the stage would continue to run. As a result two different tasks could be performing the same operation.
|
||||||
|
* When trying to update the log file, only one of the tasks would succeed (one holding lease on the log file).
|
||||||
|
*
|
||||||
|
* In order to make progress in this scenario, second task attempting to update the log file would rollover to
|
||||||
|
* a new version of the log file. As a result, we might end up with two log files with same set of data records
|
||||||
|
* present in both of them.
|
||||||
|
*
|
||||||
|
* Following uint tests mimic this scenario to ensure that the reader can handle merging multiple log files with
|
||||||
|
* duplicate data.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2)
|
||||||
|
throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
try {
|
||||||
|
// Write one Data block with same InstantTime (written in same batch)
|
||||||
|
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
||||||
|
List<IndexedRecord> records = SchemaTestUtil.generateHoodieTestRecords(0, 101);
|
||||||
|
List<IndexedRecord> records2 = new ArrayList<>(records);
|
||||||
|
|
||||||
|
// Write1 with numRecordsInLog1 records written to log.1
|
||||||
|
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
|
||||||
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
|
||||||
|
.overBaseCommit("100").withFs(fs).build();
|
||||||
|
|
||||||
|
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
|
||||||
|
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.subList(0, numRecordsInLog1), header);
|
||||||
|
writer = writer.appendBlock(dataBlock);
|
||||||
|
// Get the size of the block
|
||||||
|
long size = writer.getCurrentSize();
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
// write2 with numRecordsInLog2 records written to log.2
|
||||||
|
Writer writer2 = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
|
||||||
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
|
||||||
|
.overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build();
|
||||||
|
|
||||||
|
Map<HoodieLogBlock.HeaderMetadataType, String> header2 = Maps.newHashMap();
|
||||||
|
header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
|
||||||
|
header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
|
||||||
|
HoodieAvroDataBlock dataBlock2 = new HoodieAvroDataBlock(records2.subList(0, numRecordsInLog2), header2);
|
||||||
|
writer2 = writer2.appendBlock(dataBlock2);
|
||||||
|
// Get the size of the block
|
||||||
|
writer2.close();
|
||||||
|
|
||||||
|
// From the two log files generated, read the records
|
||||||
|
List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
|
||||||
|
HoodieLogFile.DELTA_EXTENSION, "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
|
||||||
|
|
||||||
|
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema,
|
||||||
|
"100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
|
||||||
|
|
||||||
|
assertEquals("We would read 100 records",
|
||||||
|
Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog());
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt()
|
||||||
|
throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
/*
|
||||||
|
* FIRST_ATTEMPT_FAILED:
|
||||||
|
* Original task from the stage attempt failed, but subsequent stage retry succeeded.
|
||||||
|
*/
|
||||||
|
testAvroLogRecordReaderMergingMultipleLogFiles(77, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt()
|
||||||
|
throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
/*
|
||||||
|
* SECOND_ATTEMPT_FAILED:
|
||||||
|
* Original task from stage attempt succeeded, but subsequent retry attempt failed.
|
||||||
|
*/
|
||||||
|
testAvroLogRecordReaderMergingMultipleLogFiles(100, 66);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts()
|
||||||
|
throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
/*
|
||||||
|
* BOTH_ATTEMPTS_SUCCEEDED:
|
||||||
|
* Original task from the stage attempt and duplicate task from the stage retry succeeded.
|
||||||
|
*/
|
||||||
|
testAvroLogRecordReaderMergingMultipleLogFiles(100, 100);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
|
public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
|||||||
Reference in New Issue
Block a user