Removing OLD MAGIC header since a) it's no longer used b) causes issues when the data actually has OLD MAGIC
This commit is contained in:
committed by
vinoth chandar
parent
2f1e3e15fb
commit
26f24b6728
@@ -54,7 +54,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
|
|
||||||
private final FSDataInputStream inputStream;
|
private final FSDataInputStream inputStream;
|
||||||
private final HoodieLogFile logFile;
|
private final HoodieLogFile logFile;
|
||||||
private static final byte[] oldMagicBuffer = new byte[4];
|
|
||||||
private static final byte[] magicBuffer = new byte[6];
|
private static final byte[] magicBuffer = new byte[6];
|
||||||
private final Schema readerSchema;
|
private final Schema readerSchema;
|
||||||
private HoodieLogFormat.LogFormatVersion nextBlockVersion;
|
private HoodieLogFormat.LogFormatVersion nextBlockVersion;
|
||||||
@@ -121,23 +120,11 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
Map<HeaderMetadataType, String> header = null;
|
Map<HeaderMetadataType, String> header = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (isOldMagic()) {
|
// 1 Read the total size of the block
|
||||||
// 1 Read the block type for a log block
|
blocksize = (int) inputStream.readLong();
|
||||||
type = inputStream.readInt();
|
|
||||||
|
|
||||||
Preconditions.checkArgument(type < HoodieLogBlockType.values().length,
|
|
||||||
"Invalid block byte type found " + type);
|
|
||||||
blockType = HoodieLogBlockType.values()[type];
|
|
||||||
|
|
||||||
// 2 Read the total size of the block
|
|
||||||
blocksize = inputStream.readInt();
|
|
||||||
} else {
|
|
||||||
// 1 Read the total size of the block
|
|
||||||
blocksize = (int) inputStream.readLong();
|
|
||||||
}
|
|
||||||
} catch (EOFException | CorruptedLogFileException e) {
|
} catch (EOFException | CorruptedLogFileException e) {
|
||||||
// An exception reading any of the above indicates a corrupt block
|
// An exception reading any of the above indicates a corrupt block
|
||||||
// Create a corrupt block by finding the next OLD_MAGIC marker or EOF
|
// Create a corrupt block by finding the next MAGIC marker or EOF
|
||||||
return createCorruptBlock();
|
return createCorruptBlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -297,22 +284,12 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read log format version from log file, if present For old log files written with Magic header
|
* Read log format version from log file.
|
||||||
* OLD_MAGIC and without version, return DEFAULT_VERSION
|
|
||||||
*/
|
*/
|
||||||
private HoodieLogFormat.LogFormatVersion readVersion() throws IOException {
|
private HoodieLogFormat.LogFormatVersion readVersion() throws IOException {
|
||||||
// If not old log file format (written with Magic header OLD_MAGIC), then read log version
|
|
||||||
if (Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) {
|
|
||||||
Arrays.fill(oldMagicBuffer, (byte) 0);
|
|
||||||
return new HoodieLogFormatVersion(HoodieLogFormatVersion.DEFAULT_VERSION);
|
|
||||||
}
|
|
||||||
return new HoodieLogFormatVersion(inputStream.readInt());
|
return new HoodieLogFormatVersion(inputStream.readInt());
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isOldMagic() {
|
|
||||||
return Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private boolean readMagic() throws IOException {
|
private boolean readMagic() throws IOException {
|
||||||
try {
|
try {
|
||||||
@@ -334,13 +311,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
|||||||
// 1. Read magic header from the start of the block
|
// 1. Read magic header from the start of the block
|
||||||
inputStream.readFully(magicBuffer, 0, 6);
|
inputStream.readFully(magicBuffer, 0, 6);
|
||||||
if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) {
|
if (!Arrays.equals(magicBuffer, HoodieLogFormat.MAGIC)) {
|
||||||
inputStream.seek(pos);
|
return false;
|
||||||
// 1. Read old magic header from the start of the block
|
|
||||||
// (for backwards compatibility of older log files written without log version)
|
|
||||||
inputStream.readFully(oldMagicBuffer, 0, 4);
|
|
||||||
if (!Arrays.equals(oldMagicBuffer, HoodieLogFormat.OLD_MAGIC)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,25 +29,16 @@ import org.apache.log4j.LogManager;
|
|||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* File Format for Hoodie Log Files. The File Format consists of blocks each seperated with a
|
* File Format for Hoodie Log Files. The File Format consists of blocks each separated with a
|
||||||
* OLD_MAGIC sync marker. A Block can either be a Data block, Command block or Delete Block. Data
|
* MAGIC sync marker. A Block can either be a Data block, Command block or Delete Block. Data
|
||||||
* Block - Contains log records serialized as Avro Binary Format Command Block - Specific commands
|
* Block - Contains log records serialized as Avro Binary Format Command Block - Specific commands
|
||||||
* like RoLLBACK_PREVIOUS-BLOCK - Tombstone for the previously written block Delete Block - List of
|
* like ROLLBACK_PREVIOUS-BLOCK - Tombstone for the previously written block Delete Block - List of
|
||||||
* keys to delete - tombstone for keys
|
* keys to delete - tombstone for keys
|
||||||
*/
|
*/
|
||||||
public interface HoodieLogFormat {
|
public interface HoodieLogFormat {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Magic 4 bytes we put at the start of every block in the log file. Sync marker. We could make
|
* Magic 6 bytes we put at the start of every block in the log file.
|
||||||
* this file specific (generate a random 4 byte magic and stick it in the file header), but this I
|
|
||||||
* think is suffice for now - PR
|
|
||||||
*/
|
|
||||||
byte[] OLD_MAGIC = new byte[] {'H', 'U', 'D', 'I'};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Magic 6 bytes we put at the start of every block in the log file. This is added to maintain
|
|
||||||
* backwards compatiblity due to lack of log format/block version in older log files. All new log
|
|
||||||
* block will now write this OLD_MAGIC value
|
|
||||||
*/
|
*/
|
||||||
byte[] MAGIC = new byte[] {'#', 'H', 'U', 'D', 'I', '#'};
|
byte[] MAGIC = new byte[] {'#', 'H', 'U', 'D', 'I', '#'};
|
||||||
|
|
||||||
|
|||||||
@@ -1057,83 +1057,6 @@ public class HoodieLogFormatTest {
|
|||||||
assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
|
assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMagicAndLogVersionsBackwardsCompatibility()
|
|
||||||
throws IOException, InterruptedException, URISyntaxException {
|
|
||||||
// Create the log file
|
|
||||||
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
|
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
|
|
||||||
.overBaseCommit("100").withFs(fs).build();
|
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
|
|
||||||
List<IndexedRecord> records = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
|
||||||
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
|
|
||||||
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
|
|
||||||
writer.close();
|
|
||||||
|
|
||||||
// Write 1 with OLD_MAGIC and no log format version
|
|
||||||
// Append a log block to end of the log (mimics a log block with old format
|
|
||||||
// fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
|
|
||||||
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
|
|
||||||
// create a block with
|
|
||||||
outputStream.write(HoodieLogFormat.OLD_MAGIC);
|
|
||||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
|
||||||
// Write out a length that does not confirm with the content
|
|
||||||
records = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
|
||||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, new String(HoodieAvroUtils.compress(schema.toString())));
|
|
||||||
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, schema);
|
|
||||||
byte[] content = dataBlock.getBytes(schema);
|
|
||||||
outputStream.writeInt(content.length);
|
|
||||||
// Write out some content
|
|
||||||
outputStream.write(content);
|
|
||||||
outputStream.flush();
|
|
||||||
outputStream.hflush();
|
|
||||||
outputStream.close();
|
|
||||||
|
|
||||||
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
|
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
|
|
||||||
.withFs(fs).build();
|
|
||||||
|
|
||||||
// Write 2 with MAGIC and latest log format version
|
|
||||||
records = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
|
||||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
|
|
||||||
dataBlock = new HoodieAvroDataBlock(records, header);
|
|
||||||
writer = writer.appendBlock(dataBlock);
|
|
||||||
|
|
||||||
// Write 3 with MAGIC and latest log format version
|
|
||||||
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
|
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
|
|
||||||
.withFs(fs).build();
|
|
||||||
records = SchemaTestUtil.generateHoodieTestRecords(0, 100);
|
|
||||||
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
|
|
||||||
dataBlock = new HoodieAvroDataBlock(records, header);
|
|
||||||
writer = writer.appendBlock(dataBlock);
|
|
||||||
writer.close();
|
|
||||||
|
|
||||||
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), schema);
|
|
||||||
|
|
||||||
// Read the first block written with latest version and magic
|
|
||||||
reader.hasNext();
|
|
||||||
HoodieLogBlock block = reader.next();
|
|
||||||
assertEquals(block.getBlockType(), HoodieLogBlockType.AVRO_DATA_BLOCK);
|
|
||||||
HoodieAvroDataBlock dBlock = (HoodieAvroDataBlock) block;
|
|
||||||
assertEquals(dBlock.getRecords().size(), 100);
|
|
||||||
|
|
||||||
// Read second block written with old magic and no version
|
|
||||||
reader.hasNext();
|
|
||||||
block = reader.next();
|
|
||||||
assertEquals(block.getBlockType(), HoodieLogBlockType.AVRO_DATA_BLOCK);
|
|
||||||
dBlock = (HoodieAvroDataBlock) block;
|
|
||||||
assertEquals(dBlock.getRecords().size(), 100);
|
|
||||||
|
|
||||||
//Read third block written with latest version and magic
|
|
||||||
reader.hasNext();
|
|
||||||
block = reader.next();
|
|
||||||
assertEquals(block.getBlockType(), HoodieLogBlockType.AVRO_DATA_BLOCK);
|
|
||||||
dBlock = (HoodieAvroDataBlock) block;
|
|
||||||
assertEquals(dBlock.getRecords().size(), 100);
|
|
||||||
reader.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
|
public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {
|
||||||
@@ -1222,7 +1145,7 @@ public class HoodieLogFormatTest {
|
|||||||
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
|
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
|
||||||
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
|
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
|
||||||
// create a block with
|
// create a block with
|
||||||
outputStream.write(HoodieLogFormat.OLD_MAGIC);
|
outputStream.write(HoodieLogFormat.MAGIC);
|
||||||
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
|
||||||
// Write out a length that does not confirm with the content
|
// Write out a length that does not confirm with the content
|
||||||
outputStream.writeInt(1000);
|
outputStream.writeInt(1000);
|
||||||
|
|||||||
Reference in New Issue
Block a user