Changing Update record failure semantics to be consistent with inserts
- Don't skip, but writes the same old record again now - Marks the correspoinding HoodieRecord as failure to be handed back to the client
This commit is contained in:
@@ -90,6 +90,8 @@ public class HoodieInsertHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
record.deflate();
|
||||
recordsWritten++;
|
||||
} catch (Throwable t) {
|
||||
// Not throwing exception from here, since we don't want to fail the entire job
|
||||
// for a single record
|
||||
status.markFailure(record, t);
|
||||
logger.error("Error writing record " + record, t);
|
||||
}
|
||||
|
||||
@@ -114,17 +114,19 @@ import java.util.Iterator;
|
||||
}
|
||||
|
||||
|
||||
private void writeUpdateRecord(HoodieRecord<T> hoodieRecord, IndexedRecord indexedRecord) {
|
||||
private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, IndexedRecord indexedRecord) {
|
||||
try {
|
||||
storageWriter.writeAvroWithMetadata(indexedRecord, hoodieRecord);
|
||||
hoodieRecord.deflate();
|
||||
writeStatus.markSuccess(hoodieRecord);
|
||||
recordsWritten ++;
|
||||
updatedRecordsWritten ++;
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
logger.error("Error writing record "+ hoodieRecord, e);
|
||||
writeStatus.markFailure(hoodieRecord, e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -133,16 +135,27 @@ import java.util.Iterator;
|
||||
public void write(GenericRecord oldRecord) {
|
||||
String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
|
||||
HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
|
||||
boolean copyOldRecord = true;
|
||||
if (keyToNewRecords.containsKey(key)) {
|
||||
try {
|
||||
IndexedRecord avroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, schema);
|
||||
writeUpdateRecord(hoodieRecord, avroRecord);
|
||||
if (writeUpdateRecord(hoodieRecord, avroRecord)) {
|
||||
/* ONLY WHEN
|
||||
* 1) we have an update for this key AND
|
||||
* 2) We are able to successfully write the the combined new value
|
||||
*
|
||||
* We no longer need to copy the old record over.
|
||||
*/
|
||||
copyOldRecord = false;
|
||||
}
|
||||
keyToNewRecords.remove(key);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
|
||||
+ keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
if (copyOldRecord) {
|
||||
// this should work as it is, since this is an existing record
|
||||
String errMsg = "Failed to merge old record into new file for key " + key + " from old file "
|
||||
+ getOldFilePath() + " to new file " + newFilePath;
|
||||
|
||||
@@ -391,7 +391,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
HoodieUpdateHandle upsertHandle =
|
||||
new HoodieUpdateHandle<>(config, commitTime, metadata, recordItr, fileLoc);
|
||||
if (upsertHandle.getOldFilePath() == null) {
|
||||
logger.error("Error in finding the old file path at commit " + commitTime);
|
||||
throw new HoodieUpsertException("Error in finding the old file path at commit " +
|
||||
commitTime +" at fileLoc: " + fileLoc);
|
||||
} else {
|
||||
Configuration conf = FSUtils.getFs().getConf();
|
||||
AvroReadSupport.setAvroReadSchema(conf, upsertHandle.getSchema());
|
||||
@@ -414,9 +415,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
||||
upsertHandle.close();
|
||||
}
|
||||
}
|
||||
//TODO(vc): This needs to be revisited
|
||||
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
|
||||
logger.info(
|
||||
"Upsert Handle has partition path as null " + upsertHandle.getOldFilePath()
|
||||
logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath()
|
||||
+ ", " + upsertHandle.getWriteStatus());
|
||||
}
|
||||
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
|
||||
|
||||
Reference in New Issue
Block a user