diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java index cce2ff565..1c1cf2bb9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java @@ -82,10 +82,16 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService { } LOG.info("Clustering executor shutting down properly"); } catch (InterruptedException ie) { + hasError = true; LOG.warn("Clustering executor got interrupted exception! Stopping", ie); } catch (IOException e) { - LOG.error("Clustering executor failed", e); + hasError = true; + LOG.error("Clustering executor failed due to IOException", e); throw new HoodieIOException(e.getMessage(), e); + } catch (Exception e) { + hasError = true; + LOG.error("Clustering executor failed", e); + throw e; } return true; }, executor)).toArray(CompletableFuture[]::new)), executor); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java index 6bfaa2f5c..f1f7f416e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java @@ -92,10 +92,16 @@ public abstract class AsyncCompactService extends HoodieAsyncTableService { } LOG.info("Compactor shutting down properly!!"); } catch (InterruptedException ie) { + hasError = true; LOG.warn("Compactor executor thread got interrupted exception. 
Stopping", ie); } catch (IOException e) { - LOG.error("Compactor executor failed", e); + hasError = true; + LOG.error("Compactor executor failed due to IOException", e); throw new HoodieIOException(e.getMessage(), e); + } catch (Exception e) { + hasError = true; + LOG.error("Compactor executor failed", e); + throw e; } return true; }, executor)).toArray(CompletableFuture[]::new)), executor); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index a1665c7fb..1ce6dfb28 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -41,7 +41,10 @@ import java.util.function.Function; public abstract class HoodieAsyncService implements Serializable { private static final Logger LOG = LogManager.getLogger(HoodieAsyncService.class); + private static final long POLLING_SECONDS = 10; + // Flag indicating whether the service incurred an error; volatile because the async executor tasks set it while other threads poll hasError() + protected volatile boolean hasError; // Flag to track if the service is started. private boolean started; // Flag indicating shutdown is externally requested @@ -82,9 +85,13 @@ public abstract class HoodieAsyncService implements Serializable { return shutdown; } + public boolean hasError() { + return hasError; + } + /** * Wait till the service shutdown. 
If the service shutdown with exception, it will be thrown - * + * * @throws ExecutionException * @throws InterruptedException */ @@ -109,6 +116,7 @@ public abstract class HoodieAsyncService implements Serializable { public void shutdown(boolean force) { if (!shutdownRequested || force) { shutdownRequested = true; + shutdown = true; if (executor != null) { if (force) { executor.shutdownNow(); @@ -178,8 +186,8 @@ public abstract class HoodieAsyncService implements Serializable { public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException { try { queueLock.lock(); - while (!isShutdown() && (pendingInstants.size() > numPending)) { - consumed.await(); + while (!isShutdown() && !hasError() && (pendingInstants.size() > numPending)) { + consumed.await(POLLING_SECONDS, TimeUnit.SECONDS); } } finally { queueLock.unlock(); @@ -202,8 +210,8 @@ public abstract class HoodieAsyncService implements Serializable { * @throws InterruptedException */ HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException { - LOG.info("Waiting for next instant upto 10 seconds"); - HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS); + LOG.info(String.format("Waiting for next instant up to %d seconds", POLLING_SECONDS)); + HoodieInstant instant = pendingInstants.poll(POLLING_SECONDS, TimeUnit.SECONDS); if (instant != null) { try { queueLock.lock(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 65cf2c3d3..c0c141db1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -659,6 +659,10 @@ public class HoodieDeltaStreamer implements Serializable { asyncCompactService.get().enqueuePendingAsyncServiceInstant(new 
HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get())); asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); + if (asyncCompactService.get().hasError()) { + error = true; + throw new HoodieException("Async compaction failed. Shutting down Delta Sync..."); + } } if (clusteringConfig.isAsyncClusteringEnabled()) { Option clusteringInstant = deltaSync.getClusteringInstantOpt(); @@ -666,6 +670,10 @@ public class HoodieDeltaStreamer implements Serializable { LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get()); asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get())); asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); + if (asyncClusteringService.get().hasError()) { + error = true; + throw new HoodieException("Async clustering failed. 
Shutting down Delta Sync..."); + } } } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); @@ -684,6 +692,7 @@ public class HoodieDeltaStreamer implements Serializable { } } finally { shutdownAsyncServices(error); + executor.shutdownNow(); } return true; }, executor), executor); @@ -737,13 +746,12 @@ public class HoodieDeltaStreamer implements Serializable { HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build(); List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); - asyncCompactService.get().start((error) -> { - // Shutdown DeltaSync - shutdown(false); - return true; - }); + asyncCompactService.get().start(error -> true); try { asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); + if (asyncCompactService.get().hasError()) { + throw new HoodieException("Async compaction failed during write client initialization."); + } } catch (InterruptedException ie) { throw new HoodieException(ie); } @@ -762,12 +770,12 @@ public class HoodieDeltaStreamer implements Serializable { List pending = ClusteringUtils.getPendingClusteringInstantTimes(meta); LOG.info(String.format("Found %d pending clustering instants ", pending.size())); pending.forEach(hoodieInstant -> asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); - asyncClusteringService.get().start((error) -> { - shutdown(false); - return true; - }); + asyncClusteringService.get().start(error -> true); try { asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); + if (asyncClusteringService.get().hasError()) { + throw new HoodieException("Async clustering failed during write client initialization."); + } } catch (InterruptedException e) { 
throw new HoodieException(e); }