From 9c8ad0f0faab29e812628d26534a0f6fca9faf60 Mon Sep 17 00:00:00 2001
From: guanziyue <30882822+guanziyue@users.noreply.github.com>
Date: Thu, 9 Dec 2021 10:47:13 +0800
Subject: [PATCH] [HUDI-2665] Fix overflow of huge log file in
 HoodieLogFormatWriter (#3912)

Co-authored-by: guanziyue.gzy
---
 .../table/log/HoodieLogFormatWriter.java          | 12 +++-
 .../common/functional/TestHoodieLogFormat.java    | 62 +++++++++++++++++++
 2 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index d7e4f7ef1..1c33b8124 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -148,10 +148,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
     HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
         new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
 
-    FSDataOutputStream outputStream = getOutputStream();
-    long startPos = outputStream.getPos();
+    FSDataOutputStream originalOutputStream = getOutputStream();
+    long startPos = originalOutputStream.getPos();
     long sizeWritten = 0;
-
+    // HUDI-2665. here we wrap originalOutputStream to ensure huge blocks can be correctly written
+    FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(fs.getScheme()), startPos);
     for (HoodieLogBlock block: blocks) {
       long startSize = outputStream.size();
@@ -189,6 +190,11 @@
       outputStream.writeLong(outputStream.size() - startSize);
 
       // Fetch the size again, so it accounts also (9).
+
+      // HUDI-2665. Check the size written to avoid log blocks whose size overflows.
+      if (outputStream.size() == Integer.MAX_VALUE) {
+        throw new HoodieIOException("Blocks appended may overflow. Please decrease log block size or log block amount");
+      }
       sizeWritten += outputStream.size() - startSize;
     }
     // Flush all blocks to disk
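Why the wrapper fixes the corruption: FSDataOutputStream extends java.io.DataOutputStream, whose size() method returns an int byte counter that incCount() clamps at Integer.MAX_VALUE rather than letting it wrap. The writer previously reused one stream for the lifetime of the log file, so once total writes passed 2 GiB the counter froze and every block-length word computed as size() - startSize afterwards was wrong. Re-wrapping per appendBlocks() call gives each batch a fresh counter, and the new check turns any remaining saturation into an explicit failure. A minimal JDK-only sketch of the saturation behavior (OutputStream.nullOutputStream() assumes Java 11+):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Demonstrates the int counter behind HUDI-2665: after ~2 GiB,
    // DataOutputStream.size() sticks at Integer.MAX_VALUE, so any length
    // computed as size() - startSize becomes garbage.
    public class SizeSaturationDemo {
      public static void main(String[] args) throws IOException {
        DataOutputStream out = new DataOutputStream(OutputStream.nullOutputStream());
        byte[] chunk = new byte[1 << 20]; // 1 MiB
        for (int i = 0; i < 3 * 1024; i++) { // push 3 GiB through the stream
          out.write(chunk);
        }
        System.out.println(out.size()); // prints 2147483647, not 3221225472
      }
    }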
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index e95d2d00e..517aa7f70 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -51,9 +51,11 @@
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.exception.CorruptedLogFileException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
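For context on the constructor used in the fix: FSDataOutputStream(OutputStream out, FileSystem.Statistics stats, long startPosition) seeds the long-valued position cache with startPosition, while the int size() counter inherited from DataOutputStream starts at zero for the new wrapper. That split is exactly what the patch relies on: getPos() keeps reporting absolute file offsets for AppendResult, while size() == Integer.MAX_VALUE now means a single batch wrote at least 2 GiB. A standalone sketch of the two counters (assumes hadoop-common on the classpath; the in-memory sink and the "file" scheme are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;

    // size() counts bytes written since wrapping (int, per batch);
    // getPos() reports the absolute position (long, seeded with startPosition).
    public class WrapperCountersDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long startPos = 1024; // pretend 1 KiB already exists in the log file
        FSDataOutputStream wrapped =
            new FSDataOutputStream(sink, new FileSystem.Statistics("file"), startPos);
        wrapped.write(new byte[100]);
        System.out.println(wrapped.size());   // 100  (fresh counter since wrapping)
        System.out.println(wrapped.getPos()); // 1124 (startPos + bytes written)
      }
    }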
@@ -385,6 +387,66 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
+  @Test
+  public void testHugeLogFileWrite() throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).withSizeThreshold(3L * 1024 * 1024 * 1024)
+            .build();
+    Schema schema = getSimpleSchema();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 1000);
+    List<IndexedRecord> copyOfRecords = records.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    byte[] dataBlockContentBytes = getDataBlock(records, header).getContentBytes();
+    HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, null,
+        Option.ofNullable(dataBlockContentBytes), false, 0, dataBlockContentBytes.length,
+        0, getSimpleSchema(), header, new HashMap<>());
+    long writtenSize = 0;
+    int logBlockWrittenNum = 0;
+    while (writtenSize < Integer.MAX_VALUE) {
+      AppendResult appendResult = writer.appendBlock(reusableDataBlock);
+      assertTrue(appendResult.size() > 0);
+      writtenSize += appendResult.size();
+      logBlockWrittenNum++;
+    }
+    writer.close();
+
+    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
+        true, true);
+    assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
+    HoodieLogBlock nextBlock = reader.next();
+    assertEquals(dataBlockType, nextBlock.getBlockType(), "The next block should be a data block");
+    HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
+    assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(),
+        "Read records size should be equal to the written records size");
+    assertEquals(copyOfRecords, dataBlockRead.getRecords(),
+        "Both records lists should be the same. (ordering guaranteed)");
+    int logBlockReadNum = 1;
+    while (reader.hasNext()) {
+      reader.next();
+      logBlockReadNum++;
+    }
+    assertEquals(logBlockWrittenNum, logBlockReadNum, "All written log should be correctly found");
+    reader.close();
+
+    // test writing an oversize batch of data blocks, which should be rejected
+    Writer oversizeWriter =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withSizeThreshold(3L * 1024 * 1024 * 1024).withFs(fs)
+            .build();
+    List<HoodieLogBlock> dataBlocks = new ArrayList<>(logBlockWrittenNum + 1);
+    for (int i = 0; i < logBlockWrittenNum + 1; i++) {
+      dataBlocks.add(reusableDataBlock);
+    }
+    assertThrows(HoodieIOException.class, () -> {
+      oversizeWriter.appendBlocks(dataBlocks);
+    }, "Blocks appended may overflow. Please decrease log block size or log block amount");
+    oversizeWriter.close();
+  }
+
   @Test
   public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =
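The second half of the test exercises the new guard: a single appendBlocks() call whose blocks total at least Integer.MAX_VALUE bytes is rejected with HoodieIOException. Callers that batch many large blocks therefore need to keep each batch under the int range. A hypothetical helper sketching that defensive batching (BatchedAppender, MAX_BATCH_BYTES, and the content-size heuristic are illustrative, not Hudi API; only appendBlocks() and getContentBytes() come from the patch above):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hudi.common.table.log.HoodieLogFormat;
    import org.apache.hudi.common.table.log.block.HoodieLogBlock;

    // Hypothetical helper, not part of Hudi: splits a large list of blocks
    // into batches whose serialized content stays well below the int range
    // that a single appendBlocks() call must fit in after this patch.
    public final class BatchedAppender {
      private static final long MAX_BATCH_BYTES = Integer.MAX_VALUE / 2; // conservative margin

      public static void appendInBatches(HoodieLogFormat.Writer writer, List<HoodieLogBlock> blocks)
          throws IOException, InterruptedException {
        List<HoodieLogBlock> batch = new ArrayList<>();
        long batchBytes = 0;
        for (HoodieLogBlock block : blocks) {
          long blockBytes = block.getContentBytes().length; // content only; headers add a bit more
          if (!batch.isEmpty() && batchBytes + blockBytes > MAX_BATCH_BYTES) {
            writer.appendBlocks(batch); // flush the current sub-2GiB batch
            batch = new ArrayList<>();
            batchBytes = 0;
          }
          batch.add(block);
          batchBytes += blockBytes;
        }
        if (!batch.isEmpty()) {
          writer.appendBlocks(batch);
        }
      }
    }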