[HUDI-2368] Catch Throwable in BoundedInMemoryExecutor (#3546)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -90,7 +90,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
|
|||||||
try {
|
try {
|
||||||
preExecute();
|
preExecute();
|
||||||
producer.produce(queue);
|
producer.produce(queue);
|
||||||
} catch (Exception e) {
|
} catch (Throwable e) {
|
||||||
LOG.error("error producing records", e);
|
LOG.error("error producing records", e);
|
||||||
queue.markAsFailed(e);
|
queue.markAsFailed(e);
|
||||||
throw e;
|
throw e;
|
||||||
|
|||||||
@@ -78,10 +78,10 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
private final long memoryLimit;
|
private final long memoryLimit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* it holds the root cause of the exception in case either queueing records
|
* it holds the root cause of the Throwable in case either queueing records
|
||||||
* (consuming from inputIterator) fails or thread reading records from queue fails.
|
* (consuming from inputIterator) fails or thread reading records from queue fails.
|
||||||
*/
|
*/
|
||||||
private final AtomicReference<Exception> hasFailed = new AtomicReference<>(null);
|
private final AtomicReference<Throwable> hasFailed = new AtomicReference<>(null);
|
||||||
|
|
||||||
/** Used for indicating that all the records from queue are read successfully. **/
|
/** Used for indicating that all the records from queue are read successfully. **/
|
||||||
private final AtomicBoolean isReadDone = new AtomicBoolean(false);
|
private final AtomicBoolean isReadDone = new AtomicBoolean(false);
|
||||||
@@ -251,7 +251,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
|||||||
/**
|
/**
|
||||||
* API to allow producers and consumer to communicate termination due to failure.
|
* API to allow producers and consumer to communicate termination due to failure.
|
||||||
*/
|
*/
|
||||||
public void markAsFailed(Exception e) {
|
public void markAsFailed(Throwable e) {
|
||||||
this.hasFailed.set(e);
|
this.hasFailed.set(e);
|
||||||
// release the permits so that if the queueing thread is waiting for permits then it will
|
// release the permits so that if the queueing thread is waiting for permits then it will
|
||||||
// get it.
|
// get it.
|
||||||
|
|||||||
Reference in New Issue
Block a user