1
0

[HUDI-3043] Revert async cleaner leak commit to unblock CI failure (#4343)

* Revert "[HUDI-2959] Fix the thread leak of cleaning service (#4252)"
Reverting to unblock CI failure for now. will revisit this with the right fix
This commit is contained in:
Sivabalan Narayanan
2021-12-16 21:51:28 -05:00
committed by GitHub
parent 294d712948
commit 7e7ad1558c
5 changed files with 38 additions and 31 deletions

View File

@@ -29,6 +29,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@@ -129,7 +130,7 @@ public abstract class HoodieAsyncService implements Serializable {
future = res.getKey(); future = res.getKey();
executor = res.getValue(); executor = res.getValue();
started = true; started = true;
shutdownCallback(onShutdownCallback); monitorThreads(onShutdownCallback);
} }
/** /**
@@ -140,15 +141,34 @@ public abstract class HoodieAsyncService implements Serializable {
protected abstract Pair<CompletableFuture, ExecutorService> startService(); protected abstract Pair<CompletableFuture, ExecutorService> startService();
/** /**
* Add shutdown callback for the completable future. * A monitor thread is started which would trigger a callback if the service is shutdown.
* *
* @param callback The callback * @param onShutdownCallback
*/ */
@SuppressWarnings("unchecked") private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
private void shutdownCallback(Function<Boolean, Boolean> callback) { LOG.info("Submitting monitor thread !!");
future.whenComplete((resp, error) -> { Executors.newSingleThreadExecutor(r -> {
if (null != callback) { Thread t = new Thread(r, "Monitor Thread");
callback.apply(null != error); t.setDaemon(isRunInDaemonMode());
return t;
}).submit(() -> {
boolean error = false;
try {
LOG.info("Monitoring thread(s) !!");
future.get();
} catch (ExecutionException ex) {
LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
error = true;
} catch (InterruptedException ie) {
LOG.error("Got interrupted Monitoring threads", ie);
error = true;
} finally {
// Mark as shutdown
shutdown = true;
if (null != onShutdownCallback) {
onShutdownCallback.apply(error);
}
shutdown(false);
} }
}); });
} }

View File

@@ -425,11 +425,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
HoodieTableMetaClient metaClient) { HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType); setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
if (null == this.asyncCleanerService) { this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
} }
/** /**

View File

@@ -37,26 +37,28 @@ class AsyncCleanerService extends HoodieAsyncService {
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class); private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
private final AbstractHoodieWriteClient writeClient; private final AbstractHoodieWriteClient writeClient;
private final String cleanInstantTime;
private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) { protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) {
this.writeClient = writeClient; this.writeClient = writeClient;
this.cleanInstantTime = cleanInstantTime;
} }
@Override @Override
protected Pair<CompletableFuture, ExecutorService> startService() { protected Pair<CompletableFuture, ExecutorService> startService() {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
return Pair.of(CompletableFuture.supplyAsync(() -> { return Pair.of(CompletableFuture.supplyAsync(() -> {
writeClient.clean(instantTime); writeClient.clean(cleanInstantTime);
return true; return true;
}, executor), executor); }), executor);
} }
public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) { public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null; AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
asyncCleanerService = new AsyncCleanerService(writeClient); String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
asyncCleanerService.start(null); asyncCleanerService.start(null);
} else { } else {
LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); LOG.info("Async auto cleaning is not enabled. Not running cleaner now");

View File

@@ -281,11 +281,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* checkpoint finish. * checkpoint finish.
*/ */
public void startAsyncCleaning() { public void startAsyncCleaning() {
if (this.asyncCleanerService == null) { this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
} }
/** /**

View File

@@ -98,11 +98,4 @@ public class CleanFunction<T> extends AbstractRichFunction
public void initializeState(FunctionInitializationContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception {
// no operation // no operation
} }
@Override
public void close() throws Exception {
if (this.writeClient != null) {
this.writeClient.close();
}
}
} }