Read and apply schema for each log block from the metadata header instead of the latest schema
This commit is contained in:
committed by
vinoth chandar
parent
83b6aa5e91
commit
9e7ce19b06
@@ -197,12 +197,17 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
switch (blockType) {
|
switch (blockType) {
|
||||||
// based on type read the block
|
// based on type read the block
|
||||||
case AVRO_DATA_BLOCK:
|
case AVRO_DATA_BLOCK:
|
||||||
|
Schema readerSchemaForBlock = readerSchema;
|
||||||
|
if (header != null) {
|
||||||
|
String schema = header.get(HeaderMetadataType.SCHEMA);
|
||||||
|
readerSchemaForBlock = schema != null ? new Schema.Parser().parse(schema) : readerSchema;
|
||||||
|
}
|
||||||
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
|
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
|
||||||
return HoodieAvroDataBlock.getBlock(content, readerSchema);
|
return HoodieAvroDataBlock.getBlock(content, readerSchemaForBlock);
|
||||||
} else {
|
} else {
|
||||||
return HoodieAvroDataBlock
|
return HoodieAvroDataBlock
|
||||||
.getBlock(logFile, inputStream, Optional.ofNullable(content), readBlockLazily,
|
.getBlock(logFile, inputStream, Optional.ofNullable(content), readBlockLazily,
|
||||||
contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
|
contentPosition, contentLength, blockEndPos, readerSchemaForBlock, header, footer);
|
||||||
}
|
}
|
||||||
case DELETE_BLOCK:
|
case DELETE_BLOCK:
|
||||||
return HoodieDeleteBlock
|
return HoodieDeleteBlock
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package com.uber.hoodie.common.table.log;
|
package com.uber.hoodie.common.table.log;
|
||||||
|
|
||||||
|
import static com.uber.hoodie.common.util.SchemaTestUtil.getEvolvedSchema;
|
||||||
import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
|
import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
@@ -1321,4 +1322,48 @@ public class HoodieLogFormatTest {
|
|||||||
assertFalse(reader.hasPrev());
|
assertFalse(reader.hasPrev());
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testAppendWithSchemaEvolution() throws IOException, URISyntaxException, InterruptedException {
|
||||||
|
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
|
||||||
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
|
||||||
|
.overBaseCommit("100").withFs(fs).build();
|
||||||
|
Schema schema = getSimpleSchema();
|
||||||
|
List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
|
||||||
|
List<IndexedRecord> copyOfRecords1 = records1.stream().map(
|
||||||
|
record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
|
||||||
|
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
|
||||||
|
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(copyOfRecords1, header);
|
||||||
|
writer = writer.appendBlock(dataBlock);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
Schema evolvedSchema = getEvolvedSchema();
|
||||||
|
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, evolvedSchema.toString());
|
||||||
|
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
|
||||||
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
|
||||||
|
.withFs(fs).build();
|
||||||
|
List<IndexedRecord> records2 = SchemaTestUtil.generateEvolvedTestRecords(0, 100);
|
||||||
|
List<IndexedRecord> copyOfRecords2 = records2.stream().map(
|
||||||
|
record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, evolvedSchema)).collect(Collectors.toList());
|
||||||
|
dataBlock = new HoodieAvroDataBlock(copyOfRecords2, header);
|
||||||
|
writer = writer.appendBlock(dataBlock);
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
// Pass the evolved schema as the latest schema
|
||||||
|
HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getEvolvedSchema(),
|
||||||
|
bufferSize, readBlocksLazily, true);
|
||||||
|
|
||||||
|
assertTrue("First block should be available", reader.hasNext());
|
||||||
|
// Read the 100 records from the first log block
|
||||||
|
assertEquals(((HoodieAvroDataBlock) reader.next()).getRecords().size(), 100);
|
||||||
|
|
||||||
|
assertTrue("Second block should be available", reader.hasNext());
|
||||||
|
// Read the 100 records from the second log block
|
||||||
|
assertEquals(((HoodieAvroDataBlock) reader.next()).getRecords().size(), 100);
|
||||||
|
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user