From 110df7190b9c414d25ab7f7012bcbb524db8b0b4 Mon Sep 17 00:00:00 2001
From: Nishith Agarwal
Date: Fri, 21 Dec 2018 14:51:44 -0800
Subject: [PATCH] Enabling hard deletes for MergeOnRead table type

---
 .../uber/hoodie/io/HoodieAppendHandle.java    |  7 +--
 .../hoodie/table/TestMergeOnReadTable.java    |  4 +-
 .../common/model/HoodieAvroPayload.java       |  9 +++-
 .../log/AbstractHoodieLogRecordScanner.java   |  5 +-
 .../log/HoodieMergedLogRecordScanner.java     | 13 ++---
 .../log/HoodieUnMergedLogRecordScanner.java   |  3 +-
 .../table/log/block/HoodieDeleteBlock.java    | 15 +++---
 .../hoodie/common/util/SpillableMapUtils.java | 11 +++++
 .../common/table/log/HoodieLogFormatTest.java | 47 +++++++++++++++----
 .../RealtimeCompactedRecordReader.java        | 28 ++++++++---
 10 files changed, 103 insertions(+), 39 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 452d111eb..04f715bab 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -20,6 +20,7 @@ import com.beust.jcommander.internal.Maps;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDeltaWriteStat;
+import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
@@ -67,7 +68,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
   // Buffer for holding records in memory before they are flushed to disk
   private List<IndexedRecord> recordList = new ArrayList<>();
   // Buffer for holding records (to be deleted) in memory before they are flushed to disk
-  private List<String> keysToDelete = new ArrayList<>();
+  private List<HoodieKey> keysToDelete = new ArrayList<>();
   private TableFileSystemView.RealtimeView fileSystemView;
   private String partitionPath;
   private Iterator<HoodieRecord<T>> recordItr;
@@ -209,7 +210,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
       }
       if (keysToDelete.size() > 0) {
         writer = writer.appendBlock(
-            new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new), header));
+            new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
         keysToDelete.clear();
       }
     } catch (Exception e) {
@@ -286,7 +287,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
     if (indexedRecord.isPresent()) {
       recordList.add(indexedRecord.get());
     } else {
-      keysToDelete.add(record.getRecordKey());
+      keysToDelete.add(record.getKey());
     }
     numberOfRecords++;
   }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
index ed68e5857..1b86180fa 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
@@ -326,8 +326,8 @@ public class TestMergeOnReadTable {
     List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
     List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
 
-    //Wrote 40 records and deleted 20 records, so remaining 40-20 = 20
-    assertEquals("Must contain 20 records", 20, recordsRead.size());
+    //Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
+    assertEquals("Must contain 0 records", 0, recordsRead.size());
   }
 
   @Test
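The write path above is the entry point for hard deletes on MergeOnRead tables: a record whose payload resolves to empty is buffered as a full HoodieKey (record key plus partition path) and flushed to the log as a HoodieDeleteBlock. A minimal usage sketch, assuming a write client and Spark context set up as in the tests; writeClient, jsc and commitTime are stand-ins, not names from this patch:

    // Hard-delete a single record by upserting an empty HoodieAvroPayload.
    HoodieKey key = new HoodieKey("uuid-1234", "2018/12/21");
    HoodieRecord<HoodieAvroPayload> tombstone =
        new HoodieRecord<>(key, new HoodieAvroPayload(Optional.empty()));
    // HoodieAppendHandle sees the empty payload, buffers the HoodieKey and
    // appends it to the log as a HoodieDeleteBlock.
    writeClient.upsert(jsc.parallelize(Collections.singletonList(tombstone)), commitTime);

This is why the test assertion above changes to 0: every written record is followed by a matching delete, and the readers below now honor the tombstones.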
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java
index 9e4be0db9..692079014 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java
@@ -36,7 +36,11 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
 
   public HoodieAvroPayload(Optional<GenericRecord> record) {
     try {
-      this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());
+      if (record.isPresent()) {
+        this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());
+      } else {
+        this.recordBytes = new byte[0];
+      }
     } catch (IOException io) {
       throw new HoodieIOException("Cannot convert record to bytes", io);
     }
@@ -55,6 +59,9 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
 
   @Override
   public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
+    if (recordBytes.length == 0) {
+      return Optional.empty();
+    }
     Optional<IndexedRecord> record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
     return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
   }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
index 56271904d..5a4bb2451 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -19,6 +19,7 @@ package com.uber.hoodie.common.table.log;
 import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
 
+import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -63,7 +64,7 @@ public abstract class AbstractHoodieLogRecordScanner {
 
   private static final Logger log = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
   // Reader schema for the records
-  private final Schema readerSchema;
+  protected final Schema readerSchema;
   // Latest valid instant time
   // Log-Blocks belonging to inflight delta-instants are filtered-out using this high-watermark.
   private final String latestInstantTime;
@@ -291,7 +292,7 @@ public abstract class AbstractHoodieLogRecordScanner {
    *
    * @param key Deleted record key
    */
-  protected abstract void processNextDeletedKey(String key);
+  protected abstract void processNextDeletedKey(HoodieKey key);
 
   /**
    * Process the set of log blocks belonging to the last instant which is read fully.
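The two changes above define the delete convention end to end: an absent record serializes to zero bytes, and zero bytes deserialize back to Optional.empty(). A short round-trip sketch (schema and genericRecord are assumed to be in scope; getInsertValue throws IOException, handling elided):

    HoodieAvroPayload tombstone = new HoodieAvroPayload(Optional.empty());
    // Zero stored bytes read back as "no value", i.e. a delete marker.
    assert !tombstone.getInsertValue(schema).isPresent();

    HoodieAvroPayload live = new HoodieAvroPayload(Optional.of(genericRecord));
    assert live.getInsertValue(schema).isPresent();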
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
index 6a12d5914..f77ec9bb1 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieMergedLogRecordScanner.java
@@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.util.DefaultSizeEstimator;
 import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
 import com.uber.hoodie.common.util.HoodieTimer;
+import com.uber.hoodie.common.util.SpillableMapUtils;
 import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
@@ -102,10 +103,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
   }
 
   @Override
-  protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) {
+  protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
     String key = hoodieRecord.getRecordKey();
     if (records.containsKey(key)) {
-      // Merge and store the merged record
+      // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
+      // done when a delete (empty payload) is encountered before or after an insert/update.
      HoodieRecordPayload combinedValue = records.get(key).getData().preCombine(hoodieRecord.getData());
       records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
     } else {
@@ -115,10 +117,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
   }
 
   @Override
-  protected void processNextDeletedKey(String key) {
-    // TODO : If delete is the only block written and/or records are present in parquet file
-    // TODO : Mark as tombstone (optional.empty()) for data instead of deleting the entry
-    records.remove(key);
+  protected void processNextDeletedKey(HoodieKey hoodieKey) {
+    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
+        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
   }
 
   public long getTotalTimeTakenToReadAndMergeBlocks() {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieUnMergedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieUnMergedLogRecordScanner.java
index 98264352b..f8673f458 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -18,6 +18,7 @@
 
 package com.uber.hoodie.common.table.log;
 
+import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import java.util.List;
@@ -43,7 +44,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
   }
 
   @Override
-  protected void processNextDeletedKey(String key) {
+  protected void processNextDeletedKey(HoodieKey key) {
     throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config");
   }
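With tombstoning in place of removal, a deleted key now survives the merged scan and can still be reconciled against later blocks for the same key. A sketch of the observable behavior (the constructor arguments follow the shape exercised by the tests further down; variable names here are descriptive stand-ins, and IOException handling is elided):

    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths,
        readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
        bufferSize, spillableMapBasePath);
    for (HoodieRecord<? extends HoodieRecordPayload> rec : scanner) {
      // True for keys written through a HoodieDeleteBlock; preCombine decides
      // the winner when a delete and an update land on the same key.
      boolean deleted = !rec.getData().getInsertValue(readerSchema).isPresent();
    }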
Check Job Config"); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java index eddae761d..9ee4f296b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java @@ -16,16 +16,16 @@ package com.uber.hoodie.common.table.log.block; +import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.storage.SizeAwareDataInputStream; -import com.uber.hoodie.common.util.StringUtils; +import com.uber.hoodie.common.util.SerializationUtils; import com.uber.hoodie.exception.HoodieIOException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -36,9 +36,9 @@ import org.apache.hadoop.fs.FSDataInputStream; */ public class HoodieDeleteBlock extends HoodieLogBlock { - private String[] keysToDelete; + private HoodieKey[] keysToDelete; - public HoodieDeleteBlock(String[] keysToDelete, + public HoodieDeleteBlock(HoodieKey[] keysToDelete, Map header) { this(Optional.empty(), null, false, Optional.empty(), header, new HashMap<>()); this.keysToDelete = keysToDelete; @@ -64,15 +64,14 @@ public class HoodieDeleteBlock extends HoodieLogBlock { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(baos); - byte[] bytesToWrite = StringUtils.join(getKeysToDelete(), ",") - .getBytes(Charset.forName("utf-8")); + byte[] bytesToWrite = SerializationUtils.serialize(getKeysToDelete()); output.writeInt(HoodieLogBlock.version); output.writeInt(bytesToWrite.length); output.write(bytesToWrite); return baos.toByteArray(); } - public String[] getKeysToDelete() { + public HoodieKey[] getKeysToDelete() { try { if (keysToDelete == null) { if (!getContent().isPresent() && readBlockLazily) { @@ -86,7 +85,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock { int dataLength = dis.readInt(); byte[] data = new byte[dataLength]; dis.readFully(data); - this.keysToDelete = new String(data).split(","); + this.keysToDelete = SerializationUtils.deserialize(data); deflate(); } return keysToDelete; diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SpillableMapUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SpillableMapUtils.java index 79aa90ee6..cb0aab35f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SpillableMapUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SpillableMapUtils.java @@ -117,4 +117,15 @@ public class SpillableMapUtils { .loadPayload(payloadClazz, new Object[]{Optional.of(rec)}, Optional.class)); return (R) hoodieRecord; } + + /** + * Utility method to convert bytes to HoodieRecord using schema and payload class + */ + public static R generateEmptyPayload(String recKey, String partitionPath, String payloadClazz) { + HoodieRecord hoodieRecord = new HoodieRecord<>( + new HoodieKey(recKey, partitionPath), + ReflectionUtils + .loadPayload(payloadClazz, new Object[]{Optional.empty()}, Optional.class)); + return (R) hoodieRecord; + } } \ No newline at end of file diff --git 
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index 3c57b55f2..cc795ffcb 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import com.google.common.collect.Maps;
 import com.uber.hoodie.common.minicluster.MiniClusterUtil;
 import com.uber.hoodie.common.model.HoodieArchivedLogFile;
+import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
@@ -43,6 +44,7 @@ import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.SchemaTestUtil;
 import com.uber.hoodie.exception.CorruptedLogFileException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -714,10 +716,13 @@ public class HoodieLogFormatTest {
         s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
 
     // Delete 50 keys
-    List<String> deletedKeys = originalKeys.subList(0, 50);
+    List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
+        s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList()).subList(0, 50);
 
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
     writer = writer.appendBlock(deleteBlock);
 
     List<HoodieLogFile> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION,
@@ -727,8 +732,19 @@ public class HoodieLogFormatTest {
         "102", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
     assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords());
     final List<String> readKeys = new ArrayList<>(200);
+    final List<Boolean> emptyPayloads = new ArrayList<>();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
-    assertEquals("Stream collect should return all 150 records", 150, readKeys.size());
+    scanner.forEach(s -> {
+      try {
+        if (!s.getData().getInsertValue(schema).isPresent()) {
+          emptyPayloads.add(true);
+        }
+      } catch (IOException io) {
+        throw new UncheckedIOException(io);
+      }
+    });
+    assertEquals("Stream collect should return all 200 records", 200, readKeys.size());
+    assertEquals("Stream collect should return all 50 records with empty payloads", 50, emptyPayloads.size());
     originalKeys.removeAll(deletedKeys);
     Collections.sort(originalKeys);
     Collections.sort(readKeys);
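The same HoodieKey-building pattern recurs in the three hunks below; a small helper like this (hypothetical, not part of the patch) would express the intent once if the tests are ever refactored:

    private static List<HoodieKey> firstNKeys(List<IndexedRecord> records, int n) {
      // Build HoodieKeys from the record-key and partition-path metadata fields
      // of the first n written records.
      return records.stream()
          .map(s -> new HoodieKey(
              ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
              ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))
          .limit(n)
          .collect(Collectors.toList());
    }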
@@ -782,8 +798,13 @@ public class HoodieLogFormatTest {
         s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
 
     // Delete 50 keys
-    List<String> deletedKeys = originalKeys.subList(0, 50);
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
+    // Build the HoodieKeys (record key + partition path) for the first 50 records
+    List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
+        s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList()).subList(0, 50);
+
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
     writer = writer.appendBlock(deleteBlock);
 
     // Attempt 1 : Write rollback block for a failed write
@@ -839,8 +860,12 @@ public class HoodieLogFormatTest {
         s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
 
     // Delete 50 keys
-    List<String> deletedKeys = originalKeys.subList(0, 50);
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
+    // Build the HoodieKeys (record key + partition path) for the first 50 records
+    List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
+        s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList()).subList(0, 50);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
     writer = writer.appendBlock(deleteBlock);
 
     // Write 2 rollback blocks (1 data block + 1 delete block) for a failed write
@@ -921,8 +946,12 @@ public class HoodieLogFormatTest {
         s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
 
     // Delete 50 keys
-    List<String> deletedKeys = originalKeys.subList(0, 50);
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
+    // Build the HoodieKeys (record key + partition path) for the first 50 records
+    List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
+        s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList()).subList(0, 50);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
     writer = writer.appendBlock(deleteBlock);
 
     // Write 1 rollback block for a failed write
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
index 266e0d64c..b8ef2680f 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -24,7 +24,9 @@ import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
 import com.uber.hoodie.common.util.FSUtils;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -60,13 +62,20 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
       // but can return records for completed commits > the commit we are trying to read (if using
       // readCommit() API)
       for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : compactedLogRecordScanner) {
-        GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(getReaderSchema()).get();
+        Optional<IndexedRecord> recordOptional = hoodieRecord.getData().getInsertValue(getReaderSchema());
+        ArrayWritable aWritable;
         String key = hoodieRecord.getRecordKey();
-        // we assume, a later safe record in the log, is newer than what we have in the map &
-        // replace it.
-        // TODO : handle deletes here
-        ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema());
-        deltaRecordMap.put(key, aWritable);
+        if (recordOptional.isPresent()) {
+          GenericRecord rec = (GenericRecord) recordOptional.get();
+          // we assume a later record in the log is newer than what we have in the map,
+          // so replace the existing entry
+          aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema());
+          deltaRecordMap.put(key, aWritable);
+        } else {
+          // an empty payload marks this key as hard-deleted; store an empty tombstone
+          aWritable = new ArrayWritable(Writable.class, new Writable[0]);
+          deltaRecordMap.put(key, aWritable);
+        }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Log record : " + arrayWritableToString(aWritable));
         }
@@ -92,8 +101,13 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
             arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key))));
       }
       if (deltaRecordMap.containsKey(key)) {
-        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro ?
+        // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
+        // deltaRecord may not be a full record and needs values of columns from the parquet file
         Writable[] replaceValue = deltaRecordMap.get(key).get();
+        if (replaceValue.length < 1) {
+          // This record has been deleted, move to the next record
+          return next(aVoid, arrayWritable);
+        }
         Writable[] originalValue = arrayWritable.get();
         try {
           System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
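The reader change above keys off an empty Writable[] as the delete sentinel: when the merged log scanner returns an empty payload, an empty ArrayWritable is stored in deltaRecordMap, and next() skips the row instead of merging it with the parquet record. A named helper (hypothetical, not part of the patch) would make the length check self-describing:

    // True when the delta map entry is the empty tombstone stored for a key
    // that was hard-deleted in the log.
    private static boolean isDeleteSentinel(ArrayWritable writable) {
      return writable.get().length == 0;
    }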