[HUDI-409] Match header and footer block length to improve corrupted block detection (#1332)
This commit is contained in:
@@ -231,6 +231,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
inputStream.seek(currentPos + blocksize);
|
inputStream.seek(currentPos + blocksize);
|
||||||
}
|
}
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
|
LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
|
||||||
// this is corrupt
|
// this is corrupt
|
||||||
// This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
|
// This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
|
||||||
// release-3.1.0-RC1/DFSInputStream.java#L1455
|
// release-3.1.0-RC1/DFSInputStream.java#L1455
|
||||||
@@ -239,12 +240,26 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check whether the block size recorded in the footer matches the one in the header, by seeking back the length of a long
|
||||||
|
// the backward seek does not incur additional IO as {@link org.apache.hadoop.hdfs.DFSInputStream#seek()}
|
||||||
|
// only moves the index. actual IO happens on the next read operation
|
||||||
|
inputStream.seek(inputStream.getPos() - Long.BYTES);
|
||||||
|
// Block size in the footer includes the magic header, which the header does not include.
|
||||||
|
// So we have to shorten the footer block size by the size of magic hash
|
||||||
|
long blockSizeFromFooter = inputStream.readLong() - MAGIC_BUFFER.length;
|
||||||
|
if (blocksize != blockSizeFromFooter) {
|
||||||
|
LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize
|
||||||
|
+ ") did not match the footer block size(" + blockSizeFromFooter + ")");
|
||||||
|
inputStream.seek(currentPos);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
readMagic();
|
readMagic();
|
||||||
// all good - either we found the sync marker or EOF. Reset position and continue
|
// all good - either we found the sync marker or EOF. Reset position and continue
|
||||||
return false;
|
return false;
|
||||||
} catch (CorruptedLogFileException e) {
|
} catch (CorruptedLogFileException e) {
|
||||||
// This is a corrupted block
|
// This is a corrupted block
|
||||||
|
LOG.info("Found corrupted block in file " + logFile + ". No magic hash found right after footer block size entry");
|
||||||
return true;
|
return true;
|
||||||
} finally {
|
} finally {
|
||||||
inputStream.seek(currentPos);
|
inputStream.seek(currentPos);
|
||||||
@@ -310,7 +325,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasNextMagic() throws IOException {
|
private boolean hasNextMagic() throws IOException {
|
||||||
long pos = inputStream.getPos();
|
|
||||||
// 1. Read magic header from the start of the block
|
// 1. Read magic header from the start of the block
|
||||||
inputStream.readFully(MAGIC_BUFFER, 0, 6);
|
inputStream.readFully(MAGIC_BUFFER, 0, 6);
|
||||||
return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC);
|
return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC);
|
||||||
|
|||||||
@@ -158,6 +158,9 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
|||||||
this.output.write(footerBytes);
|
this.output.write(footerBytes);
|
||||||
// 9. Write the total size of the log block (including magic) which is everything written
|
// 9. Write the total size of the log block (including magic) which is everything written
|
||||||
// until now (for reverse pointer)
|
// until now (for reverse pointer)
|
||||||
|
// Update: this information is now used in determining if a block is corrupt by comparing to the
|
||||||
|
// block size in header. This change assumes that the block size will be the last data written
|
||||||
|
// to a block. Read will break if any data is written past this point for a block.
|
||||||
this.output.writeLong(this.output.size() - currentSize);
|
this.output.writeLong(this.output.size() - currentSize);
|
||||||
// Flush every block to disk
|
// Flush every block to disk
|
||||||
flush();
|
flush();
|
||||||
|
|||||||
@@ -491,16 +491,27 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
|||||||
// create a block with
|
// create a block with
|
||||||
outputStream.write(HoodieLogFormat.MAGIC);
|
outputStream.write(HoodieLogFormat.MAGIC);
|
||||||
// Write out a length that does not conform with the content
|
// Write out a length that does not conform with the content
|
||||||
outputStream.writeLong(1000);
|
outputStream.writeLong(474);
|
||||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||||
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||||
// Write out a length that does not conform with the content
|
// Write out a length that does not conform with the content
|
||||||
outputStream.writeLong(500);
|
outputStream.writeLong(400);
|
||||||
// Write out some bytes
|
// Write out incomplete content
|
||||||
outputStream.write("something-random".getBytes());
|
outputStream.write("something-random".getBytes());
|
||||||
outputStream.flush();
|
outputStream.flush();
|
||||||
outputStream.close();
|
outputStream.close();
|
||||||
|
|
||||||
|
// Append a proper block that is of the missing length of the corrupted block
|
||||||
|
writer =
|
||||||
|
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||||
|
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
|
||||||
|
records = SchemaTestUtil.generateTestRecords(0, 10);
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
|
||||||
|
dataBlock = new HoodieAvroDataBlock(records, header);
|
||||||
|
writer = writer.appendBlock(dataBlock);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
|
||||||
// First round of reads - we should be able to read the first block and then EOF
|
// First round of reads - we should be able to read the first block and then EOF
|
||||||
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
|
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
|
||||||
assertTrue("First block should be available", reader.hasNext());
|
assertTrue("First block should be available", reader.hasNext());
|
||||||
@@ -508,6 +519,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
|||||||
assertTrue("We should have corrupted block next", reader.hasNext());
|
assertTrue("We should have corrupted block next", reader.hasNext());
|
||||||
HoodieLogBlock block = reader.next();
|
HoodieLogBlock block = reader.next();
|
||||||
assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
|
assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
|
||||||
|
assertTrue("Third block should be available", reader.hasNext());
|
||||||
|
reader.next();
|
||||||
assertFalse("There should be no more block left", reader.hasNext());
|
assertFalse("There should be no more block left", reader.hasNext());
|
||||||
|
|
||||||
reader.close();
|
reader.close();
|
||||||
@@ -543,6 +556,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
|||||||
reader.next();
|
reader.next();
|
||||||
assertTrue("We should get the 1st corrupted block next", reader.hasNext());
|
assertTrue("We should get the 1st corrupted block next", reader.hasNext());
|
||||||
reader.next();
|
reader.next();
|
||||||
|
assertTrue("Third block should be available", reader.hasNext());
|
||||||
|
reader.next();
|
||||||
assertTrue("We should get the 2nd corrupted block next", reader.hasNext());
|
assertTrue("We should get the 2nd corrupted block next", reader.hasNext());
|
||||||
block = reader.next();
|
block = reader.next();
|
||||||
assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
|
assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType());
|
||||||
|
|||||||
Reference in New Issue
Block a user