diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index f57484d88..85e008199 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -29,6 +29,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -129,7 +130,7 @@ public abstract class HoodieAsyncService implements Serializable { future = res.getKey(); executor = res.getValue(); started = true; - shutdownCallback(onShutdownCallback); + monitorThreads(onShutdownCallback); } /** @@ -140,15 +141,34 @@ public abstract class HoodieAsyncService implements Serializable { protected abstract Pair startService(); /** - * Add shutdown callback for the completable future. + * A monitor thread is started which would trigger a callback if the service is shutdown. * - * @param callback The callback + * @param onShutdownCallback */ - @SuppressWarnings("unchecked") - private void shutdownCallback(Function callback) { - future.whenComplete((resp, error) -> { - if (null != callback) { - callback.apply(null != error); + private void monitorThreads(Function onShutdownCallback) { + LOG.info("Submitting monitor thread !!"); + Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "Monitor Thread"); + t.setDaemon(isRunInDaemonMode()); + return t; + }).submit(() -> { + boolean error = false; + try { + LOG.info("Monitoring thread(s) !!"); + future.get(); + } catch (ExecutionException ex) { + LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex); + error = true; + } catch (InterruptedException ie) { + LOG.error("Got interrupted Monitoring threads", ie); + error = true; + } finally { + // Mark as shutdown + shutdown = true; + if (null != onShutdownCallback) { + onShutdownCallback.apply(error); + } + shutdown(false); } }); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 286e3adbb..3fe28c64b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -425,11 +425,7 @@ public abstract class AbstractHoodieWriteClient startService() { - String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); return Pair.of(CompletableFuture.supplyAsync(() -> { - writeClient.clean(instantTime); + writeClient.clean(cleanInstantTime); return true; - }, executor), executor); + }), executor); } public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) { AsyncCleanerService asyncCleanerService = null; if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { - asyncCleanerService = new AsyncCleanerService(writeClient); + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); + asyncCleanerService = new AsyncCleanerService(writeClient, instantTime); asyncCleanerService.start(null); } else { LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 9a56d6da4..374dd1226 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -281,11 +281,7 @@ public class HoodieFlinkWriteClient extends * checkpoint finish. */ public void startAsyncCleaning() { - if (this.asyncCleanerService == null) { - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); - } else { - this.asyncCleanerService.start(null); - } + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index bb7900624..77d663004 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -98,11 +98,4 @@ public class CleanFunction extends AbstractRichFunction public void initializeState(FunctionInitializationContext context) throws Exception { // no operation } - - @Override - public void close() throws Exception { - if (this.writeClient != null) { - this.writeClient.close(); - } - } }