refactor(web): 使用统一的线程池资源,防止浪费

This commit is contained in:
2023-06-07 16:42:05 +08:00
parent 9ab94f98de
commit 65eb82651f
5 changed files with 34 additions and 42 deletions

View File

@@ -0,0 +1,14 @@
package com.lanyuanxiaoyao.service.configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 线程池
*
* @author lanyuanxiaoyao
* @date 2023-06-07
*/
public class ExecutorProvider {
public static final ExecutorService EXECUTORS = Executors.newWorkStealingPool(60);
}

View File

@@ -1,13 +1,12 @@
package com.lanyuanxiaoyao.service.web.controller; package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint; import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint;
import com.lanyuanxiaoyao.service.forest.service.FlinkService; import com.lanyuanxiaoyao.service.forest.service.FlinkService;
import com.lanyuanxiaoyao.service.web.entity.FlinkVertexVO; import com.lanyuanxiaoyao.service.web.entity.FlinkVertexVO;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -26,7 +25,6 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("flink") @RequestMapping("flink")
public class FlinkController extends BaseController { public class FlinkController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(FlinkController.class); private static final Logger logger = LoggerFactory.getLogger(FlinkController.class);
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
private final FlinkService flinkService; private final FlinkService flinkService;
@@ -52,7 +50,7 @@ public class FlinkController extends BaseController {
boolean isFilterTable = StrUtil.isNotBlank(table); boolean isFilterTable = StrUtil.isNotBlank(table);
ImmutableList<FlinkVertexVO> vertexVOS = flinkService.vertexOverview(url) ImmutableList<FlinkVertexVO> vertexVOS = flinkService.vertexOverview(url)
.getJobs() .getJobs()
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(vertex -> flinkService.vertex(url, vertex.getJid())) .collect(vertex -> flinkService.vertex(url, vertex.getJid()))
.select(vertex -> !isFilterSchema || StrUtil.contains(vertex.getName(), schema)) .select(vertex -> !isFilterSchema || StrUtil.contains(vertex.getName(), schema))
.select(vertex -> isAllInOne || (!isFilterTable || StrUtil.contains(vertex.getName(), table))) .select(vertex -> isAllInOne || (!isFilterTable || StrUtil.contains(vertex.getName(), table)))
@@ -68,7 +66,7 @@ public class FlinkController extends BaseController {
.collect(child -> new FlinkVertexVO.FlinkVertexChildVO(child, StrUtil.format("{}/#/job/{}/overview/{}", url, vertex.getJid(), child.getJid()))); .collect(child -> new FlinkVertexVO.FlinkVertexChildVO(child, StrUtil.format("{}/#/job/{}/overview/{}", url, vertex.getJid(), child.getJid())));
ImmutableList<FlinkCheckpoint> checkpoints = flinkService.checkpointOverview(url, vertex.getJid()) ImmutableList<FlinkCheckpoint> checkpoints = flinkService.checkpointOverview(url, vertex.getJid())
.getHistory() .getHistory()
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(checkpoint -> flinkService.checkpoint(url, vertex.getJid(), checkpoint.getId())) .collect(checkpoint -> flinkService.checkpoint(url, vertex.getJid(), checkpoint.getId()))
.toSortedListBy(FlinkCheckpoint::getId) .toSortedListBy(FlinkCheckpoint::getId)
.reverseThis() .reverseThis()

View File

@@ -4,6 +4,7 @@ import com.eshore.odcp.hudi.connector.entity.RunMeta;
import com.eshore.odcp.hudi.connector.utils.NameHelper; import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode; import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
import com.lanyuanxiaoyao.service.forest.service.ZookeeperService; import com.lanyuanxiaoyao.service.forest.service.ZookeeperService;
@@ -15,9 +16,6 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* 运行时监控 * 运行时监控
* *
@@ -28,7 +26,6 @@ import java.util.concurrent.Executors;
@RequestMapping("running") @RequestMapping("running")
public class RunningController extends BaseController { public class RunningController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(RunningController.class); private static final Logger logger = LoggerFactory.getLogger(RunningController.class);
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
private final ZookeeperService zookeeperService; private final ZookeeperService zookeeperService;
private final ObjectMapper mapper; private final ObjectMapper mapper;
@@ -43,7 +40,7 @@ public class RunningController extends BaseController {
public AmisResponse sync() { public AmisResponse sync() {
return responseCrudData( return responseCrudData(
zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH) zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH)
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(this::parseRunMeta) .collect(this::parseRunMeta)
.toSortedListBy(ZookeeperNodeVO::getCreateTime) .toSortedListBy(ZookeeperNodeVO::getCreateTime)
.toReversed() .toReversed()
@@ -55,7 +52,7 @@ public class RunningController extends BaseController {
public AmisResponse compaction() { public AmisResponse compaction() {
return responseCrudData( return responseCrudData(
zookeeperService.getChildren(NameHelper.ZK_COMPACTION_RUNNING_LOCK_PATH) zookeeperService.getChildren(NameHelper.ZK_COMPACTION_RUNNING_LOCK_PATH)
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(this::parseRunMeta) .collect(this::parseRunMeta)
.toSortedListBy(ZookeeperNodeVO::getCreateTime) .toSortedListBy(ZookeeperNodeVO::getCreateTime)
.toReversed() .toReversed()

View File

@@ -8,6 +8,7 @@ import com.eshore.odcp.hudi.connector.entity.SyncState;
import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.eshore.odcp.hudi.connector.utils.NameHelper; import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
@@ -18,10 +19,7 @@ import com.lanyuanxiaoyao.service.web.entity.TableVO;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.MutableMap; import org.eclipse.collections.api.map.MutableMap;
import org.slf4j.Logger; import org.slf4j.Logger;
@@ -42,7 +40,6 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("table") @RequestMapping("table")
public class TableController extends BaseController { public class TableController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(TableController.class); private static final Logger logger = LoggerFactory.getLogger(TableController.class);
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(50);
private final InfoService infoService; private final InfoService infoService;
private final ZookeeperService zookeeperService; private final ZookeeperService zookeeperService;
private final ObjectMapper mapper; private final ObjectMapper mapper;
@@ -65,19 +62,7 @@ public class TableController extends BaseController {
@RequestParam(value = "filter_run_mode", required = false) List<String> runMode, @RequestParam(value = "filter_run_mode", required = false) List<String> runMode,
@RequestParam(value = "filter_compaction_status", required = false) List<String> compactionStatus @RequestParam(value = "filter_compaction_status", required = false) List<String> compactionStatus
) { ) {
MutableMap<String, Object> queryMap = Maps.mutable.empty(); MutableMap<String, Object> queryMap = buildQueryMap(page, count, order, direction, searchFlinkJobId, searchAlias);
queryMap.put("page", page);
queryMap.put("count", count);
if (StrUtil.isNotBlank(searchFlinkJobId)) {
queryMap.put("flink_job_id", searchFlinkJobId);
}
if (StrUtil.isNotBlank(searchAlias)) {
queryMap.put("alias", searchAlias);
}
if (StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction)) {
queryMap.put("order", order);
queryMap.put("direction", direction);
}
if (ObjectUtil.isNotEmpty(runMode)) { if (ObjectUtil.isNotEmpty(runMode)) {
queryMap.put("filter_run_mode", runMode); queryMap.put("filter_run_mode", runMode);
} }
@@ -87,13 +72,13 @@ public class TableController extends BaseController {
PageResponse<JobIdAndAlias> pageResponse = infoService.jobIdAndAlias(queryMap); PageResponse<JobIdAndAlias> pageResponse = infoService.jobIdAndAlias(queryMap);
Long total = pageResponse.getTotal(); Long total = pageResponse.getTotal();
ImmutableList<TableVO> tableVOS = Lists.immutable.ofAll(pageResponse.getData()) ImmutableList<TableVO> tableVOS = Lists.immutable.ofAll(pageResponse.getData())
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(item -> { .collect(item -> {
CompletableFuture<FlinkJob> flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getId()), EXECUTOR); CompletableFuture<FlinkJob> flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getId()), ExecutorProvider.EXECUTORS);
CompletableFuture<TableMeta> tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), EXECUTOR); CompletableFuture<TableMeta> tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), ExecutorProvider.EXECUTORS);
CompletableFuture<SyncState> syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), EXECUTOR); CompletableFuture<SyncState> syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), ExecutorProvider.EXECUTORS);
CompletableFuture<RunMeta> syncRuntime = CompletableFuture CompletableFuture<RunMeta> syncRuntime = CompletableFuture
.supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId(), item.getAlias())), EXECUTOR) .supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId(), item.getAlias())), ExecutorProvider.EXECUTORS)
.thenApply(running -> { .thenApply(running -> {
if (running) { if (running) {
try { try {
@@ -108,7 +93,7 @@ public class TableController extends BaseController {
return null; return null;
}); });
CompletableFuture<RunMeta> compactionRuntime = CompletableFuture CompletableFuture<RunMeta> compactionRuntime = CompletableFuture
.supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), EXECUTOR) .supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), ExecutorProvider.EXECUTORS)
.thenApply(running -> { .thenApply(running -> {
if (running) { if (running) {
try { try {

View File

@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.ExecutorProvider;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
@@ -14,8 +15,6 @@ import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function; import java.util.function.Function;
import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.factory.Maps;
@@ -38,7 +37,6 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("yarn") @RequestMapping("yarn")
public class YarnController extends BaseController { public class YarnController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(YarnController.class); private static final Logger logger = LoggerFactory.getLogger(YarnController.class);
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
private static final ImmutableMap<String, Function<YarnApplication, Long>> SORT_MAP = Maps.immutable.of( private static final ImmutableMap<String, Function<YarnApplication, Long>> SORT_MAP = Maps.immutable.of(
"startedTime", "startedTime",
YarnApplication::getStartedTime, YarnApplication::getStartedTime,
@@ -71,7 +69,7 @@ public class YarnController extends BaseController {
boolean isSearchName = StrUtil.isNotBlank(searchName); boolean isSearchName = StrUtil.isNotBlank(searchName);
Comparator<YarnApplication> comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP); Comparator<YarnApplication> comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP);
ImmutableList<YarnApplication> applications = Lists.immutable.ofAll(clusters) ImmutableList<YarnApplication> applications = Lists.immutable.ofAll(clusters)
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.flatCollect(yarnService::jobList) .flatCollect(yarnService::jobList)
.select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState())) .select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState()))
.select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus())) .select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus()))
@@ -93,7 +91,7 @@ public class YarnController extends BaseController {
@GetMapping("job_current") @GetMapping("job_current")
public AmisResponse jobCurrent(@RequestParam("clusters") List<String> clusters, @RequestParam("name") String name) { public AmisResponse jobCurrent(@RequestParam("clusters") List<String> clusters, @RequestParam("name") String name) {
Optional<YarnApplication> currentApp = Lists.immutable.ofAll(clusters) Optional<YarnApplication> currentApp = Lists.immutable.ofAll(clusters)
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.flatCollect(cluster -> yarnService.jobListEquals(cluster, name)) .flatCollect(cluster -> yarnService.jobListEquals(cluster, name))
.select(app -> ObjectUtil.equals(app.getState(), "RUNNING")) .select(app -> ObjectUtil.equals(app.getState(), "RUNNING"))
.toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP)) .toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP))
@@ -112,7 +110,7 @@ public class YarnController extends BaseController {
boolean isFilterNames = StrUtil.isNotBlank(names); boolean isFilterNames = StrUtil.isNotBlank(names);
ImmutableList<String> filterNames = Lists.immutable.of(names.split(",")); ImmutableList<String> filterNames = Lists.immutable.of(names.split(","));
ImmutableList<YarnClusterVO> results = Lists.immutable.ofAll(clusters) ImmutableList<YarnClusterVO> results = Lists.immutable.ofAll(clusters)
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(cluster -> { .collect(cluster -> {
YarnRootQueue root = yarnService.cluster(cluster); YarnRootQueue root = yarnService.cluster(cluster);
ImmutableList<YarnQueue> queues = yarnService.queueList(cluster).select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n))); ImmutableList<YarnQueue> queues = yarnService.queueList(cluster).select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n)));
@@ -126,7 +124,7 @@ public class YarnController extends BaseController {
@GetMapping("queue_names") @GetMapping("queue_names")
public AmisResponse queueNames(@RequestParam("clusters") List<String> clusters) { public AmisResponse queueNames(@RequestParam("clusters") List<String> clusters) {
ImmutableList<String> names = Lists.immutable.ofAll(clusters) ImmutableList<String> names = Lists.immutable.ofAll(clusters)
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.flatCollect(yarnService::queueList) .flatCollect(yarnService::queueList)
.collect(YarnQueue::getQueueName) .collect(YarnQueue::getQueueName)
.toList() .toList()
@@ -137,7 +135,7 @@ public class YarnController extends BaseController {
@GetMapping("clusters") @GetMapping("clusters")
public AmisResponse clusters(@RequestParam("clusters") List<String> clusters) { public AmisResponse clusters(@RequestParam("clusters") List<String> clusters) {
ImmutableList<YarnRootQueue> roots = (ImmutableList<YarnRootQueue>) Lists.immutable.ofAll(clusters) ImmutableList<YarnRootQueue> roots = (ImmutableList<YarnRootQueue>) Lists.immutable.ofAll(clusters)
.asParallel(EXECUTOR, 1) .asParallel(ExecutorProvider.EXECUTORS, 1)
.collect(yarnService::cluster) .collect(yarnService::cluster)
.toList() .toList()
.toImmutable(); .toImmutable();