Enabling hard deletes for MergeOnRead table type
This commit is contained in:
committed by
vinoth chandar
parent
345aaa31aa
commit
110df7190b
@@ -20,6 +20,7 @@ import com.beust.jcommander.internal.Maps;
|
||||
import com.uber.hoodie.WriteStatus;
|
||||
import com.uber.hoodie.common.model.FileSlice;
|
||||
import com.uber.hoodie.common.model.HoodieDeltaWriteStat;
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRecordLocation;
|
||||
@@ -67,7 +68,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
// Buffer for holding records in memory before they are flushed to disk
|
||||
private List<IndexedRecord> recordList = new ArrayList<>();
|
||||
// Buffer for holding records (to be deleted) in memory before they are flushed to disk
|
||||
private List<String> keysToDelete = new ArrayList<>();
|
||||
private List<HoodieKey> keysToDelete = new ArrayList<>();
|
||||
private TableFileSystemView.RealtimeView fileSystemView;
|
||||
private String partitionPath;
|
||||
private Iterator<HoodieRecord<T>> recordItr;
|
||||
@@ -209,7 +210,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
}
|
||||
if (keysToDelete.size() > 0) {
|
||||
writer = writer.appendBlock(
|
||||
new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new), header));
|
||||
new HoodieDeleteBlock(keysToDelete.stream().toArray(HoodieKey[]::new), header));
|
||||
keysToDelete.clear();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@@ -286,7 +287,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
if (indexedRecord.isPresent()) {
|
||||
recordList.add(indexedRecord.get());
|
||||
} else {
|
||||
keysToDelete.add(record.getRecordKey());
|
||||
keysToDelete.add(record.getKey());
|
||||
}
|
||||
numberOfRecords++;
|
||||
}
|
||||
|
||||
@@ -326,8 +326,8 @@ public class TestMergeOnReadTable {
|
||||
|
||||
List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
|
||||
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
|
||||
//Wrote 40 records and deleted 20 records, so remaining 40-20 = 20
|
||||
assertEquals("Must contain 20 records", 20, recordsRead.size());
|
||||
//Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
|
||||
assertEquals("Must contain 0 records", 0, recordsRead.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -36,7 +36,11 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
|
||||
|
||||
public HoodieAvroPayload(Optional<GenericRecord> record) {
|
||||
try {
|
||||
this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());
|
||||
if (record.isPresent()) {
|
||||
this.recordBytes = HoodieAvroUtils.avroToBytes(record.get());
|
||||
} else {
|
||||
this.recordBytes = new byte[0];
|
||||
}
|
||||
} catch (IOException io) {
|
||||
throw new HoodieIOException("Cannot convert record to bytes", io);
|
||||
}
|
||||
@@ -55,6 +59,9 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
|
||||
|
||||
@Override
|
||||
public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
|
||||
if (recordBytes.length == 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
Optional<GenericRecord> record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
|
||||
return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ package com.uber.hoodie.common.table.log;
|
||||
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
|
||||
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
|
||||
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
@@ -63,7 +64,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
private static final Logger log = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
|
||||
|
||||
// Reader schema for the records
|
||||
private final Schema readerSchema;
|
||||
protected final Schema readerSchema;
|
||||
// Latest valid instant time
|
||||
// Log-Blocks belonging to inflight delta-instants are filtered-out using this high-watermark.
|
||||
private final String latestInstantTime;
|
||||
@@ -291,7 +292,7 @@ public abstract class AbstractHoodieLogRecordScanner {
|
||||
*
|
||||
* @param key Deleted record key
|
||||
*/
|
||||
protected abstract void processNextDeletedKey(String key);
|
||||
protected abstract void processNextDeletedKey(HoodieKey key);
|
||||
|
||||
/**
|
||||
* Process the set of log blocks belonging to the last instant which is read fully.
|
||||
|
||||
@@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.common.util.DefaultSizeEstimator;
|
||||
import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
|
||||
import com.uber.hoodie.common.util.HoodieTimer;
|
||||
import com.uber.hoodie.common.util.SpillableMapUtils;
|
||||
import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
|
||||
import com.uber.hoodie.exception.HoodieIOException;
|
||||
import java.io.IOException;
|
||||
@@ -102,10 +103,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) {
|
||||
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
|
||||
String key = hoodieRecord.getRecordKey();
|
||||
if (records.containsKey(key)) {
|
||||
// Merge and store the merged record
|
||||
// Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
|
||||
// done when a delete (empty payload) is encountered before or after an insert/update.
|
||||
HoodieRecordPayload combinedValue = records.get(key).getData().preCombine(hoodieRecord.getData());
|
||||
records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
|
||||
} else {
|
||||
@@ -115,10 +117,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processNextDeletedKey(String key) {
|
||||
// TODO : If delete is the only block written and/or records are present in parquet file
|
||||
// TODO : Mark as tombstone (optional.empty()) for data instead of deleting the entry
|
||||
records.remove(key);
|
||||
protected void processNextDeletedKey(HoodieKey hoodieKey) {
|
||||
records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
|
||||
hoodieKey.getPartitionPath(), getPayloadClassFQN()));
|
||||
}
|
||||
|
||||
public long getTotalTimeTakenToReadAndMergeBlocks() {
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package com.uber.hoodie.common.table.log;
|
||||
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import java.util.List;
|
||||
@@ -43,7 +44,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processNextDeletedKey(String key) {
|
||||
protected void processNextDeletedKey(HoodieKey key) {
|
||||
throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config");
|
||||
}
|
||||
|
||||
|
||||
@@ -16,16 +16,16 @@
|
||||
|
||||
package com.uber.hoodie.common.table.log.block;
|
||||
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.storage.SizeAwareDataInputStream;
|
||||
import com.uber.hoodie.common.util.StringUtils;
|
||||
import com.uber.hoodie.common.util.SerializationUtils;
|
||||
import com.uber.hoodie.exception.HoodieIOException;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@@ -36,9 +36,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||
*/
|
||||
public class HoodieDeleteBlock extends HoodieLogBlock {
|
||||
|
||||
private String[] keysToDelete;
|
||||
private HoodieKey[] keysToDelete;
|
||||
|
||||
public HoodieDeleteBlock(String[] keysToDelete,
|
||||
public HoodieDeleteBlock(HoodieKey[] keysToDelete,
|
||||
Map<HeaderMetadataType, String> header) {
|
||||
this(Optional.empty(), null, false, Optional.empty(), header, new HashMap<>());
|
||||
this.keysToDelete = keysToDelete;
|
||||
@@ -64,15 +64,14 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
DataOutputStream output = new DataOutputStream(baos);
|
||||
byte[] bytesToWrite = StringUtils.join(getKeysToDelete(), ",")
|
||||
.getBytes(Charset.forName("utf-8"));
|
||||
byte[] bytesToWrite = SerializationUtils.serialize(getKeysToDelete());
|
||||
output.writeInt(HoodieLogBlock.version);
|
||||
output.writeInt(bytesToWrite.length);
|
||||
output.write(bytesToWrite);
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
public String[] getKeysToDelete() {
|
||||
public HoodieKey[] getKeysToDelete() {
|
||||
try {
|
||||
if (keysToDelete == null) {
|
||||
if (!getContent().isPresent() && readBlockLazily) {
|
||||
@@ -86,7 +85,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
|
||||
int dataLength = dis.readInt();
|
||||
byte[] data = new byte[dataLength];
|
||||
dis.readFully(data);
|
||||
this.keysToDelete = new String(data).split(",");
|
||||
this.keysToDelete = SerializationUtils.deserialize(data);
|
||||
deflate();
|
||||
}
|
||||
return keysToDelete;
|
||||
|
||||
@@ -117,4 +117,15 @@ public class SpillableMapUtils {
|
||||
.loadPayload(payloadClazz, new Object[]{Optional.of(rec)}, Optional.class));
|
||||
return (R) hoodieRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to convert bytes to HoodieRecord using schema and payload class
|
||||
*/
|
||||
public static <R> R generateEmptyPayload(String recKey, String partitionPath, String payloadClazz) {
|
||||
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(
|
||||
new HoodieKey(recKey, partitionPath),
|
||||
ReflectionUtils
|
||||
.loadPayload(payloadClazz, new Object[]{Optional.empty()}, Optional.class));
|
||||
return (R) hoodieRecord;
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.uber.hoodie.common.minicluster.MiniClusterUtil;
|
||||
import com.uber.hoodie.common.model.HoodieArchivedLogFile;
|
||||
import com.uber.hoodie.common.model.HoodieKey;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTableType;
|
||||
@@ -43,6 +44,7 @@ import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.common.util.SchemaTestUtil;
|
||||
import com.uber.hoodie.exception.CorruptedLogFileException;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@@ -714,10 +716,13 @@ public class HoodieLogFormatTest {
|
||||
s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
|
||||
|
||||
// Delete 50 keys
|
||||
List<String> deletedKeys = originalKeys.subList(0, 50);
|
||||
List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
|
||||
s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
|
||||
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
|
||||
.collect(Collectors.toList()).subList(0, 50);
|
||||
|
||||
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
|
||||
writer = writer.appendBlock(deleteBlock);
|
||||
|
||||
List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION,
|
||||
@@ -727,8 +732,19 @@ public class HoodieLogFormatTest {
|
||||
"102", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
|
||||
assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords());
|
||||
final List<String> readKeys = new ArrayList<>(200);
|
||||
final List<Boolean> emptyPayloads = new ArrayList<>();
|
||||
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
|
||||
assertEquals("Stream collect should return all 150 records", 150, readKeys.size());
|
||||
scanner.forEach(s -> {
|
||||
try {
|
||||
if (!s.getData().getInsertValue(schema).isPresent()) {
|
||||
emptyPayloads.add(true);
|
||||
}
|
||||
} catch (IOException io) {
|
||||
throw new UncheckedIOException(io);
|
||||
}
|
||||
});
|
||||
assertEquals("Stream collect should return all 200 records", 200, readKeys.size());
|
||||
assertEquals("Stream collect should return all 50 records with empty payloads", 50, emptyPayloads.size());
|
||||
originalKeys.removeAll(deletedKeys);
|
||||
Collections.sort(originalKeys);
|
||||
Collections.sort(readKeys);
|
||||
@@ -782,8 +798,13 @@ public class HoodieLogFormatTest {
|
||||
s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
|
||||
|
||||
// Delete 50 keys
|
||||
List<String> deletedKeys = originalKeys.subList(0, 50);
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
|
||||
// Delete 50 keys
|
||||
List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
|
||||
s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
|
||||
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
|
||||
.collect(Collectors.toList()).subList(0, 50);
|
||||
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
|
||||
writer = writer.appendBlock(deleteBlock);
|
||||
|
||||
// Attempt 1 : Write rollback block for a failed write
|
||||
@@ -839,8 +860,12 @@ public class HoodieLogFormatTest {
|
||||
s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
|
||||
|
||||
// Delete 50 keys
|
||||
List<String> deletedKeys = originalKeys.subList(0, 50);
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
|
||||
// Delete 50 keys
|
||||
List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
|
||||
s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
|
||||
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
|
||||
.collect(Collectors.toList()).subList(0, 50);
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
|
||||
writer = writer.appendBlock(deleteBlock);
|
||||
|
||||
// Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write
|
||||
@@ -921,8 +946,12 @@ public class HoodieLogFormatTest {
|
||||
s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList());
|
||||
|
||||
// Delete 50 keys
|
||||
List<String> deletedKeys = originalKeys.subList(0, 50);
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), header);
|
||||
// Delete 50 keys
|
||||
List<HoodieKey> deletedKeys = copyOfRecords1.stream().map(
|
||||
s -> (new HoodieKey(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
|
||||
((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
|
||||
.collect(Collectors.toList()).subList(0, 50);
|
||||
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new HoodieKey[50]), header);
|
||||
writer = writer.appendBlock(deleteBlock);
|
||||
|
||||
// Write 1 rollback block for a failed write
|
||||
|
||||
@@ -24,7 +24,9 @@ import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Optional;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.io.ArrayWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
@@ -60,13 +62,20 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
|
||||
// but can return records for completed commits > the commit we are trying to read (if using
|
||||
// readCommit() API)
|
||||
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : compactedLogRecordScanner) {
|
||||
GenericRecord rec = (GenericRecord) hoodieRecord.getData().getInsertValue(getReaderSchema()).get();
|
||||
Optional<IndexedRecord> recordOptional = hoodieRecord.getData().getInsertValue(getReaderSchema());
|
||||
ArrayWritable aWritable;
|
||||
String key = hoodieRecord.getRecordKey();
|
||||
// we assume, a later safe record in the log, is newer than what we have in the map &
|
||||
// replace it.
|
||||
// TODO : handle deletes here
|
||||
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema());
|
||||
deltaRecordMap.put(key, aWritable);
|
||||
if (recordOptional.isPresent()) {
|
||||
GenericRecord rec = (GenericRecord) recordOptional.get();
|
||||
// we assume, a later safe record in the log, is newer than what we have in the map &
|
||||
// replace it.
|
||||
// TODO : handle deletes here
|
||||
aWritable = (ArrayWritable) avroToArrayWritable(rec, getWriterSchema());
|
||||
deltaRecordMap.put(key, aWritable);
|
||||
} else {
|
||||
aWritable = new ArrayWritable(Writable.class, new Writable[0]);
|
||||
deltaRecordMap.put(key, aWritable);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Log record : " + arrayWritableToString(aWritable));
|
||||
}
|
||||
@@ -92,8 +101,13 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader impleme
|
||||
arrayWritableToString(arrayWritable), arrayWritableToString(deltaRecordMap.get(key))));
|
||||
}
|
||||
if (deltaRecordMap.containsKey(key)) {
|
||||
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro ?
|
||||
// TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
|
||||
// deltaRecord may not be a full record and needs values of columns from the parquet
|
||||
Writable[] replaceValue = deltaRecordMap.get(key).get();
|
||||
if (replaceValue.length < 1) {
|
||||
// This record has been deleted, move to the next record
|
||||
return next(aVoid, arrayWritable);
|
||||
}
|
||||
Writable[] originalValue = arrayWritable.get();
|
||||
try {
|
||||
System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length);
|
||||
|
||||
Reference in New Issue
Block a user