1
0

[HUDI-840] Avoid blank file created by HoodieLogFormatWriter (#1567)

This commit is contained in:
hongdd
2020-09-29 23:02:15 +08:00
committed by GitHub
parent 20b9b399c9
commit 32c9cad52c
3 changed files with 83 additions and 53 deletions

View File

@@ -54,6 +54,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final String logWriteToken; private final String logWriteToken;
private final String rolloverLogWriteToken; private final String rolloverLogWriteToken;
private FSDataOutputStream output; private FSDataOutputStream output;
private boolean closed = false;
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet"; private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
/** /**
@@ -64,7 +65,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
* @param sizeThreshold * @param sizeThreshold
*/ */
HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold,
String logWriteToken, String rolloverLogWriteToken) throws IOException, InterruptedException { String logWriteToken, String rolloverLogWriteToken) {
this.fs = fs; this.fs = fs;
this.logFile = logFile; this.logFile = logFile;
this.sizeThreshold = sizeThreshold; this.sizeThreshold = sizeThreshold;
@@ -73,40 +74,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
this.logWriteToken = logWriteToken; this.logWriteToken = logWriteToken;
this.rolloverLogWriteToken = rolloverLogWriteToken; this.rolloverLogWriteToken = rolloverLogWriteToken;
addShutDownHook(); addShutDownHook();
Path path = logFile.getPath();
if (fs.exists(path)) {
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
if (isAppendSupported) {
LOG.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
LOG.warn("Remote Exception, attempting to handle or recover lease", e);
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
if (ioe.getMessage().toLowerCase().contains("not supported")) {
// may still happen if scheme is viewfs.
isAppendSupported = false;
} else {
/*
* Before throwing an exception, close the outputstream,
* to ensure that the lease on the log file is released.
*/
close();
throw ioe;
}
}
}
if (!isAppendSupported) {
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
LOG.info("Append not supported.. Rolling over to " + logFile);
createNewFile();
}
} else {
LOG.info(logFile + " does not exist. Create a new file");
// Block size does not matter as we will always manually autoflush
createNewFile();
}
} }
public FileSystem getFs() { public FileSystem getFs() {
@@ -122,16 +89,64 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
return sizeThreshold; return sizeThreshold;
} }
/**
* Lazily opens the output stream if needed for writing.
* @return OutputStream for writing to current log file.
* @throws IOException
* @throws InterruptedException
*/
private FSDataOutputStream getOutputStream() throws IOException, InterruptedException {
if (this.output == null) {
Path path = logFile.getPath();
if (fs.exists(path)) {
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
if (isAppendSupported) {
LOG.info(logFile + " exists. Appending to existing file");
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
LOG.warn("Remote Exception, attempting to handle or recover lease", e);
handleAppendExceptionOrRecoverLease(path, e);
} catch (IOException ioe) {
if (ioe.getMessage().toLowerCase().contains("not supported")) {
// may still happen if scheme is viewfs.
isAppendSupported = false;
} else {
/*
* Before throwing an exception, close the outputstream,
* to ensure that the lease on the log file is released.
*/
close();
throw ioe;
}
}
}
if (!isAppendSupported) {
this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
LOG.info("Append not supported.. Rolling over to " + logFile);
createNewFile();
}
} else {
LOG.info(logFile + " does not exist. Create a new file");
// Block size does not matter as we will always manually autoflush
createNewFile();
}
}
return output;
}
@Override @Override
public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException { public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
// Find current version // Find current version
HoodieLogFormat.LogFormatVersion currentLogFormatVersion = HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION); new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
long currentSize = this.output.size();
FSDataOutputStream outputStream = getOutputStream();
long currentSize = outputStream.size();
// 1. Write the magic header for the start of the block // 1. Write the magic header for the start of the block
this.output.write(HoodieLogFormat.MAGIC); outputStream.write(HoodieLogFormat.MAGIC);
// bytes for header // bytes for header
byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader()); byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
@@ -141,27 +156,27 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter()); byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
// 2. Write the total size of the block (excluding Magic) // 2. Write the total size of the block (excluding Magic)
this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length)); outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
// 3. Write the version of this log block // 3. Write the version of this log block
this.output.writeInt(currentLogFormatVersion.getVersion()); outputStream.writeInt(currentLogFormatVersion.getVersion());
// 4. Write the block type // 4. Write the block type
this.output.writeInt(block.getBlockType().ordinal()); outputStream.writeInt(block.getBlockType().ordinal());
// 5. Write the headers for the log block // 5. Write the headers for the log block
this.output.write(headerBytes); outputStream.write(headerBytes);
// 6. Write the size of the content block // 6. Write the size of the content block
this.output.writeLong(content.length); outputStream.writeLong(content.length);
// 7. Write the contents of the data block // 7. Write the contents of the data block
this.output.write(content); outputStream.write(content);
// 8. Write the footers for the log block // 8. Write the footers for the log block
this.output.write(footerBytes); outputStream.write(footerBytes);
// 9. Write the total size of the log block (including magic) which is everything written // 9. Write the total size of the log block (including magic) which is everything written
// until now (for reverse pointer) // until now (for reverse pointer)
// Update: this information is now used in determining if a block is corrupt by comparing to the // Update: this information is now used in determining if a block is corrupt by comparing to the
// block size in header. This change assumes that the block size will be the last data written // block size in header. This change assumes that the block size will be the last data written
// to a block. Read will break if any data is written past this point for a block. // to a block. Read will break if any data is written past this point for a block.
this.output.writeLong(this.output.size() - currentSize); outputStream.writeLong(outputStream.size() - currentSize);
// Flush every block to disk // Flush every block to disk
flush(); flush();
@@ -207,9 +222,12 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
flush(); if (output != null) {
output.close(); flush();
output = null; output.close();
output = null;
closed = true;
}
} }
private void flush() throws IOException { private void flush() throws IOException {
@@ -224,9 +242,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
@Override @Override
public long getCurrentSize() throws IOException { public long getCurrentSize() throws IOException {
if (output == null) { if (closed) {
throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already"); throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
} }
if (output == null) {
return 0;
}
return output.getPos(); return output.getPos();
} }
@@ -302,5 +324,4 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
} }
} }
} }
} }

View File

@@ -145,6 +145,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertEquals(size, fs.getFileStatus(writer.getLogFile().getPath()).getLen(), assertEquals(size, fs.getFileStatus(writer.getLogFile().getPath()).getLen(),
"Write should be auto-flushed. The size reported by FileStatus and the writer should match"); "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
writer.close(); writer.close();
} }
@ParameterizedTest @ParameterizedTest
@@ -174,6 +175,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0"); assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0");
assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2"); assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2");
Path logFilePath = writer.getLogFile().getPath();
assertFalse(fs.exists(logFilePath), "Path (" + logFilePath + ") must not exist");
writer.close(); writer.close();
} }
@@ -216,16 +219,16 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
} }
Writer writer = builder1.build(); Writer writer = builder1.build();
Writer writer2 = builder2.build();
HoodieLogFile logFile1 = writer.getLogFile();
HoodieLogFile logFile2 = writer2.getLogFile();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100); List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieDataBlock dataBlock = getDataBlock(records, header); HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
Writer writer2 = builder2.build();
writer2 = writer2.appendBlock(dataBlock); writer2 = writer2.appendBlock(dataBlock);
HoodieLogFile logFile1 = writer.getLogFile();
HoodieLogFile logFile2 = writer2.getLogFile();
writer.close(); writer.close();
writer2.close(); writer2.close();
assertNotNull(logFile1.getLogWriteToken()); assertNotNull(logFile1.getLogWriteToken());

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
@@ -139,6 +140,11 @@ public class TestHoodieLogFormatAppendFailure {
writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath) writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive") .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
.overBaseCommit("").withFs(fs).build(); .overBaseCommit("").withFs(fs).build();
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
writer.appendBlock(new HoodieCommandBlock(header));
// The log version should be different for this new writer // The log version should be different for this new writer
assertNotEquals(writer.getLogFile().getLogVersion(), logFileVersion); assertNotEquals(writer.getLogFile().getLogVersion(), logFileVersion);
} }