diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 632c13df9..ff467945d 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -1348,7 +1348,8 @@ public class HoodieWriteClient implements Seriali Optional compactionInstantTimeOpt = scheduleCompaction(extraMetadata); compactionInstantTimeOpt.ifPresent(compactionInstantTime -> { try { - compact(compactionInstantTime); + // inline compaction should auto commit as the user is never given control + compact(compactionInstantTime, true); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java index 54b32f099..98c7e7809 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java @@ -89,7 +89,8 @@ public class WriteStatus implements Serializable { * HoodieRecord} before deflation. */ public void markFailure(HoodieRecord record, Throwable t, Optional> optionalRecordMetadata) { - if (random.nextDouble() <= failureFraction) { + if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) { + // Guaranteed to have at-least one error failedRecords.add(record); errors.put(record.getKey(), t); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 7cd22676f..17f4d26b0 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -251,7 +251,7 @@ public class HoodieAppendHandle extends HoodieIOH writeStatus.getStat().setNumInserts(insertRecordsWritten); writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten); - writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); + writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords()); RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalUpsertTime(timer.endTimer()); writeStatus.getStat().setRuntimeStats(runtimeStats); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 5ff356171..0a22865dc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -166,7 +166,7 @@ public class HoodieCreateHandle extends HoodieIOH stat.setFileId(writeStatus.getFileId()); stat.setPaths(new Path(config.getBasePath()), path, tempPath); stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath())); - stat.setTotalWriteErrors(writeStatus.getFailedRecords().size()); + stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalCreateTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 3beb97d53..4c3bfa850 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -281,7 +281,7 @@ public class HoodieMergeHandle extends HoodieIOHa writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); writeStatus.getStat().setNumInserts(insertRecordsWritten); - writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); + writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords()); RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalUpsertTime(timer.endTimer()); writeStatus.getStat().setRuntimeStats(runtimeStats); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java index add92005f..5765ea963 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java @@ -93,7 +93,7 @@ public class BoundedInMemoryExecutor { preExecute(); producer.produce(queue); } catch (Exception e) { - logger.error("error consuming records", e); + logger.error("error producing records", e); queue.markAsFailed(e); throw e; } finally { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java index 401924e96..3792233a1 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java @@ -207,6 +207,9 @@ public class BoundedInMemoryQueue implements Iterable { throw new HoodieException(e); } } + // Check one more time here as it is possible producer errored out and closed immediately + throwExceptionIfFailed(); + if (newRecord != null && newRecord.isPresent()) { return newRecord; } else { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index 68621c44b..e6e8417ce 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -276,24 +276,42 @@ public class HoodieDeltaStreamer implements Serializable { throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); } - // Simply commit for now. TODO(vc): Support better error handlers later on - HashMap checkpointCommitMetadata = new HashMap<>(); - checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); + long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue(); + long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue(); + boolean hasErrors = totalErrorRecords > 0; + long hiveSyncTimeMs = 0; + if (!hasErrors || cfg.commitOnErrors) { + HashMap checkpointCommitMetadata = new HashMap<>(); + checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr); - boolean success = client.commit(commitTime, writeStatusRDD, - Optional.of(checkpointCommitMetadata)); - if (success) { - log.info("Commit " + commitTime + " successful!"); - // TODO(vc): Kick off hive sync from here. + if (hasErrors) { + log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total=" + + totalErrorRecords + "/" + totalRecords); + } + + boolean success = client.commit(commitTime, writeStatusRDD, + Optional.of(checkpointCommitMetadata)); + if (success) { + log.info("Commit " + commitTime + " successful!"); + // Sync to hive if enabled + Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); + syncHive(); + hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; + } else { + log.info("Commit " + commitTime + " failed!"); + } } else { - log.info("Commit " + commitTime + " failed!"); + log.error("There are errors when ingesting records. Errors/Total=" + + totalErrorRecords + "/" + totalRecords); + log.error("Printing out the top 100 errors"); + writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> { + log.error("Global error :", ws.getGlobalError()); + if (ws.getErrors().size() > 0) { + ws.getErrors().entrySet().forEach(r -> + log.trace("Error for key:" + r.getKey() + " is " + r.getValue())); + } + }); } - - // Sync to hive if enabled - Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext(); - syncHive(); - long hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0; - client.close(); long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0; @@ -347,7 +365,7 @@ public class HoodieDeltaStreamer implements Serializable { UPSERT, INSERT, BULK_INSERT } - private class OperationConvertor implements IStringConverter { + private static class OperationConvertor implements IStringConverter { @Override public Operation convert(String value) throws ParameterException { return Operation.valueOf(value); @@ -432,6 +450,9 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--spark-master"}, description = "spark master to use.") public String sparkMaster = "local[2]"; + @Parameter(names = {"--commit-on-errors"}) + public Boolean commitOnErrors = false; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; }