[HUDI-2900] Fix corrupt block end position (#4181)
* [HUDI-2900] Fix corrupt block end position * add a test
This commit is contained in:
@@ -284,7 +284,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
long contentPosition = inputStream.getPos();
|
long contentPosition = inputStream.getPos();
|
||||||
byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
|
byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
|
||||||
return HoodieCorruptBlock.getBlock(logFile, inputStream, Option.ofNullable(corruptedBytes), readBlockLazily,
|
return HoodieCorruptBlock.getBlock(logFile, inputStream, Option.ofNullable(corruptedBytes), readBlockLazily,
|
||||||
contentPosition, corruptedBlockSize, corruptedBlockSize, new HashMap<>(), new HashMap<>());
|
contentPosition, corruptedBlockSize, nextBlockOffset, new HashMap<>(), new HashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isBlockCorrupt(int blocksize) throws IOException {
|
private boolean isBlockCorrupt(int blocksize) throws IOException {
|
||||||
|
|||||||
@@ -67,10 +67,10 @@ public class HoodieCommandBlock extends HoodieLogBlock {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
|
public static HoodieLogBlock getBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
|
||||||
boolean readBlockLazily, long position, long blockSize, long blockEndpos, Map<HeaderMetadataType, String> header,
|
boolean readBlockLazily, long position, long blockSize, long blockEndPos, Map<HeaderMetadataType, String> header,
|
||||||
Map<HeaderMetadataType, String> footer) {
|
Map<HeaderMetadataType, String> footer) {
|
||||||
|
|
||||||
return new HoodieCommandBlock(content, inputStream, readBlockLazily,
|
return new HoodieCommandBlock(content, inputStream, readBlockLazily,
|
||||||
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), header, footer);
|
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), header, footer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -603,6 +603,63 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
|
|||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
Writer writer =
|
||||||
|
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||||
|
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
|
||||||
|
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
|
||||||
|
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
|
||||||
|
HoodieDataBlock dataBlock = getDataBlock(records, header);
|
||||||
|
writer.appendBlock(dataBlock);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
// Append some arbit byte[] to the end of the log (mimics a partially written commit)
|
||||||
|
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
|
||||||
|
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
|
||||||
|
// create a block with
|
||||||
|
outputStream.write(HoodieLogFormat.MAGIC);
|
||||||
|
// Write out a length that does not confirm with the content
|
||||||
|
outputStream.writeLong(474);
|
||||||
|
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||||
|
outputStream.writeInt(HoodieLogFormat.CURRENT_VERSION);
|
||||||
|
// Write out a length that does not confirm with the content
|
||||||
|
outputStream.writeLong(400);
|
||||||
|
// Write out incomplete content
|
||||||
|
outputStream.write("something-random".getBytes());
|
||||||
|
// get corrupt block end position
|
||||||
|
long corruptBlockEndPos = outputStream.getPos();
|
||||||
|
outputStream.flush();
|
||||||
|
outputStream.close();
|
||||||
|
|
||||||
|
// Append a proper block again
|
||||||
|
writer =
|
||||||
|
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||||
|
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
|
||||||
|
records = SchemaTestUtil.generateTestRecords(0, 10);
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
|
||||||
|
dataBlock = getDataBlock(records, header);
|
||||||
|
writer.appendBlock(dataBlock);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
// Read data and corrupt block
|
||||||
|
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
|
||||||
|
assertTrue(reader.hasNext(), "First block should be available");
|
||||||
|
reader.next();
|
||||||
|
assertTrue(reader.hasNext(), "We should have corrupted block next");
|
||||||
|
HoodieLogBlock block = reader.next();
|
||||||
|
assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
|
||||||
|
// validate the corrupt block end position correctly.
|
||||||
|
assertEquals(corruptBlockEndPos, block.getBlockContentLocation().get().getBlockEndPos());
|
||||||
|
assertTrue(reader.hasNext(), "Third block should be available");
|
||||||
|
reader.next();
|
||||||
|
assertFalse(reader.hasNext(), "There should be no more block left");
|
||||||
|
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("testArguments")
|
@MethodSource("testArguments")
|
||||||
public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,
|
public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,
|
||||||
|
|||||||
Reference in New Issue
Block a user