1
0

Add delete support to Hoodie (#85)

This commit is contained in:
siddharthagunda
2017-03-04 01:33:49 -08:00
committed by prazanna
parent 41e08018fc
commit 348a48aa80
16 changed files with 241 additions and 68 deletions

View File

@@ -17,7 +17,6 @@
package com.uber.hoodie.index; package com.uber.hoodie.index;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
@@ -180,21 +179,29 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
while (statusIterator.hasNext()) { while (statusIterator.hasNext()) {
WriteStatus writeStatus = statusIterator.next(); WriteStatus writeStatus = statusIterator.next();
List<Put> puts = new ArrayList<>(); List<Put> puts = new ArrayList<>();
List<Delete> deletes = new ArrayList<>();
try { try {
for (HoodieRecord rec : writeStatus.getWrittenRecords()) { for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
if (!writeStatus.isErrored(rec.getKey())) { if (!writeStatus.isErrored(rec.getKey())) {
Put put = new Put(Bytes.toBytes(rec.getRecordKey())); java.util.Optional<HoodieRecordLocation> loc = rec.getNewLocation();
HoodieRecordLocation loc = rec.getNewLocation(); if(loc.isPresent()) {
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
Bytes.toBytes(loc.getCommitTime())); put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN,
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getCommitTime()));
Bytes.toBytes(loc.getFileId())); put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN,
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(loc.get().getFileId()));
Bytes.toBytes(rec.getPartitionPath())); put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN,
puts.add(put); Bytes.toBytes(rec.getPartitionPath()));
puts.add(put);
} else {
//Delete existing index for a deleted record
Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
deletes.add(delete);
}
} }
} }
hTable.put(puts); hTable.put(puts);
hTable.delete(deletes);
hTable.flushCommits(); hTable.flushCommits();
} catch (Exception e) { } catch (Exception e) {
Exception we = new Exception("Error updating index for " + writeStatus, e); Exception we = new Exception("Error updating index for " + writeStatus, e);

View File

@@ -41,9 +41,8 @@ import java.util.concurrent.ConcurrentMap;
/** /**
* Hoodie Index implementation backed by an in-memory Hash map. * Hoodie Index implementation backed by an in-memory Hash map.
* * <p>
* ONLY USE FOR LOCAL TESTING * ONLY USE FOR LOCAL TESTING
*
*/ */
public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> { public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
@@ -56,7 +55,7 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
@Override @Override
public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation( public JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> hoodieTable) { JavaRDD<HoodieKey> hoodieKeys, final HoodieTable<T> hoodieTable) {
throw new UnsupportedOperationException("InMemory index does not implement check exist yet"); throw new UnsupportedOperationException("InMemory index does not implement check exist yet");
} }
@@ -67,7 +66,7 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
implements Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> { implements Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> {
@Override @Override
public Iterator<HoodieRecord<T>> call(Integer partitionNum, public Iterator<HoodieRecord<T>> call(Integer partitionNum,
Iterator<HoodieRecord<T>> hoodieRecordIterator) { Iterator<HoodieRecord<T>> hoodieRecordIterator) {
List<HoodieRecord<T>> taggedRecords = new ArrayList<>(); List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
while (hoodieRecordIterator.hasNext()) { while (hoodieRecordIterator.hasNext()) {
HoodieRecord<T> rec = hoodieRecordIterator.next(); HoodieRecord<T> rec = hoodieRecordIterator.next();
@@ -82,7 +81,7 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
@Override @Override
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
HoodieTable<T> hoodieTable) { HoodieTable<T> hoodieTable) {
return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(), true); return recordRDD.mapPartitionsWithIndex(this.new LocationTagFunction(), true);
} }
@@ -94,7 +93,14 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
public WriteStatus call(WriteStatus writeStatus) { public WriteStatus call(WriteStatus writeStatus) {
for (HoodieRecord record : writeStatus.getWrittenRecords()) { for (HoodieRecord record : writeStatus.getWrittenRecords()) {
if (!writeStatus.isErrored(record.getKey())) { if (!writeStatus.isErrored(record.getKey())) {
recordLocationMap.put(record.getKey(), record.getNewLocation()); HoodieKey key = record.getKey();
java.util.Optional<HoodieRecordLocation> newLocation = record.getNewLocation();
if (newLocation.isPresent()) {
recordLocationMap.put(key, newLocation.get());
} else {
//Delete existing index for a deleted record
recordLocationMap.remove(key);
}
} }
} }
return writeStatus; return writeStatus;

View File

@@ -54,6 +54,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private RollingAvroLogAppender logAppender; private RollingAvroLogAppender logAppender;
private List<HoodieRecord<T>> records; private List<HoodieRecord<T>> records;
private long recordsWritten = 0; private long recordsWritten = 0;
private long recordsDeleted = 0;
private HoodieLogFile currentLogFile; private HoodieLogFile currentLogFile;
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, public HoodieAppendHandle(HoodieWriteConfig config, String commitTime,
@@ -112,18 +113,24 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) { private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
try { try {
IndexedRecord avroRecord = hoodieRecord.getData().getInsertValue(schema); Optional<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(schema);
String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(),
recordIndex.getAndIncrement()); if(avroRecord.isPresent()) {
HoodieAvroUtils String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(),
.addHoodieKeyToRecord((GenericRecord) avroRecord, hoodieRecord.getRecordKey(), recordIndex.getAndIncrement());
hoodieRecord.getPartitionPath(), fileId); HoodieAvroUtils
HoodieAvroUtils .addHoodieKeyToRecord((GenericRecord) avroRecord.get(), hoodieRecord.getRecordKey(),
.addCommitMetadataToRecord((GenericRecord) avroRecord, commitTime, seqId); hoodieRecord.getPartitionPath(), fileId);
HoodieAvroUtils
.addCommitMetadataToRecord((GenericRecord) avroRecord.get(), commitTime, seqId);
recordsWritten++;
} else {
recordsDeleted++;
}
hoodieRecord.deflate(); hoodieRecord.deflate();
writeStatus.markSuccess(hoodieRecord); writeStatus.markSuccess(hoodieRecord);
recordsWritten++; return avroRecord;
return Optional.of(avroRecord);
} catch (Exception e) { } catch (Exception e) {
logger.error("Error writing record " + hoodieRecord, e); logger.error("Error writing record " + hoodieRecord, e);
writeStatus.markFailure(hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e);
@@ -149,6 +156,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
logAppender.close(); logAppender.close();
} }
writeStatus.getStat().setNumWrites(recordsWritten); writeStatus.getStat().setNumWrites(recordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
} catch (IOException e) { } catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e); throw new HoodieUpsertException("Failed to close UpdateHandle", e);

View File

@@ -34,6 +34,7 @@ import org.apache.log4j.Logger;
import org.apache.spark.TaskContext; import org.apache.spark.TaskContext;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
public class HoodieInsertHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> { public class HoodieInsertHandle<T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
@@ -42,7 +43,8 @@ public class HoodieInsertHandle<T extends HoodieRecordPayload> extends HoodieIOH
private final WriteStatus status; private final WriteStatus status;
private final HoodieStorageWriter<IndexedRecord> storageWriter; private final HoodieStorageWriter<IndexedRecord> storageWriter;
private final Path path; private final Path path;
private int recordsWritten = 0; private long recordsWritten = 0;
private long recordsDeleted = 0;
public HoodieInsertHandle(HoodieWriteConfig config, String commitTime, public HoodieInsertHandle(HoodieWriteConfig config, String commitTime,
HoodieTable<T> hoodieTable, String partitionPath) { HoodieTable<T> hoodieTable, String partitionPath) {
@@ -82,13 +84,19 @@ public class HoodieInsertHandle<T extends HoodieRecordPayload> extends HoodieIOH
*/ */
public void write(HoodieRecord record) { public void write(HoodieRecord record) {
try { try {
IndexedRecord avroRecord = record.getData().getInsertValue(schema); Optional<IndexedRecord> avroRecord = record.getData().getInsertValue(schema);
storageWriter.writeAvroWithMetadata(avroRecord, record);
status.markSuccess(record); if(avroRecord.isPresent()) {
// update the new location of record, so we know where to find it next storageWriter.writeAvroWithMetadata(avroRecord.get(), record);
record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId())); // update the new location of record, so we know where to find it next
record.setNewLocation(new HoodieRecordLocation(commitTime, status.getFileId()));
recordsWritten++;
} else {
recordsDeleted++;
}
record.deflate(); record.deflate();
recordsWritten++; status.markSuccess(record);
} catch (Throwable t) { } catch (Throwable t) {
// Not throwing exception from here, since we don't want to fail the entire job // Not throwing exception from here, since we don't want to fail the entire job
// for a single record // for a single record
@@ -111,6 +119,7 @@ public class HoodieInsertHandle<T extends HoodieRecordPayload> extends HoodieIOH
HoodieWriteStat stat = new HoodieWriteStat(); HoodieWriteStat stat = new HoodieWriteStat();
stat.setNumWrites(recordsWritten); stat.setNumWrites(recordsWritten);
stat.setNumDeletes(recordsDeleted);
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(status.getFileId()); stat.setFileId(status.getFileId());
stat.setFullPath(path.toString()); stat.setFullPath(path.toString());

View File

@@ -37,6 +37,7 @@ import org.apache.spark.TaskContext;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Optional;
@SuppressWarnings("Duplicates") @SuppressWarnings("Duplicates")
public class HoodieUpdateHandle <T extends HoodieRecordPayload> extends HoodieIOHandle<T> { public class HoodieUpdateHandle <T extends HoodieRecordPayload> extends HoodieIOHandle<T> {
@@ -48,6 +49,7 @@ public class HoodieUpdateHandle <T extends HoodieRecordPayload> extends HoodieIO
private Path newFilePath; private Path newFilePath;
private Path oldFilePath; private Path oldFilePath;
private long recordsWritten = 0; private long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0; private long updatedRecordsWritten = 0;
private String fileId; private String fileId;
@@ -118,13 +120,18 @@ public class HoodieUpdateHandle <T extends HoodieRecordPayload> extends HoodieIO
} }
private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, IndexedRecord indexedRecord) { private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, Optional<IndexedRecord> indexedRecord) {
try { try {
storageWriter.writeAvroWithMetadata(indexedRecord, hoodieRecord); if(indexedRecord.isPresent()) {
storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord);
recordsWritten++;
updatedRecordsWritten++;
} else {
recordsDeleted++;
}
hoodieRecord.deflate(); hoodieRecord.deflate();
writeStatus.markSuccess(hoodieRecord); writeStatus.markSuccess(hoodieRecord);
recordsWritten ++;
updatedRecordsWritten ++;
return true; return true;
} catch (Exception e) { } catch (Exception e) {
logger.error("Error writing record "+ hoodieRecord, e); logger.error("Error writing record "+ hoodieRecord, e);
@@ -142,8 +149,8 @@ public class HoodieUpdateHandle <T extends HoodieRecordPayload> extends HoodieIO
boolean copyOldRecord = true; boolean copyOldRecord = true;
if (keyToNewRecords.containsKey(key)) { if (keyToNewRecords.containsKey(key)) {
try { try {
IndexedRecord avroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, schema); Optional<IndexedRecord> combinedAvroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, schema);
if (writeUpdateRecord(hoodieRecord, avroRecord)) { if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
/* ONLY WHEN /* ONLY WHEN
* 1) we have an update for this key AND * 1) we have an update for this key AND
* 2) We are able to successfully write the combined new value * 2) We are able to successfully write the combined new value
@@ -194,8 +201,10 @@ public class HoodieUpdateHandle <T extends HoodieRecordPayload> extends HoodieIO
if (storageWriter != null) { if (storageWriter != null) {
storageWriter.close(); storageWriter.close();
} }
writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, newFilePath)); writeStatus.getStat().setTotalWriteBytes(FSUtils.getFileSize(fs, newFilePath));
writeStatus.getStat().setNumWrites(recordsWritten); writeStatus.getStat().setNumWrites(recordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
} catch (IOException e) { } catch (IOException e) {

View File

@@ -256,6 +256,85 @@ public class TestHoodieClient implements Serializable {
readClient.readCommit(newCommitTime).count(), readClient.readCommit(newCommitTime).count(),
readClient.readSince("001").count()); readClient.readSince("001").count());
} }
/**
 * Exercises delete handling through two upserts.
 *
 * <p>Commit 001 upserts 200 inserts together with 100 delete records built from freshly
 * generated records; only the 200 inserts are expected to be readable. Commit 004 deletes
 * 50 of the inserted records and re-upserts another 50, after which the dataset is expected
 * to hold 150 records and the incremental views are checked.
 */
@Test
public void testDeletes() throws Exception {
    HoodieWriteConfig cfg = getConfig();
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
    HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
    FileSystem fs = FSUtils.getFs();

    // Write 1 (inserts and deletes):
    // write actual 200 insert records and ignore 100 delete records.
    String newCommitTime = "001";
    List<HoodieRecord> fewRecordsForInsert = dataGen.generateInserts(newCommitTime, 200);
    List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletes(newCommitTime, 100);

    // Diamond instead of the raw ArrayList type, so the copy stays type-checked.
    List<HoodieRecord> records = new ArrayList<>(fewRecordsForInsert);
    records.addAll(fewRecordsForDelete);

    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
    List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
    assertNoWriteErrors(statuses);

    // Verify that there is a commit. Expected values go first so that JUnit failure
    // messages report "expected" and "actual" the right way round.
    HoodieReadClient readClient = new HoodieReadClient(jsc, basePath, sqlContext);
    assertEquals("Expecting a single commit.", 1, readClient.listCommitsSince("000").size());
    assertEquals("Latest commit should be 001", newCommitTime, readClient.latestCommit());
    assertEquals("Must contain 200 records",
        fewRecordsForInsert.size(),
        readClient.readCommit(newCommitTime).count());

    // The 200 inserted records should be tagged (check using Index),
    // all in locations marked at commit 001.
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
    List<HoodieRecord> taggedRecords =
        index.tagLocation(jsc.parallelize(fewRecordsForInsert, 1), table).collect();
    checkTaggedRecords(taggedRecords, "001");

    // Write 2 (deletes + writes): delete the first 50 inserted records
    // and re-upsert the next 50.
    newCommitTime = "004";
    fewRecordsForDelete = records.subList(0, 50);
    List<HoodieRecord> fewRecordsForUpdate = records.subList(50, 100);
    records = dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete);
    records.addAll(fewRecordsForUpdate);

    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    // Verify there are now 2 commits
    readClient = new HoodieReadClient(jsc, basePath, sqlContext);
    assertEquals("Expecting two commits.", 2, readClient.listCommitsSince("000").size());
    assertEquals("Latest commit should be 004", newCommitTime, readClient.latestCommit());

    metaClient = new HoodieTableMetaClient(fs, basePath);
    table = HoodieTable.getHoodieTable(metaClient, getConfig());

    // Check the entire dataset still has 150 records (200 - 50)
    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
    for (int i = 0; i < fullPartitionPaths.length; i++) {
        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
    }
    assertEquals("Must contain 150 records", 150, readClient.read(fullPartitionPaths).count());

    // Check the incremental consumption from the latest commit, from 001 and from 000
    assertEquals("Incremental consumption from latest commit, should give 50 updated records",
        50,
        readClient.readCommit(newCommitTime).count());
    assertEquals("Incremental consumption from time 001, should give 50 updated records",
        50,
        readClient.readSince("001").count());
    assertEquals("Incremental consumption from time 000, should give 150",
        150,
        readClient.readSince("000").count());
}
@Test @Test
public void testInsertAndCleanByVersions() throws Exception { public void testInsertAndCleanByVersions() throws Exception {
int maxVersions = 2; // keep upto 2 versions for each file int maxVersions = 2; // keep upto 2 versions for each file

View File

@@ -34,10 +34,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Random;
import java.util.UUID;
/** /**
* Class to be used in tests to keep generating test inserts and updates against a corpus. * Class to be used in tests to keep generating test inserts and updates against a corpus.
@@ -100,6 +97,26 @@ public class HoodieTestDataGenerator {
return inserts; return inserts;
} }
/**
 * Generates {@code n} delete records.
 *
 * <p>The records are first produced by {@link #generateInserts(String, int)} and then each
 * one is converted into a delete record for the same key.
 *
 * @param commitTime commit time passed through to {@code generateInserts}
 * @param n number of delete records to generate
 * @return the generated delete records
 * @throws IOException if record generation fails
 */
public List<HoodieRecord> generateDeletes(String commitTime, int n) throws IOException {
    return generateDeletesFromExistingRecords(generateInserts(commitTime, n));
}
/**
 * Converts every given record into a delete record for the same key.
 *
 * @param existingRecords records whose keys should be deleted
 * @return a new mutable list with one delete record per input record, in input order
 * @throws IOException if building a delete payload fails
 */
public List<HoodieRecord> generateDeletesFromExistingRecords(List<HoodieRecord> existingRecords) throws IOException {
    final List<HoodieRecord> deleteRecords = new ArrayList<>();
    for (HoodieRecord source : existingRecords) {
        deleteRecords.add(generateDeleteRecord(source));
    }
    return deleteRecords;
}
/**
 * Builds a delete record that carries the key of the given record.
 *
 * <p>The payload is created with no JSON data, a {@code null} schema string and the
 * deleted flag set to {@code true}.
 *
 * @param existingRecord record whose key the delete record should target
 * @return a new record pairing the existing key with a delete payload
 * @throws IOException if the payload cannot be constructed
 */
public HoodieRecord generateDeleteRecord(HoodieRecord existingRecord) throws IOException {
    final HoodieKey targetKey = existingRecord.getKey();
    return new HoodieRecord(
        targetKey,
        new TestRawTripPayload(
            Optional.empty(), targetKey.getRecordKey(), targetKey.getPartitionPath(), null, true));
}
public List<HoodieRecord> generateUpdates(String commitTime, List<HoodieRecord> baseRecords) throws IOException { public List<HoodieRecord> generateUpdates(String commitTime, List<HoodieRecord> baseRecords) throws IOException {
List<HoodieRecord> updates = new ArrayList<>(); List<HoodieRecord> updates = new ArrayList<>();

View File

@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils;
import java.io.*; import java.io.*;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
@@ -41,12 +42,22 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayloa
private String rowKey; private String rowKey;
private byte[] jsonDataCompressed; private byte[] jsonDataCompressed;
private int dataSize; private int dataSize;
private boolean isDeleted;
public TestRawTripPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException { public TestRawTripPayload(Optional<String> jsonData, String rowKey, String partitionPath,
this.jsonDataCompressed = compressData(jsonData); String schemaStr, Boolean isDeleted) throws IOException {
this.dataSize = jsonData.length(); if(jsonData.isPresent()) {
this.jsonDataCompressed = compressData(jsonData.get());
this.dataSize = jsonData.get().length();
}
this.rowKey = rowKey; this.rowKey = rowKey;
this.partitionPath = partitionPath; this.partitionPath = partitionPath;
this.isDeleted = isDeleted;
}
public TestRawTripPayload(String jsonData, String rowKey, String partitionPath,
String schemaStr)throws IOException {
this(Optional.of(jsonData), rowKey, partitionPath, schemaStr, false);
} }
public TestRawTripPayload(String jsonData) throws IOException { public TestRawTripPayload(String jsonData) throws IOException {
@@ -55,6 +66,7 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayloa
Map<String, Object> jsonRecordMap = mapper.readValue(jsonData, Map.class); Map<String, Object> jsonRecordMap = mapper.readValue(jsonData, Map.class);
this.rowKey = jsonRecordMap.get("_row_key").toString(); this.rowKey = jsonRecordMap.get("_row_key").toString();
this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/"); this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
this.isDeleted = false;
} }
public String getPartitionPath() { public String getPartitionPath() {
@@ -66,20 +78,24 @@ public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayloa
return another; return another;
} }
@Override public IndexedRecord combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException { @Override public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException {
return this.getInsertValue(schema); return this.getInsertValue(schema);
} }
@Override public IndexedRecord getInsertValue(Schema schema) throws IOException { @Override public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema); if(isDeleted){
return jsonConverter.convert(getJsonData()); return Optional.empty();
} else {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema);
return Optional.of(jsonConverter.convert(getJsonData()));
}
} }
public String getRowKey() { public String getRowKey() {
return rowKey; return rowKey;
} }
public String getJsonData() throws IOException { private String getJsonData() throws IOException {
return unCompressData(jsonDataCompressed); return unCompressData(jsonDataCompressed);
} }

View File

@@ -421,7 +421,7 @@ public class TestHoodieBloomIndex {
int seqId = 1; int seqId = 1;
String commitTime = FSUtils.getCommitTime(filename); String commitTime = FSUtils.getCommitTime(filename);
for (HoodieRecord record : records) { for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema); GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++); HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, commitTime, "" + seqId++);
HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename); HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename);
writer.write(avroRecord); writer.write(avroRecord);

View File

@@ -30,6 +30,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Optional;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
@@ -47,13 +48,13 @@ public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload>
return this; return this;
} }
@Override public IndexedRecord combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException { @Override public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException {
return getInsertValue(schema); return getInsertValue(schema);
} }
@Override public IndexedRecord getInsertValue(Schema schema) throws IOException { @Override public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema); MercifulJsonConverter jsonConverter = new MercifulJsonConverter(schema);
return jsonConverter.convert(getJsonData()); return Optional.of(jsonConverter.convert(getJsonData()));
} }
private String getJsonData() throws IOException { private String getJsonData() throws IOException {

View File

@@ -17,6 +17,9 @@
package com.uber.hoodie.common.model; package com.uber.hoodie.common.model;
import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.HoodieAvroUtils;
import java.util.Optional;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
@@ -28,9 +31,9 @@ import java.io.IOException;
* Useful to create a HoodieRecord over existing GenericRecords in a hoodie datasets (useful in compactions) * Useful to create a HoodieRecord over existing GenericRecords in a hoodie datasets (useful in compactions)
*/ */
public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload> { public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload> {
private final GenericRecord record; private final Optional<GenericRecord> record;
public HoodieAvroPayload(GenericRecord record) { public HoodieAvroPayload(Optional<GenericRecord> record) {
this.record = record; this.record = record;
} }
@@ -40,13 +43,13 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
} }
@Override @Override
public IndexedRecord combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
throws IOException { throws IOException {
return getInsertValue(schema); return getInsertValue(schema);
} }
@Override @Override
public IndexedRecord getInsertValue(Schema schema) throws IOException { public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
return HoodieAvroUtils.rewriteRecord(record, schema); return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
} }
} }

View File

@@ -19,6 +19,7 @@ package com.uber.hoodie.common.model;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import java.io.Serializable; import java.io.Serializable;
import java.util.Optional;
/** /**
* A Single Record managed by Hoodie TODO - Make this generic * A Single Record managed by Hoodie TODO - Make this generic
@@ -101,8 +102,8 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
return this; return this;
} }
public HoodieRecordLocation getNewLocation() { public Optional<HoodieRecordLocation> getNewLocation() {
return this.newLocation; return Optional.of(this.newLocation);
} }
public boolean isCurrentLocationKnown() { public boolean isCurrentLocationKnown() {

View File

@@ -21,6 +21,7 @@ import org.apache.avro.generic.IndexedRecord;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Optional;
/** /**
* Every Hoodie dataset has an implementation of the <code>HoodieRecordPayload</code> * Every Hoodie dataset has an implementation of the <code>HoodieRecordPayload</code>
@@ -44,14 +45,15 @@ public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Seri
* *
* @param currentValue Current value in storage, to merge/combine this payload with * @param currentValue Current value in storage, to merge/combine this payload with
* @param schema Schema used for record * @param schema Schema used for record
* @return new combined/merged value to be written back to storage * @return new combined/merged value to be written back to storage. EMPTY to skip writing this record.
*/ */
IndexedRecord combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException; Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException;
/** /**
* Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. * Generates an avro record out of the given HoodieRecordPayload, to be written out to storage.
* Called when writing a new value for the given HoodieKey, wherein there is no existing record in * Called when writing a new value for the given HoodieKey, wherein there is no existing record in
* storage to be combined against. (i.e insert) * storage to be combined against. (i.e insert)
* Return EMPTY to skip writing this record.
*/ */
IndexedRecord getInsertValue(Schema schema) throws IOException; Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException;
} }

View File

@@ -51,6 +51,11 @@ public class HoodieWriteStat implements Serializable {
*/ */
private long numWrites; private long numWrites;
/**
* Total number of records deleted.
*/
private long numDeletes;
/** /**
* Total number of records actually changed. (0 for inserts) * Total number of records actually changed. (0 for inserts)
*/ */
@@ -86,6 +91,10 @@ public class HoodieWriteStat implements Serializable {
this.numWrites = numWrites; this.numWrites = numWrites;
} }
/**
 * Sets the total number of records deleted.
 *
 * @param numDeletes number of deleted records to store in this stat
 */
public void setNumDeletes(long numDeletes) {
this.numDeletes = numDeletes;
}
public void setNumUpdateWrites(long numUpdateWrites) { public void setNumUpdateWrites(long numUpdateWrites) {
this.numUpdateWrites = numUpdateWrites; this.numUpdateWrites = numUpdateWrites;
} }
@@ -110,6 +119,10 @@ public class HoodieWriteStat implements Serializable {
return numWrites; return numWrites;
} }
/**
 * Returns the total number of records deleted, as last stored via {@code setNumDeletes}.
 *
 * @return the stored number of deleted records
 */
public long getNumDeletes() {
return numDeletes;
}
public long getNumUpdateWrites() { public long getNumUpdateWrites() {
return numUpdateWrites; return numUpdateWrites;
} }
@@ -129,6 +142,7 @@ public class HoodieWriteStat implements Serializable {
.append("fullPath='" + fullPath + '\'') .append("fullPath='" + fullPath + '\'')
.append(", prevCommit='" + prevCommit + '\'') .append(", prevCommit='" + prevCommit + '\'')
.append(", numWrites=" + numWrites) .append(", numWrites=" + numWrites)
.append(", numDeletes=" + numDeletes)
.append(", numUpdateWrites=" + numUpdateWrites) .append(", numUpdateWrites=" + numUpdateWrites)
.append(", numWriteBytes=" + totalWriteBytes) .append(", numWriteBytes=" + totalWriteBytes)
.append('}') .append('}')

View File

@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional;
public class AvroUtils { public class AvroUtils {
@@ -58,7 +59,7 @@ public class AvroUtils {
String partitionPath = String partitionPath =
deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); deltaRecord.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath), loadedRecords.add(new HoodieRecord<>(new HoodieKey(key, partitionPath),
new HoodieAvroPayload(deltaRecord))); new HoodieAvroPayload(Optional.of(deltaRecord))));
} }
fileReader.close(); // also closes underlying FsInput fileReader.close(); // also closes underlying FsInput
} catch (IOException e) { } catch (IOException e) {

View File

@@ -193,7 +193,7 @@ public class HoodieTestUtils {
AvroLogAppender log = new AvroLogAppender(logConfig); AvroLogAppender log = new AvroLogAppender(logConfig);
log.append(s.getValue().stream().map(r -> { log.append(s.getValue().stream().map(r -> {
try { try {
GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema); GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
HoodieAvroUtils.addHoodieKeyToRecord(val, HoodieAvroUtils.addHoodieKeyToRecord(val,
r.getRecordKey(), r.getRecordKey(),
r.getPartitionPath(), r.getPartitionPath(),