From b2c958519e0ff8da2e7136a1c9b7753618806b6c Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Thu, 5 Jan 2017 17:26:47 -0800 Subject: [PATCH] Changing Update record failure semantics to be consistent with inserts - Don't skip the record; write the same old record again instead - Mark the corresponding HoodieRecord as a failure so it is handed back to the client --- .../uber/hoodie/io/HoodieInsertHandle.java | 2 ++ .../uber/hoodie/io/HoodieUpdateHandle.java | 19 ++++++++++++++++--- .../hoodie/table/HoodieCopyOnWriteTable.java | 7 ++++--- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java index cf1eaabbe..0b3e862d8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieInsertHandle.java @@ -90,6 +90,8 @@ public class HoodieInsertHandle extends HoodieIOH record.deflate(); recordsWritten++; } catch (Throwable t) { + // Not throwing exception from here, since we don't want to fail the entire job + // for a single record status.markFailure(record, t); logger.error("Error writing record " + record, t); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java index 98c0992c5..92dbff783 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java @@ -114,17 +114,19 @@ import java.util.Iterator; } - private void writeUpdateRecord(HoodieRecord hoodieRecord, IndexedRecord indexedRecord) { + private boolean writeUpdateRecord(HoodieRecord hoodieRecord, IndexedRecord indexedRecord) { try { storageWriter.writeAvroWithMetadata(indexedRecord, hoodieRecord); hoodieRecord.deflate(); writeStatus.markSuccess(hoodieRecord); recordsWritten ++; 
updatedRecordsWritten ++; + return true; } catch (Exception e) { logger.error("Error writing record "+ hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e); } + return false; } /** @@ -133,16 +135,27 @@ import java.util.Iterator; public void write(GenericRecord oldRecord) { String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); HoodieRecord hoodieRecord = keyToNewRecords.get(key); + boolean copyOldRecord = true; if (keyToNewRecords.containsKey(key)) { try { IndexedRecord avroRecord = hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, schema); - writeUpdateRecord(hoodieRecord, avroRecord); + if (writeUpdateRecord(hoodieRecord, avroRecord)) { + /* ONLY WHEN + * 1) we have an update for this key AND + * 2) We are able to successfully write the the combined new value + * + * We no longer need to copy the old record over. + */ + copyOldRecord = false; + } keyToNewRecords.remove(key); } catch (Exception e) { throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {" + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e); } - } else { + } + + if (copyOldRecord) { // this should work as it is, since this is an existing record String errMsg = "Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath() + " to new file " + newFilePath; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 23bf11065..64880f032 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -391,7 +391,8 @@ public class HoodieCopyOnWriteTable extends Hoodi HoodieUpdateHandle upsertHandle = new HoodieUpdateHandle<>(config, commitTime, metadata, recordItr, fileLoc); if (upsertHandle.getOldFilePath() == null) { - 
logger.error("Error in finding the old file path at commit " + commitTime); + throw new HoodieUpsertException("Error in finding the old file path at commit " + + commitTime +" at fileLoc: " + fileLoc); } else { Configuration conf = FSUtils.getFs().getConf(); AvroReadSupport.setAvroReadSchema(conf, upsertHandle.getSchema()); @@ -414,9 +415,9 @@ public class HoodieCopyOnWriteTable extends Hoodi upsertHandle.close(); } } + //TODO(vc): This needs to be revisited if (upsertHandle.getWriteStatus().getPartitionPath() == null) { - logger.info( - "Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + upsertHandle.getWriteStatus()); } return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();