1
0

[HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084)

This commit is contained in:
Balaji Varadarajan
2020-09-11 16:11:42 -07:00
committed by GitHub
parent a1cff8abae
commit 5e61454a6c
2 changed files with 151 additions and 4 deletions

View File

@@ -53,10 +53,12 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
this(record.get(), (record1) -> 0); // natural order
}
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
throws IOException {
IndexedRecord insertValue = getInsertValue(schema).get();
/**
*
* Handle a possible delete - check for "D" in Op column and return empty row if found.
* @param insertValue The new row that is being "inserted".
*/
private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertValue) throws IOException {
boolean delete = false;
if (insertValue instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertValue;
@@ -65,4 +67,17 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
return delete ? Option.empty() : Option.of(insertValue);
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
IndexedRecord insertValue = super.getInsertValue(schema).get();
return handleDeleteOperation(insertValue);
}
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
throws IOException {
IndexedRecord insertValue = super.getInsertValue(schema).get();
return handleDeleteOperation(insertValue);
}
}