1
0

Reducing memory footprint required in HoodieAvroDataBlock and HoodieAppendHandle

This commit is contained in:
Nishith Agarwal
2017-12-15 14:03:06 -08:00
committed by vinoth chandar
parent 85d32930cd
commit 937ae322ba
8 changed files with 174 additions and 48 deletions

View File

@@ -276,7 +276,11 @@ public class HoodieLogFormatTest {
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
Schema schema = getSimpleSchema();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords = records.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records,
@@ -292,8 +296,8 @@ public class HoodieLogFormatTest {
nextBlock.getBlockType());
HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",
- records.size(), dataBlockRead.getRecords().size());
- assertEquals("Both records lists should be the same. (ordering guaranteed)", records,
+ copyOfRecords.size(), dataBlockRead.getRecords().size());
+ assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords,
dataBlockRead.getRecords());
}
@@ -305,6 +309,10 @@ public class HoodieLogFormatTest {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
Schema schema = getSimpleSchema();
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
@@ -316,6 +324,9 @@ public class HoodieLogFormatTest {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records2 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
dataBlock = new HoodieAvroDataBlock(records2,
getSimpleSchema(), metadata);
writer = writer.appendBlock(dataBlock);
@@ -326,6 +337,9 @@ public class HoodieLogFormatTest {
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
List<IndexedRecord> records3 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
dataBlock = new HoodieAvroDataBlock(records3,
getSimpleSchema(), metadata);
writer = writer.appendBlock(dataBlock);
@@ -337,22 +351,22 @@ public class HoodieLogFormatTest {
HoodieLogBlock nextBlock = reader.next();
HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",
- records1.size(), dataBlockRead.getRecords().size());
- assertEquals("Both records lists should be the same. (ordering guaranteed)", records1,
+ copyOfRecords1.size(), dataBlockRead.getRecords().size());
+ assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords1,
dataBlockRead.getRecords());
nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",
- records2.size(), dataBlockRead.getRecords().size());
- assertEquals("Both records lists should be the same. (ordering guaranteed)", records2,
+ copyOfRecords2.size(), dataBlockRead.getRecords().size());
+ assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords2,
dataBlockRead.getRecords());
nextBlock = reader.next();
dataBlockRead = (HoodieAvroDataBlock) nextBlock;
assertEquals("Read records size should be equal to the written records size",
- records3.size(), dataBlockRead.getRecords().size());
- assertEquals("Both records lists should be the same. (ordering guaranteed)", records3,
+ copyOfRecords3.size(), dataBlockRead.getRecords().size());
+ assertEquals("Both records lists should be the same. (ordering guaranteed)", copyOfRecords3,
dataBlockRead.getRecords());
}
@@ -450,6 +464,9 @@ public class HoodieLogFormatTest {
.overBaseCommit("100").withFs(fs).withSizeThreshold(500).build();
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
@@ -459,6 +476,9 @@ public class HoodieLogFormatTest {
// Write 2
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
dataBlock = new HoodieAvroDataBlock(records2, schema, metadata);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -475,8 +495,8 @@ public class HoodieLogFormatTest {
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 200 records", 200, readKeys.size());
- records1.addAll(records2);
- Set<String> originalKeys = records1.stream()
+ copyOfRecords1.addAll(copyOfRecords2);
+ Set<String> originalKeys = copyOfRecords1.stream()
.map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(
Collectors.toSet());
@@ -495,6 +515,9 @@ public class HoodieLogFormatTest {
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
@@ -515,6 +538,9 @@ public class HoodieLogFormatTest {
// Write 3
List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
dataBlock = new HoodieAvroDataBlock(records3, schema, metadata);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -532,8 +558,8 @@ public class HoodieLogFormatTest {
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 200 records", 200, readKeys.size());
- records1.addAll(records3);
- Set<String> originalKeys = records1.stream()
+ copyOfRecords1.addAll(copyOfRecords3);
+ Set<String> originalKeys = copyOfRecords1.stream()
.map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(
Collectors.toSet());
@@ -552,6 +578,9 @@ public class HoodieLogFormatTest {
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
@@ -585,6 +614,10 @@ public class HoodieLogFormatTest {
// Write 3
List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
dataBlock = new HoodieAvroDataBlock(records3, schema, metadata);
writer = writer.appendBlock(dataBlock);
writer.close();
@@ -602,8 +635,8 @@ public class HoodieLogFormatTest {
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 200 records", 200, readKeys.size());
- records1.addAll(records3);
- Set<String> originalKeys = records1.stream()
+ copyOfRecords1.addAll(copyOfRecords3);
+ Set<String> originalKeys = copyOfRecords1.stream()
.map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(
Collectors.toSet());
@@ -622,6 +655,9 @@ public class HoodieLogFormatTest {
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
@@ -631,11 +667,14 @@ public class HoodieLogFormatTest {
// Write 2
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
dataBlock = new HoodieAvroDataBlock(records2, schema, metadata);
writer = writer.appendBlock(dataBlock);
- records1.addAll(records2);
- List<String> originalKeys = records1.stream()
+ copyOfRecords1.addAll(copyOfRecords2);
+ List<String> originalKeys = copyOfRecords1.stream()
.map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(
Collectors.toList());
@@ -691,9 +730,12 @@ public class HoodieLogFormatTest {
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
schema, metadata);
writer = writer.appendBlock(dataBlock);
@@ -703,7 +745,7 @@ public class HoodieLogFormatTest {
dataBlock = new HoodieAvroDataBlock(records2, schema, metadata);
writer = writer.appendBlock(dataBlock);
- List<String> originalKeys = records1.stream()
+ List<String> originalKeys = copyOfRecords1.stream()
.map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(
Collectors.toList());
@@ -757,6 +799,9 @@ public class HoodieLogFormatTest {
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
@@ -764,7 +809,7 @@ public class HoodieLogFormatTest {
schema, metadata);
writer = writer.appendBlock(dataBlock);
- List<String> originalKeys = records1.stream()
+ List<String> originalKeys = copyOfRecords1.stream()
.map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(
Collectors.toList());