1
0

[HUDI-2959] Fix the thread leak of cleaning service (#4252)

This commit is contained in:
Danny Chan
2021-12-11 12:08:47 +08:00
committed by GitHub
parent 9797fdfbb2
commit 9bdcee00c0
5 changed files with 31 additions and 38 deletions

View File

@@ -29,7 +29,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@@ -130,7 +129,7 @@ public abstract class HoodieAsyncService implements Serializable {
future = res.getKey(); future = res.getKey();
executor = res.getValue(); executor = res.getValue();
started = true; started = true;
monitorThreads(onShutdownCallback); shutdownCallback(onShutdownCallback);
} }
/** /**
@@ -141,34 +140,15 @@ public abstract class HoodieAsyncService implements Serializable {
protected abstract Pair<CompletableFuture, ExecutorService> startService(); protected abstract Pair<CompletableFuture, ExecutorService> startService();
/** /**
* A monitor thread is started which would trigger a callback if the service is shutdown. * Add shutdown callback for the completable future.
* *
* @param onShutdownCallback * @param callback The callback
*/ */
private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) { @SuppressWarnings("unchecked")
LOG.info("Submitting monitor thread !!"); private void shutdownCallback(Function<Boolean, Boolean> callback) {
Executors.newSingleThreadExecutor(r -> { future.whenComplete((resp, error) -> {
Thread t = new Thread(r, "Monitor Thread"); if (null != callback) {
t.setDaemon(isRunInDaemonMode()); callback.apply(null != error);
return t;
}).submit(() -> {
boolean error = false;
try {
LOG.info("Monitoring thread(s) !!");
future.get();
} catch (ExecutionException ex) {
LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
error = true;
} catch (InterruptedException ie) {
LOG.error("Got interrupted Monitoring threads", ie);
error = true;
} finally {
// Mark as shutdown
shutdown = true;
if (null != onShutdownCallback) {
onShutdownCallback.apply(error);
}
shutdown(false);
} }
}); });
} }

View File

@@ -424,7 +424,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
HoodieTableMetaClient metaClient) { HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType); setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); if (null == this.asyncCleanerService) {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
} }
/** /**

View File

@@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService {
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class); private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
private final AbstractHoodieWriteClient writeClient; private final AbstractHoodieWriteClient writeClient;
private final String cleanInstantTime;
private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) { protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
this.writeClient = writeClient; this.writeClient = writeClient;
this.cleanInstantTime = cleanInstantTime;
} }
@Override @Override
protected Pair<CompletableFuture, ExecutorService> startService() { protected Pair<CompletableFuture, ExecutorService> startService() {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
return Pair.of(CompletableFuture.supplyAsync(() -> { return Pair.of(CompletableFuture.supplyAsync(() -> {
writeClient.clean(cleanInstantTime); writeClient.clean(instantTime);
return true; return true;
}), executor); }, executor), executor);
} }
public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) { public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null; AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(); asyncCleanerService = new AsyncCleanerService(writeClient);
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
asyncCleanerService.start(null); asyncCleanerService.start(null);
} else { } else {
LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); LOG.info("Async auto cleaning is not enabled. Not running cleaner now");

View File

@@ -281,7 +281,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* checkpoint finish. * checkpoint finish.
*/ */
public void startAsyncCleaning() { public void startAsyncCleaning() {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); if (this.asyncCleanerService == null) {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
} }
/** /**

View File

@@ -98,4 +98,11 @@ public class CleanFunction<T> extends AbstractRichFunction
public void initializeState(FunctionInitializationContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception {
// no operation // no operation
} }
@Override
public void close() throws Exception {
if (this.writeClient != null) {
this.writeClient.close();
}
}
} }