1
0

Fix various errors found by long running delta-streamer tests

1. Parquet/Avro schema mismatch errors during ingestion are sometimes silently ignored due to a race condition in BoundedInMemoryExecutor. This was reproducible by running a long-running delta-streamer with the wrong schema, and it caused data loss.
  2. Fix behavior of Delta-Streamer to error out by default if there are any error records
  3. Fix a bug in tracking write errors in WriteStats. Previously, the write-error count reflected only the sampled errors rather than the total number of errors.
  4. Delta Streamer does not commit the changes made as part of inline compaction because auto-commit is forcibly disabled. Fix this behavior to always auto-commit inline compaction, since it would otherwise never be committed.
This commit is contained in:
Balaji Varadarajan
2019-05-12 11:57:04 -07:00
committed by vinoth chandar
parent a0e62b7919
commit 9cce9abf4d
8 changed files with 48 additions and 22 deletions

View File

@@ -1348,7 +1348,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
Optional<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
try {
compact(compactionInstantTime);
// inline compaction should auto commit as the user is never given control
compact(compactionInstantTime, true);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}

View File

@@ -89,7 +89,8 @@ public class WriteStatus implements Serializable {
* HoodieRecord} before deflation.
*/
public void markFailure(HoodieRecord record, Throwable t, Optional<Map<String, String>> optionalRecordMetadata) {
if (random.nextDouble() <= failureFraction) {
if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
// Guaranteed to have at-least one error
failedRecords.add(record);
errors.put(record.getKey(), t);
}

View File

@@ -251,7 +251,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
writeStatus.getStat().setNumInserts(insertRecordsWritten);
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalUpsertTime(timer.endTimer());
writeStatus.getStat().setRuntimeStats(runtimeStats);

View File

@@ -166,7 +166,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
stat.setFileId(writeStatus.getFileId());
stat.setPaths(new Path(config.getBasePath()), path, tempPath);
stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
stat.setTotalWriteErrors(writeStatus.getFailedRecords().size());
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);

View File

@@ -281,7 +281,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
writeStatus.getStat().setNumDeletes(recordsDeleted);
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
writeStatus.getStat().setNumInserts(insertRecordsWritten);
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords());
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalUpsertTime(timer.endTimer());
writeStatus.getStat().setRuntimeStats(runtimeStats);

View File

@@ -93,7 +93,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
preExecute();
producer.produce(queue);
} catch (Exception e) {
logger.error("error consuming records", e);
logger.error("error producing records", e);
queue.markAsFailed(e);
throw e;
} finally {

View File

@@ -207,6 +207,9 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
throw new HoodieException(e);
}
}
// Check one more time here as it is possible producer errored out and closed immediately
throwExceptionIfFailed();
if (newRecord != null && newRecord.isPresent()) {
return newRecord;
} else {

View File

@@ -276,24 +276,42 @@ public class HoodieDeltaStreamer implements Serializable {
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
}
// Simply commit for now. TODO(vc): Support better error handlers later on
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue();
boolean hasErrors = totalErrorRecords > 0;
long hiveSyncTimeMs = 0;
if (!hasErrors || cfg.commitOnErrors) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
boolean success = client.commit(commitTime, writeStatusRDD,
Optional.of(checkpointCommitMetadata));
if (success) {
log.info("Commit " + commitTime + " successful!");
// TODO(vc): Kick off hive sync from here.
if (hasErrors) {
log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
}
boolean success = client.commit(commitTime, writeStatusRDD,
Optional.of(checkpointCommitMetadata));
if (success) {
log.info("Commit " + commitTime + " successful!");
// Sync to hive if enabled
Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
syncHive();
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
} else {
log.info("Commit " + commitTime + " failed!");
}
} else {
log.info("Commit " + commitTime + " failed!");
log.error("There are errors when ingesting records. Errors/Total="
+ totalErrorRecords + "/" + totalRecords);
log.error("Printing out the top 100 errors");
writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
log.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
ws.getErrors().entrySet().forEach(r ->
log.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
}
});
}
// Sync to hive if enabled
Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
syncHive();
long hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
client.close();
long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
@@ -347,7 +365,7 @@ public class HoodieDeltaStreamer implements Serializable {
UPSERT, INSERT, BULK_INSERT
}
private class OperationConvertor implements IStringConverter<Operation> {
private static class OperationConvertor implements IStringConverter<Operation> {
@Override
public Operation convert(String value) throws ParameterException {
return Operation.valueOf(value);
@@ -432,6 +450,9 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--spark-master"}, description = "spark master to use.")
public String sparkMaster = "local[2]";
@Parameter(names = {"--commit-on-errors"})
public Boolean commitOnErrors = false;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
}