diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index b8d5f8933..201b87987 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -72,6 +72,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     this.replication = replication;
     this.logWriteToken = logWriteToken;
     this.rolloverLogWriteToken = rolloverLogWriteToken;
+    addShutDownHook();
     Path path = logFile.getPath();
     if (fs.exists(path)) {
       boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
@@ -87,6 +88,16 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
             // may still happen if scheme is viewfs.
             isAppendSupported = false;
           } else {
+            /*
+             * Before rethrowing, close the output stream to ensure that the lease on the
+             * log file is released. A failure while closing must not mask the original
+             * exception, so it is attached to it as a suppressed exception instead.
+             */
+            try {
+              close();
+            } catch (Exception ce) {
+              ioe.addSuppressed(ce);
+            }
             throw ioe;
           }
         }
@@ -221,6 +232,26 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     return output.getPos();
   }
 
+  /**
+   * Registers a JVM shutdown hook that closes the output stream when the JVM exits,
+   * so that the lease held on the log file is released.
+   */
+  private void addShutDownHook() {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          if (output != null) {
+            close();
+          }
+        } catch (Exception e) {
+          LOG.warn("unable to close output stream for log file " + logFile, e);
+          // fail silently for any sort of exception
+        }
+      }
+    });
+  }
+
   private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
       throws IOException, InterruptedException {
     if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
@@ -256,7 +287,25 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
         throw new HoodieException(e);
       }
     } else {
-      throw new HoodieIOException("Failed to open an append stream ", e);
+      // When fs.append() has failed and an exception is thrown, by closing the output stream
+      // we shall force hdfs to release the lease on the log file. When Spark retries this task (with
+      // new attemptId, say taskId.1) it will be able to acquire lease on the log file (as output stream was
+      // closed properly by taskId.0).
+      //
+      // If close() call were to fail throwing an exception, our best bet is to rollover to a new log file.
+      try {
+        close();
+      } catch (Exception ce) {
+        LOG.warn("Failed to close the output stream for " + fs.getClass().getName() + " on path " + path
+            + ". Rolling over to a new log file.", ce);
+        this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
+        createNewFile();
+        return;
+      }
+      // The output stream has been successfully closed and the lease on the log file has been released.
+      // The append failure is reported outside the try block so that it is not caught by the close()
+      // failure handler above.
+      throw new HoodieIOException("Failed to append to the output stream ", e);
     }
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
index f0a490251..c4853cb3a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
@@ -1113,6 +1113,97 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
   }
 
+  /*
+   * During a spark stage failure, when the stage is retried, tasks that are part of the previous attempt
+   * of the stage would continue to run. As a result two different tasks could be performing the same operation.
+   * When trying to update the log file, only one of the tasks would succeed (one holding lease on the log file).
+   *
+   * In order to make progress in this scenario, second task attempting to update the log file would rollover to
+   * a new version of the log file. As a result, we might end up with two log files with same set of data records
+   * present in both of them.
+   *
+   * Following unit tests mimic this scenario to ensure that the reader can handle merging multiple log files with
+   * duplicate data.
+   *
+   * Exceptions are deliberately propagated to the caller, so that a failure while writing or reading the
+   * log files fails the test instead of being swallowed.
+   */
+  private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2)
+      throws IOException, URISyntaxException, InterruptedException {
+    // Write one Data block with same InstantTime (written in same batch)
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    List records = SchemaTestUtil.generateHoodieTestRecords(0, 101);
+    List records2 = new ArrayList<>(records);
+
+    // Write1 with numRecordsInLog1 records written to log.1
+    Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs).build();
+
+    Map header = Maps.newHashMap();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.subList(0, numRecordsInLog1), header);
+    writer = writer.appendBlock(dataBlock);
+    // Get the size of the block
+    long size = writer.getCurrentSize();
+    writer.close();
+
+    // write2 with numRecordsInLog2 records written to log.2
+    // NOTE(review): the size threshold is set just below the current size of log.1, presumably to force
+    // this writer to roll over to a new log file - confirm against the writer builder.
+    Writer writer2 = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build();
+
+    Map header2 = Maps.newHashMap();
+    header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieAvroDataBlock dataBlock2 = new HoodieAvroDataBlock(records2.subList(0, numRecordsInLog2), header2);
+    writer2 = writer2.appendBlock(dataBlock2);
+    writer2.close();
+
+    // From the two log files generated, read the records
+    List allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
+        HoodieLogFile.DELTA_EXTENSION, "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema,
+        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+
+    assertEquals("We would read 100 records",
+        Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog());
+  }
+
+  @Test
+  public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt()
+      throws IOException, URISyntaxException, InterruptedException {
+    /*
+     * FIRST_ATTEMPT_FAILED:
+     * Original task from the stage attempt failed, but subsequent stage retry succeeded.
+     */
+    testAvroLogRecordReaderMergingMultipleLogFiles(77, 100);
+  }
+
+  @Test
+  public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt()
+      throws IOException, URISyntaxException, InterruptedException {
+    /*
+     * SECOND_ATTEMPT_FAILED:
+     * Original task from stage attempt succeeded, but subsequent retry attempt failed.
+     */
+    testAvroLogRecordReaderMergingMultipleLogFiles(100, 66);
+  }
+
+  @Test
+  public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts()
+      throws IOException, URISyntaxException, InterruptedException {
+    /*
+     * BOTH_ATTEMPTS_SUCCEEDED:
+     * Original task from the stage attempt and duplicate task from the stage retry succeeded.
+     */
+    testAvroLogRecordReaderMergingMultipleLogFiles(100, 100);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {