diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java index 314039b3a..8ef567b97 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HBaseIndex.java @@ -17,7 +17,6 @@ package com.uber.hoodie.index; import com.google.common.base.Optional; -import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.config.HoodieWriteConfig; @@ -180,21 +179,29 @@ public class HBaseIndex extends HoodieIndex { while (statusIterator.hasNext()) { WriteStatus writeStatus = statusIterator.next(); List puts = new ArrayList<>(); + List deletes = new ArrayList<>(); try { for (HoodieRecord rec : writeStatus.getWrittenRecords()) { if (!writeStatus.isErrored(rec.getKey())) { - Put put = new Put(Bytes.toBytes(rec.getRecordKey())); - HoodieRecordLocation loc = rec.getNewLocation(); - put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, - Bytes.toBytes(loc.getCommitTime())); - put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, - Bytes.toBytes(loc.getFileId())); - put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, - Bytes.toBytes(rec.getPartitionPath())); - puts.add(put); + java.util.Optional loc = rec.getNewLocation(); + if(loc.isPresent()) { + Put put = new Put(Bytes.toBytes(rec.getRecordKey())); + put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, + Bytes.toBytes(loc.get().getCommitTime())); + put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, + Bytes.toBytes(loc.get().getFileId())); + put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, + Bytes.toBytes(rec.getPartitionPath())); + puts.add(put); + } else { + //Delete existing index for a deleted record + Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey())); + deletes.add(delete); + } } } hTable.put(puts); + 
hTable.delete(deletes); hTable.flushCommits(); } catch (Exception e) { Exception we = new Exception("Error updating index for " + writeStatus, e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java index a546b4bc1..81b96ff8a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java @@ -41,9 +41,8 @@ import java.util.concurrent.ConcurrentMap; /** * Hoodie Index implementation backed by an in-memory Hash map. - * + *

* ONLY USE FOR LOCAL TESTING - * */ public class InMemoryHashIndex extends HoodieIndex { @@ -56,7 +55,7 @@ public class InMemoryHashIndex extends HoodieInde @Override public JavaPairRDD> fetchRecordLocation( - JavaRDD hoodieKeys, final HoodieTable hoodieTable) { + JavaRDD hoodieKeys, final HoodieTable hoodieTable) { throw new UnsupportedOperationException("InMemory index does not implement check exist yet"); } @@ -67,7 +66,7 @@ public class InMemoryHashIndex extends HoodieInde implements Function2>, Iterator>> { @Override public Iterator> call(Integer partitionNum, - Iterator> hoodieRecordIterator) { + Iterator> hoodieRecordIterator) { List> taggedRecords = new ArrayList<>(); while (hoodieRecordIterator.hasNext()) { HoodieRecord rec = hoodieRecordIterator.next(); @@ -82,7 +81,7 @@ public class InMemoryHashIndex extends HoodieInde @Override public JavaRDD> tagLocation(JavaRDD> recordRDD, - HoodieTable hoodieTable) { + HoodieTable hoodieTable) { return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(), true); } @@ -94,7 +93,14 @@ public class InMemoryHashIndex extends HoodieInde public WriteStatus call(WriteStatus writeStatus) { for (HoodieRecord record : writeStatus.getWrittenRecords()) { if (!writeStatus.isErrored(record.getKey())) { - recordLocationMap.put(record.getKey(), record.getNewLocation()); + HoodieKey key = record.getKey(); + java.util.Optional newLocation = record.getNewLocation(); + if (newLocation.isPresent()) { + recordLocationMap.put(key, newLocation.get()); + } else { + //Delete existing index for a deleted record + recordLocationMap.remove(key); + } } } return writeStatus; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 272c98e1e..a10a89431 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -54,6 +54,7 @@ 
public class HoodieAppendHandle extends HoodieIOH private RollingAvroLogAppender logAppender; private List> records; private long recordsWritten = 0; + private long recordsDeleted = 0; private HoodieLogFile currentLogFile; public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, @@ -112,18 +113,24 @@ public class HoodieAppendHandle extends HoodieIOH private Optional getIndexedRecord(HoodieRecord hoodieRecord) { try { - IndexedRecord avroRecord = hoodieRecord.getData().getInsertValue(schema); - String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(), - recordIndex.getAndIncrement()); - HoodieAvroUtils - .addHoodieKeyToRecord((GenericRecord) avroRecord, hoodieRecord.getRecordKey(), - hoodieRecord.getPartitionPath(), fileId); - HoodieAvroUtils - .addCommitMetadataToRecord((GenericRecord) avroRecord, commitTime, seqId); + Optional avroRecord = hoodieRecord.getData().getInsertValue(schema); + + if(avroRecord.isPresent()) { + String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(), + recordIndex.getAndIncrement()); + HoodieAvroUtils + .addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(), + hoodieRecord.getPartitionPath(), fileId); + HoodieAvroUtils + .addCommitMetadataToRecord((GenericRecord) avroRecord.get(), commitTime, seqId); + recordsWritten++; + } else { + recordsDeleted++; + } + hoodieRecord.deflate(); writeStatus.markSuccess(hoodieRecord); - recordsWritten++; - return Optional.of(avroRecord); + return avroRecord; } catch (Exception e) { logger.error("Error writing record " + hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e); @@ -149,6 +156,7 @@ public class HoodieAppendHandle extends HoodieIOH logAppender.close(); } writeStatus.getStat().setNumWrites(recordsWritten); + writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); } catch (IOException e) { throw new 
HoodieUpsertException("Failed to close UpdateHandle", e); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java index 6b799f833..ed66fc586 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java @@ -34,6 +34,7 @@ import org.apache.log4j.Logger; import org.apache.spark.TaskContext; import java.io.IOException; +import java.util.Optional; import java.util.UUID; public class HoodieInsertHandle extends HoodieIOHandle { @@ -42,7 +43,8 @@ public class HoodieInsertHandle extends HoodieIOH private final WriteStatus status; private final HoodieStorageWriter storageWriter; private final Path path; - private int recordsWritten = 0; + private long recordsWritten = 0; + private long recordsDeleted = 0; public HoodieInsertHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String partitionPath) { @@ -82,13 +84,19 @@ public class HoodieInsertHandle extends HoodieIOH */ public void write(HoodieRecord record) { try { - IndexedRecord avroRecord = record.getData().getInsertValue(schema); - storageWriter.writeAvroWithMetadata(avroRecord, record); - status.markSuccess(record); - // update the new location of record, so we know where to find it next - record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId())); + Optional avroRecord = record.getData().getInsertValue(schema); + + if(avroRecord.isPresent()) { + storageWriter.writeAvroWithMetadata(avroRecord.get(), record); + // update the new location of record, so we know where to find it next + record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId())); + recordsWritten++; + } else { + recordsDeleted++; + } + record.deflate(); - recordsWritten++; + status.markSuccess(record); } catch (Throwable t) { // Not throwing exception from here, since we don't want to fail the 
entire job // for a single record @@ -111,6 +119,7 @@ public class HoodieInsertHandle extends HoodieIOH HoodieWriteStat stat = new HoodieWriteStat(); stat.setNumWrites(recordsWritten); + stat.setNumDeletes(recordsDeleted); stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(status.getFileId()); stat.setFullPath(path.toString()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java index 01933e73d..09c6a834e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java @@ -37,6 +37,7 @@ import org.apache.spark.TaskContext; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; +import java.util.Optional; @SuppressWarnings("Duplicates") public class HoodieUpdateHandle extends HoodieIOHandle { @@ -48,6 +49,7 @@ public class HoodieUpdateHandle extends HoodieIO private Path newFilePath; private Path oldFilePath; private long recordsWritten = 0; + private long recordsDeleted = 0; private long updatedRecordsWritten = 0; private String fileId; @@ -118,13 +120,18 @@ public class HoodieUpdateHandle extends HoodieIO } - private boolean writeUpdateRecord(HoodieRecord hoodieRecord, IndexedRecord indexedRecord) { + private boolean writeUpdateRecord(HoodieRecord hoodieRecord, Optional indexedRecord) { try { - storageWriter.writeAvroWithMetadata(indexedRecord, hoodieRecord); + if(indexedRecord.isPresent()) { + storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord); + recordsWritten++; + updatedRecordsWritten++; + } else { + recordsDeleted++; + } + hoodieRecord.deflate(); writeStatus.markSuccess(hoodieRecord); - recordsWritten ++; - updatedRecordsWritten ++; return true; } catch (Exception e) { logger.error("Error writing record "+ hoodieRecord, e); @@ -142,8 +149,8 @@ public class HoodieUpdateHandle extends HoodieIO boolean 
copyOldRecord = true; if (keyToNewRecords.containsKey(key)) { try { - IndexedRecord avroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, schema); - if (writeUpdateRecord(hoodieRecord, avroRecord)) { + Optional combinedAvroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, schema); + if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) { /* ONLY WHEN * 1) we have an update for this key AND * 2) We are able to successfully write the the combined new value @@ -194,8 +201,10 @@ public class HoodieUpdateHandle extends HoodieIO if (storageWriter != null) { storageWriter.close(); } + writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, newFilePath)); writeStatus.getStat().setNumWrites(recordsWritten); + writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); } catch (IOException e) { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index a67d4f93d..aa83eec89 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -256,6 +256,85 @@ public class TestHoodieClient implements Serializable { readClient.readCommit(newCommitTime).count(), readClient.readSince("001").count()); } + + @Test + public void testDeletes() throws Exception { + + HoodieWriteConfig cfg = getConfig(); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); + FileSystem fs = FSUtils.getFs(); + + /** + * Write 1 (inserts and deletes) + * Write actual 200 insert records and ignore 100 delete records + */ + String newCommitTime = "001"; + List fewRecordsForInsert = dataGen.generateInserts(newCommitTime, 200); + List fewRecordsForDelete = 
dataGen.generateDeletes(newCommitTime, 100); + + List records = new ArrayList(fewRecordsForInsert); + records.addAll(fewRecordsForDelete); + + JavaRDD writeRecords = jsc.parallelize(records, 1); + + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + // verify that there is a commit + HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext); + assertEquals("Expecting a single commit.", readClient.listCommitsSince("000").size(), 1); + assertEquals("Latest commit should be 001",readClient.latestCommit(), newCommitTime); + assertEquals("Must contain 200 records", readClient.readCommit(newCommitTime).count(), fewRecordsForInsert.size()); + // Should have 100 records in table (check using Index), all in locations marked at commit + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + List taggedRecords = index.tagLocation(jsc.parallelize(fewRecordsForInsert, 1), table).collect(); + checkTaggedRecords(taggedRecords, "001"); + + /** + * Write 2 (deletes+writes) + */ + newCommitTime = "004"; + fewRecordsForDelete = records.subList(0,50); + List fewRecordsForUpdate = records.subList(50,100); + records = dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete); + + records.addAll(fewRecordsForUpdate); + + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + // verify there are now 2 commits + readClient = new HoodieReadClient(jsc, basePath, sqlContext); + assertEquals("Expecting two commits.", readClient.listCommitsSince("000").size(), 2); + assertEquals("Latest commit should be 004",readClient.latestCommit(), newCommitTime); + + metaClient = new HoodieTableMetaClient(fs, basePath); + table = HoodieTable.getHoodieTable(metaClient, getConfig()); + + // Check the entire dataset has 150 records(200-50) 
still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i=0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain 150 records", readClient.read(fullPartitionPaths).count(), 150); + + + // Check that the incremental consumption from time 000 + assertEquals("Incremental consumption from latest commit, should give 50 updated records", + readClient.readCommit(newCommitTime).count(), + 50); + assertEquals("Incremental consumption from time 001, should give 50 updated records", + 50, + readClient.readSince("001").count()); + assertEquals("Incremental consumption from time 000, should give 150", + 150, + readClient.readSince("000").count()); + } + @Test public void testInsertAndCleanByVersions() throws Exception { int maxVersions = 2; // keep upto 2 versions for each file diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index 47bddde2b..6eff5fe4f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -34,10 +34,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.UUID; +import java.util.*; /** * Class to be used in tests to keep generating test inserts and updates against a corpus. 
@@ -100,6 +97,26 @@ public class HoodieTestDataGenerator { return inserts; } + public List generateDeletes(String commitTime, int n) throws IOException { + List inserts = generateInserts(commitTime, n); + return generateDeletesFromExistingRecords(inserts); + } + + public List generateDeletesFromExistingRecords(List existingRecords) throws IOException { + List deletes = new ArrayList<>(); + for (HoodieRecord existingRecord: existingRecords) { + HoodieRecord record = generateDeleteRecord(existingRecord); + deletes.add(record); + + } + return deletes; + } + + public HoodieRecord generateDeleteRecord(HoodieRecord existingRecord) throws IOException { + HoodieKey key = existingRecord.getKey(); + TestRawTripPayload payload = new TestRawTripPayload(Optional.empty(), key.getRecordKey(), key.getPartitionPath(), null, true); + return new HoodieRecord(key, payload); + } public List generateUpdates(String commitTime, List baseRecords) throws IOException { List updates = new ArrayList<>(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java index 6d8182275..a672cb31b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/TestRawTripPayload.java @@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils; import java.io.*; import java.util.Map; +import java.util.Optional; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; @@ -41,12 +42,22 @@ public class TestRawTripPayload implements HoodieRecordPayload jsonData, String rowKey, String partitionPath, + String schemaStr, Boolean isDeleted) throws IOException { + if(jsonData.isPresent()) { + this.jsonDataCompressed = compressData(jsonData.get()); + this.dataSize = jsonData.get().length(); + } this.rowKey = rowKey; this.partitionPath = partitionPath; + this.isDeleted = 
isDeleted; + } + + public TestRawTripPayload(String jsonData, String rowKey, String partitionPath, + String schemaStr)throws IOException { + this(Optional.of(jsonData), rowKey, partitionPath, schemaStr, false); } public TestRawTripPayload(String jsonData) throws IOException { @@ -55,6 +66,7 @@ public class TestRawTripPayload implements HoodieRecordPayload jsonRecordMap = mapper.readValue(jsonData, Map.class); this.rowKey = jsonRecordMap.get("_row_key").toString(); this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/"); + this.isDeleted = false; } public String getPartitionPath() { @@ -66,20 +78,24 @@ public class TestRawTripPayload implements HoodieRecordPayload combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException { return this.getInsertValue(schema); } - @Override public IndexedRecord getInsertValue(Schema schema) throws IOException { - MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema); - return jsonConverter.convert(getJsonData()); + @Override public Optional getInsertValue(Schema schema) throws IOException { + if(isDeleted){ + return Optional.empty(); + } else { + MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema); + return Optional.of(jsonConverter.convert(getJsonData())); + } } public String getRowKey() { return rowKey; } - public String getJsonData() throws IOException { + private String getJsonData() throws IOException { return unCompressData(jsonDataCompressed); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java index 76bf3aa19..0cf66657c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHoodieBloomIndex.java @@ -421,7 +421,7 @@ public class TestHoodieBloomIndex { int seqId = 1; String commitTime = FSUtils.getCommitTime(filename); for 
(HoodieRecord record : records) { - GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema); + GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++); HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename); writer.write(avroRecord); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieJsonPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieJsonPayload.java index fae46a6b0..85ccfc744 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieJsonPayload.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieJsonPayload.java @@ -30,6 +30,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.StringWriter; +import java.util.Optional; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; @@ -47,13 +48,13 @@ public class HoodieJsonPayload implements HoodieRecordPayload return this; } - @Override public IndexedRecord combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException { + @Override public Optional combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException { return getInsertValue(schema); } - @Override public IndexedRecord getInsertValue(Schema schema) throws IOException { + @Override public Optional getInsertValue(Schema schema) throws IOException { MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema); - return jsonConverter.convert(getJsonData()); + return Optional.of(jsonConverter.convert(getJsonData())); } private String getJsonData() throws IOException { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java index e552d7e13..164766dc7 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java @@ -17,6 +17,9 @@ package com.uber.hoodie.common.model; import com.uber.hoodie.common.util.HoodieAvroUtils; + +import java.util.Optional; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -28,9 +31,9 @@ import java.io.IOException; * Useful to create a HoodieRecord over existing GenericRecords in a hoodie datasets (useful in compactions) */ public class HoodieAvroPayload implements HoodieRecordPayload { - private final GenericRecord record; + private final Optional record; - public HoodieAvroPayload(GenericRecord record) { + public HoodieAvroPayload(Optional record) { this.record = record; } @@ -40,13 +43,13 @@ public class HoodieAvroPayload implements HoodieRecordPayload } @Override - public IndexedRecord combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) - throws IOException { + public Optional combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { return getInsertValue(schema); } @Override - public IndexedRecord getInsertValue(Schema schema) throws IOException { - return HoodieAvroUtils.rewriteRecord(record, schema); + public Optional getInsertValue(Schema schema) throws IOException { + return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema)); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java index 01f3fd42d..4b05a2e3c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecord.java @@ -19,6 +19,7 @@ package 
com.uber.hoodie.common.model; import com.google.common.base.Objects; import java.io.Serializable; +import java.util.Optional; /** * A Single Record managed by Hoodie TODO - Make this generic @@ -101,8 +102,8 @@ public class HoodieRecord implements Serializable return this; } - public HoodieRecordLocation getNewLocation() { - return this.newLocation; + public Optional getNewLocation() { + return Optional.ofNullable(this.newLocation); } public boolean isCurrentLocationKnown() { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java index 28af727f1..1e0494dc8 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordPayload.java @@ -21,6 +21,7 @@ import org.apache.avro.generic.IndexedRecord; import java.io.IOException; import java.io.Serializable; +import java.util.Optional; /** * Every Hoodie dataset has an implementation of the HoodieRecordPayload @@ -44,14 +45,15 @@ public interface HoodieRecordPayload extends Seri * * @param currentValue Current value in storage, to merge/combine this payload with * @param schema Schema used for record - * @return new combined/merged value to be written back to storage + * @return new combined/merged value to be written back to storage. EMPTY to skip writing this record. */ - IndexedRecord combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException; + Optional combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException; /** * Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. * Called when writing a new value for the given HoodieKey, wherein there is no existing record in * storage to be combined against. (i.e insert) + * Return EMPTY to skip writing this record. 
*/ - IndexedRecord getInsertValue(Schema schema) throws IOException; + Optional getInsertValue(Schema schema) throws IOException; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java index a29d07428..0b2417e0b 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java @@ -51,6 +51,11 @@ public class HoodieWriteStat implements Serializable { */ private long numWrites; + /** + * Total number of records deleted. + */ + private long numDeletes; + /** * Total number of records actually changed. (0 for inserts) */ @@ -86,6 +91,10 @@ public class HoodieWriteStat implements Serializable { this.numWrites = numWrites; } + public void setNumDeletes(long numDeletes) { + this.numDeletes = numDeletes; + } + public void setNumUpdateWrites(long numUpdateWrites) { this.numUpdateWrites = numUpdateWrites; } @@ -110,6 +119,10 @@ public class HoodieWriteStat implements Serializable { return numWrites; } + public long getNumDeletes() { + return numDeletes; + } + public long getNumUpdateWrites() { return numUpdateWrites; } @@ -129,6 +142,7 @@ public class HoodieWriteStat implements Serializable { .append("fullPath='" + fullPath + '\'') .append(", prevCommit='" + prevCommit + '\'') .append(", numWrites=" + numWrites) + .append(", numDeletes=" + numDeletes) .append(", numUpdateWrites=" + numUpdateWrites) .append(", numWriteBytes=" + totalWriteBytes) .append('}') diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java index 707b191d5..5aa39070e 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; import 
java.io.IOException; import java.util.List; +import java.util.Optional; public class AvroUtils { @@ -58,7 +59,7 @@ public class AvroUtils { String partitionPath = deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath), - new HoodieAvroPayload(deltaRecord))); + new HoodieAvroPayload(Optional.of(deltaRecord)))); } fileReader.close(); // also closes underlying FsInput } catch (IOException e) { diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 54864cf76..b882dd29b 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -193,7 +193,7 @@ public class HoodieTestUtils { AvroLogAppender log = new AvroLogAppender(logConfig); log.append(s.getValue().stream().map(r -> { try { - GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema); + GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get(); HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(),