From 73fdcf37dfa8ca4585533401f1ddabef081b72b4 Mon Sep 17 00:00:00 2001 From: yuzhaojing <32435329+yuzhaojing@users.noreply.github.com> Date: Thu, 26 Aug 2021 20:34:05 +0800 Subject: [PATCH] [HUDI-2368] Catch Throwable in BoundedInMemoryExecutor (#3546) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 喻兆靖 --- .../hudi/common/util/queue/BoundedInMemoryExecutor.java | 2 +- .../apache/hudi/common/util/queue/BoundedInMemoryQueue.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java index 95a501f6e..872837913 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java @@ -90,7 +90,7 @@ public class BoundedInMemoryExecutor { try { preExecute(); producer.produce(queue); - } catch (Exception e) { + } catch (Throwable e) { LOG.error("error producing records", e); queue.markAsFailed(e); throw e; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index f1ebdcd44..4d55249d1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -78,10 +78,10 @@ public class BoundedInMemoryQueue implements Iterable { private final long memoryLimit; /** - * it holds the root cause of the exception in case either queueing records + * it holds the root cause of the Throwable in case either queueing records * (consuming from inputIterator) fails or thread reading records from queue fails. */ - private final AtomicReference hasFailed = new AtomicReference<>(null); + private final AtomicReference hasFailed = new AtomicReference<>(null); /** Used for indicating that all the records from queue are read successfully. **/ private final AtomicBoolean isReadDone = new AtomicBoolean(false); @@ -251,7 +251,7 @@ public class BoundedInMemoryQueue implements Iterable { /** * API to allow producers and consumer to communicate termination due to failure. */ - public void markAsFailed(Exception e) { + public void markAsFailed(Throwable e) { this.hasFailed.set(e); // release the permits so that if the queueing thread is waiting for permits then it will // get it.