diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index 85e008199..f57484d88 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -29,7 +29,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -130,7 +129,7 @@ public abstract class HoodieAsyncService implements Serializable { future = res.getKey(); executor = res.getValue(); started = true; - monitorThreads(onShutdownCallback); + shutdownCallback(onShutdownCallback); } /** @@ -141,34 +140,15 @@ public abstract class HoodieAsyncService implements Serializable { protected abstract Pair startService(); /** - * A monitor thread is started which would trigger a callback if the service is shutdown. + * Add shutdown callback for the completable future. * - * @param onShutdownCallback + * @param callback The callback */ - private void monitorThreads(Function onShutdownCallback) { - LOG.info("Submitting monitor thread !!"); - Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "Monitor Thread"); - t.setDaemon(isRunInDaemonMode()); - return t; - }).submit(() -> { - boolean error = false; - try { - LOG.info("Monitoring thread(s) !!"); - future.get(); - } catch (ExecutionException ex) { - LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex); - error = true; - } catch (InterruptedException ie) { - LOG.error("Got interrupted Monitoring threads", ie); - error = true; - } finally { - // Mark as shutdown - shutdown = true; - if (null != onShutdownCallback) { - onShutdownCallback.apply(error); - } - shutdown(false); + @SuppressWarnings("unchecked") + private void shutdownCallback(Function callback) { + future.whenComplete((resp, error) -> { + if (null != callback) { + callback.apply(null != error); } }); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 358e30716..18f93faf2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -424,7 +424,11 @@ public abstract class AbstractHoodieWriteClient startService() { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); return Pair.of(CompletableFuture.supplyAsync(() -> { - writeClient.clean(cleanInstantTime); + writeClient.clean(instantTime); return true; - }), executor); + }, executor), executor); } public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) { AsyncCleanerService asyncCleanerService = null; if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { - String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); - asyncCleanerService = new AsyncCleanerService(writeClient, instantTime); + asyncCleanerService = new AsyncCleanerService(writeClient); asyncCleanerService.start(null); } else { LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 374dd1226..9a56d6da4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -281,7 +281,11 @@ public class HoodieFlinkWriteClient extends * checkpoint finish. */ public void startAsyncCleaning() { - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + if (this.asyncCleanerService == null) { + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + } else { + this.asyncCleanerService.start(null); + } } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 77d663004..bb7900624 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -98,4 +98,11 @@ public class CleanFunction extends AbstractRichFunction public void initializeState(FunctionInitializationContext context) throws Exception { // no operation } + + @Override + public void close() throws Exception { + if (this.writeClient != null) { + this.writeClient.close(); + } + } }