[HUDI-2665] Fix overflow of huge log file in HoodieLogFormatWriter (#3912)
Co-authored-by: guanziyue.gzy <guanziyue.gzy@bytedance.com>
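Why the counter overflows: Hadoop's FSDataOutputStream extends java.io.DataOutputStream, which tracks the number of bytes written in an int field, and the JDK pins that counter at Integer.MAX_VALUE once it overflows. A long-lived log stream that passes 2 GiB therefore reports a frozen size(), and per-block length accounting in HoodieLogFormatWriter goes wrong. The following standalone sketch (illustrative only, not part of this commit; the class name is made up, and OutputStream.nullOutputStream() needs Java 11+) reproduces the pinned counter:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch: DataOutputStream#size() is backed by an int counter that the JDK
// pins to Integer.MAX_VALUE on overflow, so byte accounting based on size()
// silently stops moving once ~2 GiB has gone through a single stream.
public class SizeCounterOverflowDemo {
  public static void main(String[] args) throws IOException {
    // nullOutputStream() (Java 11+) discards everything, so this runs quickly.
    try (DataOutputStream out = new DataOutputStream(OutputStream.nullOutputStream())) {
      byte[] chunk = new byte[1 << 20]; // 1 MiB
      for (int i = 0; i < 3 * 1024; i++) { // push 3 GiB through the stream
        out.write(chunk);
      }
      // Prints 2147483647 (Integer.MAX_VALUE), not 3 GiB: size() can no
      // longer be used to compute per-block lengths.
      System.out.println(out.size());
    }
  }
}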
@@ -148,10 +148,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
         new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
 
-    FSDataOutputStream outputStream = getOutputStream();
-    long startPos = outputStream.getPos();
+    FSDataOutputStream originalOutputStream = getOutputStream();
+    long startPos = originalOutputStream.getPos();
     long sizeWritten = 0;
-
+    // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can be correctly written
+    FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(fs.getScheme()), startPos);
     for (HoodieLogBlock block: blocks) {
       long startSize = outputStream.size();
 
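The wrapper added above leans on FSDataOutputStream's public (OutputStream, FileSystem.Statistics, long) constructor: the new object delegates all writes to the existing stream, its size() counter restarts at zero for the current appendBlocks() call, and getPos() stays absolute because it is seeded with startPos. A small local-filesystem sketch of that behaviour (illustrative only; the class name, temp path, and byte counts are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the wrapping trick: the new FSDataOutputStream delegates writes to
// the existing stream, but its int size() counter restarts at 0 for this batch,
// while getPos() stays absolute because it is seeded with startPos.
public class WrapOutputStreamSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path(System.getProperty("java.io.tmpdir"), "wrap-sketch.log");
    try (FSDataOutputStream original = fs.create(path, true)) {
      original.write(new byte[128]);      // pretend the log file already has data
      long startPos = original.getPos();  // absolute offset: 128

      FSDataOutputStream outputStream = new FSDataOutputStream(
          original, new FileSystem.Statistics(fs.getScheme()), startPos);
      System.out.println(outputStream.size());   // 0   -> fresh per-batch counter
      System.out.println(outputStream.getPos()); // 128 -> absolute position kept
      outputStream.write(new byte[64]);
      System.out.println(outputStream.size());   // 64  -> bytes of this batch only
    }
  }
}

Resetting the counter per batch keeps size() trustworthy as long as one appendBlocks() call writes less than 2 GiB; the next hunk adds a fail-fast check for batches that still hit the pinned value.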
@@ -189,6 +190,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       outputStream.writeLong(outputStream.size() - startSize);
 
       // Fetch the size again, so it accounts also (9).
+
+      // HUDI-2655. Check the size written to avoid log blocks whose size overflow.
+      if (outputStream.size() == Integer.MAX_VALUE) {
+        throw new HoodieIOException("Blocks appended may overflow. Please decrease log block size or log block amount");
+      }
       sizeWritten += outputStream.size() - startSize;
     }
     // Flush all blocks to disk
@@ -51,9 +51,11 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.exception.CorruptedLogFileException;
+import org.apache.hudi.exception.HoodieIOException;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -385,6 +387,66 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
+  @Test
+  public void testHugeLogFileWrite() throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).withSizeThreshold(3L * 1024 * 1024 * 1024)
+            .build();
+    Schema schema = getSimpleSchema();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 1000);
+    List<IndexedRecord> copyOfRecords = records.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    byte[] dataBlockContentBytes = getDataBlock(records, header).getContentBytes();
+    HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, null,
+        Option.ofNullable(dataBlockContentBytes), false, 0, dataBlockContentBytes.length,
+        0, getSimpleSchema(), header, new HashMap<>());
+    long writtenSize = 0;
+    int logBlockWrittenNum = 0;
+    while (writtenSize < Integer.MAX_VALUE) {
+      AppendResult appendResult = writer.appendBlock(reusableDataBlock);
+      assertTrue(appendResult.size() > 0);
+      writtenSize += appendResult.size();
+      logBlockWrittenNum++;
+    }
+    writer.close();
+
+    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(),
+        true, true);
+    assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
+    HoodieLogBlock nextBlock = reader.next();
+    assertEquals(dataBlockType, nextBlock.getBlockType(), "The next block should be a data block");
+    HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
+    assertEquals(copyOfRecords.size(), dataBlockRead.getRecords().size(),
+        "Read records size should be equal to the written records size");
+    assertEquals(copyOfRecords, dataBlockRead.getRecords(),
+        "Both records lists should be the same. (ordering guaranteed)");
+    int logBlockReadNum = 1;
+    while (reader.hasNext()) {
+      reader.next();
+      logBlockReadNum++;
+    }
+    assertEquals(logBlockWrittenNum, logBlockReadNum, "All written log should be correctly found");
+    reader.close();
+
+    // test writing oversize data block which should be rejected
+    Writer oversizeWriter =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withSizeThreshold(3L * 1024 * 1024 * 1024).withFs(fs)
+            .build();
+    List<HoodieLogBlock> dataBlocks = new ArrayList<>(logBlockWrittenNum + 1);
+    for (int i = 0; i < logBlockWrittenNum + 1; i++) {
+      dataBlocks.add(reusableDataBlock);
+    }
+    assertThrows(HoodieIOException.class, () -> {
+      oversizeWriter.appendBlocks(dataBlocks);
+    }, "Blocks appended may overflow. Please decrease log block size or log block amount");
+    oversizeWriter.close();
+  }
+
   @Test
   public void testBasicAppendAndRead() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =