[HUDI-4086] Use CustomizedThreadFactory in async compaction and clustering (#5563)
Co-authored-by: 苏承祥 <sucx@tuya.com>
This commit is contained in:
@@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
|
|||||||
import org.apache.hudi.common.engine.EngineProperty;
|
import org.apache.hudi.common.engine.EngineProperty;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.util.CustomizedThreadFactory;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
@@ -42,13 +43,12 @@ import java.util.stream.IntStream;
|
|||||||
*/
|
*/
|
||||||
public abstract class AsyncClusteringService extends HoodieAsyncTableService {
|
public abstract class AsyncClusteringService extends HoodieAsyncTableService {
|
||||||
|
|
||||||
|
public static final String CLUSTERING_POOL_NAME = "hoodiecluster";
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
|
private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
|
||||||
public static final String CLUSTERING_POOL_NAME = "hoodiecluster";
|
|
||||||
|
|
||||||
private final int maxConcurrentClustering;
|
private final int maxConcurrentClustering;
|
||||||
private transient BaseClusterer clusteringClient;
|
|
||||||
protected transient HoodieEngineContext context;
|
protected transient HoodieEngineContext context;
|
||||||
|
private transient BaseClusterer clusteringClient;
|
||||||
|
|
||||||
public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) {
|
public AsyncClusteringService(HoodieEngineContext context, BaseHoodieWriteClient writeClient) {
|
||||||
this(context, writeClient, false);
|
this(context, writeClient, false);
|
||||||
@@ -69,12 +69,7 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService {
|
|||||||
@Override
|
@Override
|
||||||
protected Pair<CompletableFuture, ExecutorService> startService() {
|
protected Pair<CompletableFuture, ExecutorService> startService() {
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
|
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
|
||||||
r -> {
|
new CustomizedThreadFactory("async_clustering_thread", isRunInDaemonMode()));
|
||||||
Thread t = new Thread(r, "async_clustering_thread");
|
|
||||||
t.setDaemon(isRunInDaemonMode());
|
|
||||||
return t;
|
|
||||||
});
|
|
||||||
|
|
||||||
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
|
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
// Set Compactor Pool Name for allowing users to prioritize compaction
|
// Set Compactor Pool Name for allowing users to prioritize compaction
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
|
|||||||
import org.apache.hudi.common.engine.EngineProperty;
|
import org.apache.hudi.common.engine.EngineProperty;
|
||||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.util.CustomizedThreadFactory;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
@@ -39,17 +40,15 @@ import java.util.stream.IntStream;
|
|||||||
*/
|
*/
|
||||||
public abstract class AsyncCompactService extends HoodieAsyncTableService {
|
public abstract class AsyncCompactService extends HoodieAsyncTableService {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the job pool used by async compaction.
|
* This is the job pool used by async compaction.
|
||||||
*/
|
*/
|
||||||
public static final String COMPACT_POOL_NAME = "hoodiecompact";
|
public static final String COMPACT_POOL_NAME = "hoodiecompact";
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
|
||||||
private final int maxConcurrentCompaction;
|
private final int maxConcurrentCompaction;
|
||||||
private transient BaseCompactor compactor;
|
|
||||||
protected transient HoodieEngineContext context;
|
protected transient HoodieEngineContext context;
|
||||||
|
private transient BaseCompactor compactor;
|
||||||
|
|
||||||
public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
|
public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
|
||||||
this(context, client, false);
|
this(context, client, false);
|
||||||
@@ -70,11 +69,7 @@ public abstract class AsyncCompactService extends HoodieAsyncTableService {
|
|||||||
@Override
|
@Override
|
||||||
protected Pair<CompletableFuture, ExecutorService> startService() {
|
protected Pair<CompletableFuture, ExecutorService> startService() {
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
|
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
|
||||||
r -> {
|
new CustomizedThreadFactory("async_compact_thread", isRunInDaemonMode()));
|
||||||
Thread t = new Thread(r, "async_compact_thread");
|
|
||||||
t.setDaemon(isRunInDaemonMode());
|
|
||||||
return t;
|
|
||||||
});
|
|
||||||
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
|
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
// Set Compactor Pool Name for allowing users to prioritize compaction
|
// Set Compactor Pool Name for allowing users to prioritize compaction
|
||||||
@@ -107,9 +102,9 @@ public abstract class AsyncCompactService extends HoodieAsyncTableService {
|
|||||||
}, executor)).toArray(CompletableFuture[]::new)), executor);
|
}, executor)).toArray(CompletableFuture[]::new)), executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether compactor thread needs to be stopped.
|
* Check whether compactor thread needs to be stopped.
|
||||||
|
*
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
protected boolean shouldStopCompactor() {
|
protected boolean shouldStopCompactor() {
|
||||||
|
|||||||
Reference in New Issue
Block a user