diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
index 836870e6e..878457f54 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
@@ -197,12 +197,17 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
     switch (blockType) {
       // based on type read the block
       case AVRO_DATA_BLOCK:
+        Schema readerSchemaForBlock = readerSchema;
+        if (header != null) {
+          String schema = header.get(HeaderMetadataType.SCHEMA);
+          readerSchemaForBlock = schema != null ? new Schema.Parser().parse(schema) : readerSchema;
+        }
         if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content, readerSchema);
+          return HoodieAvroDataBlock.getBlock(content, readerSchemaForBlock);
         } else {
           return HoodieAvroDataBlock
               .getBlock(logFile, inputStream, Optional.ofNullable(content), readBlockLazily,
-                  contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
+                  contentPosition, contentLength, blockEndPos, readerSchemaForBlock, header, footer);
         }
       case DELETE_BLOCK:
         return HoodieDeleteBlock
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index cc795ffcb..80b1d58f8 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -16,6 +16,7 @@
 
 package com.uber.hoodie.common.table.log;
 
+import static com.uber.hoodie.common.util.SchemaTestUtil.getEvolvedSchema;
 import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1321,4 +1322,48 @@ public class HoodieLogFormatTest {
     assertFalse(reader.hasPrev());
     reader.close();
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testAppendWithSchemaEvolution() throws IOException, URISyntaxException, InterruptedException {
+    Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs).build();
+    Schema schema = getSimpleSchema();
+    List records1 = SchemaTestUtil.generateTestRecords(0, 100);
+    List copyOfRecords1 = records1.stream().map(
+        record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map header = Maps.newHashMap();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(copyOfRecords1, header);
+    writer = writer.appendBlock(dataBlock);
+    writer.close();
+
+    Schema evolvedSchema = getEvolvedSchema();
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, evolvedSchema.toString());
+    writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
+        .withFs(fs).build();
+    List records2 = SchemaTestUtil.generateEvolvedTestRecords(0, 100);
+    List copyOfRecords2 = records2.stream().map(
+        record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, evolvedSchema)).collect(Collectors.toList());
+    dataBlock = new HoodieAvroDataBlock(copyOfRecords2, header);
+    writer = writer.appendBlock(dataBlock);
+    writer.close();
+
+    // Pass the evolved schema as the latest schema
+    HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), evolvedSchema,
+        bufferSize, readBlocksLazily, true);
+
+    assertTrue("First block should be available", reader.hasNext());
+    // Read the 100 records from the first log block
+    assertEquals(100, ((HoodieAvroDataBlock) reader.next()).getRecords().size());
+
+    assertTrue("Second block should be available", reader.hasNext());
+    // Read the 100 records from the second log block
+    assertEquals(100, ((HoodieAvroDataBlock) reader.next()).getRecords().size());
+
+    reader.close();
+  }
 }