Caching Avro Binary encoder/decoder to avoid creating new one for every record
This commit is contained in:
committed by
vinoth chandar
parent
ee1feb7c75
commit
a33a55fcb5
@@ -37,6 +37,8 @@ import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.avro.io.BinaryDecoder;
|
||||
import org.apache.avro.io.BinaryEncoder;
|
||||
import org.apache.avro.io.Decoder;
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.io.Encoder;
|
||||
@@ -52,6 +54,8 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
|
||||
|
||||
private List<IndexedRecord> records;
|
||||
private Schema schema;
|
||||
private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
|
||||
private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
|
||||
|
||||
public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records,
|
||||
@Nonnull Map<HeaderMetadataType, String> header,
|
||||
@@ -118,7 +122,8 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
|
||||
while (itr.hasNext()) {
|
||||
IndexedRecord s = itr.next();
|
||||
ByteArrayOutputStream temp = new ByteArrayOutputStream();
|
||||
Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
|
||||
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
|
||||
encoderCache.set(encoder);
|
||||
try {
|
||||
// Encode the record into bytes
|
||||
writer.write(s, encoder);
|
||||
@@ -201,8 +206,9 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
|
||||
// 3. Read the content
|
||||
for (int i = 0; i < totalRecords; i++) {
|
||||
int recordLength = dis.readInt();
|
||||
Decoder decoder = DecoderFactory.get()
|
||||
.binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(), recordLength, null);
|
||||
BinaryDecoder decoder = DecoderFactory.get()
|
||||
.binaryDecoder(getContent().get(), dis.getNumberOfBytesRead(), recordLength, decoderCache.get());
|
||||
decoderCache.set(decoder);
|
||||
IndexedRecord record = reader.read(null, decoder);
|
||||
records.add(record);
|
||||
dis.skipBytes(recordLength);
|
||||
|
||||
Reference in New Issue
Block a user