1
0

Revert "Read and apply schema for each log block from the metadata header instead of the latest schema"

This reverts commit 9e7ce19b06.
This commit is contained in:
Balaji Varadarajan
2019-04-18 00:44:01 -07:00
committed by Balaji Varadarajan
parent 9ef51deb84
commit 2f1e3e15fb
2 changed files with 2 additions and 52 deletions

View File

@@ -197,17 +197,12 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
switch (blockType) {
// based on type read the block
case AVRO_DATA_BLOCK:
Schema readerSchemaForBlock = readerSchema;
if (header != null) {
String schema = header.get(HeaderMetadataType.SCHEMA);
readerSchemaForBlock = schema != null ? new Schema.Parser().parse(schema) : readerSchema;
}
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
return HoodieAvroDataBlock.getBlock(content, readerSchemaForBlock);
return HoodieAvroDataBlock.getBlock(content, readerSchema);
} else {
return HoodieAvroDataBlock
.getBlock(logFile, inputStream, Optional.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, readerSchemaForBlock, header, footer);
contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
}
case DELETE_BLOCK:
return HoodieDeleteBlock

View File

@@ -16,7 +16,6 @@
package com.uber.hoodie.common.table.log;
import static com.uber.hoodie.common.util.SchemaTestUtil.getEvolvedSchema;
import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -1322,48 +1321,4 @@ public class HoodieLogFormatTest {
assertFalse(reader.hasPrev());
reader.close();
}
@SuppressWarnings("unchecked")
@Test
public void testAppendWithSchemaEvolution() throws IOException, URISyntaxException, InterruptedException {
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
Schema schema = getSimpleSchema();
List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(
record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(copyOfRecords1, header);
writer = writer.appendBlock(dataBlock);
writer.close();
Schema evolvedSchema = getEvolvedSchema();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, evolvedSchema.toString());
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
.withFs(fs).build();
List<IndexedRecord> records2 = SchemaTestUtil.generateEvolvedTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream().map(
record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, evolvedSchema)).collect(Collectors.toList());
dataBlock = new HoodieAvroDataBlock(copyOfRecords2, header);
writer = writer.appendBlock(dataBlock);
writer.close();
// Pass the evolved schema as the latest schema
HoodieLogFileReader reader = new HoodieLogFileReader(fs, writer.getLogFile(), SchemaTestUtil.getEvolvedSchema(),
bufferSize, readBlocksLazily, true);
assertTrue("First block should be available", reader.hasNext());
// Read the 100 records from the first log block
assertEquals(((HoodieAvroDataBlock) reader.next()).getRecords().size(), 100);
assertTrue("Second block should be available", reader.hasNext());
// Read the 100 records from the second log block
assertEquals(((HoodieAvroDataBlock) reader.next()).getRecords().size(), 100);
reader.close();
}
}