From 65eb82651f037f568a0ffc2840ca35c6e0e8d044 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 7 Jun 2023 16:42:05 +0800 Subject: [PATCH] =?UTF-8?q?refactor(web):=20=E4=BD=BF=E7=94=A8=E7=BB=9F?= =?UTF-8?q?=E4=B8=80=E7=9A=84=E7=BA=BF=E7=A8=8B=E6=B1=A0=E8=B5=84=E6=BA=90?= =?UTF-8?q?=EF=BC=8C=E9=98=B2=E6=AD=A2=E6=B5=AA=E8=B4=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../configuration/ExecutorProvider.java | 14 +++++++++ .../web/controller/FlinkController.java | 8 ++--- .../web/controller/RunningController.java | 9 ++---- .../web/controller/TableController.java | 31 +++++-------------- .../web/controller/YarnController.java | 14 ++++----- 5 files changed, 34 insertions(+), 42 deletions(-) create mode 100644 service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/ExecutorProvider.java diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/ExecutorProvider.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/ExecutorProvider.java new file mode 100644 index 0000000..3facb95 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/ExecutorProvider.java @@ -0,0 +1,14 @@ +package com.lanyuanxiaoyao.service.configuration; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 线程池 + * + * @author lanyuanxiaoyao + * @date 2023-06-07 + */ +public class ExecutorProvider { + public static final ExecutorService EXECUTORS = Executors.newWorkStealingPool(60); +} diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java index b919b78..52b0274 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java @@ -1,13 +1,12 @@ package com.lanyuanxiaoyao.service.web.controller; import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint; import com.lanyuanxiaoyao.service.forest.service.FlinkService; import com.lanyuanxiaoyao.service.web.entity.FlinkVertexVO; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +25,6 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("flink") public class FlinkController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(FlinkController.class); - private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); private final FlinkService flinkService; @@ -52,7 +50,7 @@ public class FlinkController extends BaseController { boolean isFilterTable = StrUtil.isNotBlank(table); ImmutableList vertexVOS = flinkService.vertexOverview(url) .getJobs() - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(vertex -> flinkService.vertex(url, vertex.getJid())) .select(vertex -> !isFilterSchema || StrUtil.contains(vertex.getName(), schema)) .select(vertex -> isAllInOne || (!isFilterTable || StrUtil.contains(vertex.getName(), table))) @@ -68,7 +66,7 @@ public class FlinkController extends BaseController { .collect(child -> new FlinkVertexVO.FlinkVertexChildVO(child, StrUtil.format("{}/#/job/{}/overview/{}", url, vertex.getJid(), child.getJid()))); ImmutableList checkpoints = flinkService.checkpointOverview(url, vertex.getJid()) .getHistory() - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(checkpoint -> flinkService.checkpoint(url, vertex.getJid(), checkpoint.getId())) .toSortedListBy(FlinkCheckpoint::getId) .reverseThis() diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/RunningController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/RunningController.java index 178dd96..e822a2c 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/RunningController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/RunningController.java @@ -4,6 +4,7 @@ import com.eshore.odcp.hudi.connector.entity.RunMeta; import com.eshore.odcp.hudi.connector.utils.NameHelper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode; import com.lanyuanxiaoyao.service.forest.service.ZookeeperService; @@ -15,9 +16,6 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** * 运行时监控 * @@ -28,7 +26,6 @@ import java.util.concurrent.Executors; @RequestMapping("running") public class RunningController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(RunningController.class); - private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); private final ZookeeperService zookeeperService; private final ObjectMapper mapper; @@ -43,7 +40,7 @@ public class RunningController extends BaseController { public AmisResponse sync() { return responseCrudData( zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(this::parseRunMeta) .toSortedListBy(ZookeeperNodeVO::getCreateTime) .toReversed() @@ -55,7 +52,7 @@ public class RunningController extends BaseController { public AmisResponse compaction() { return responseCrudData( zookeeperService.getChildren(NameHelper.ZK_COMPACTION_RUNNING_LOCK_PATH) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(this::parseRunMeta) .toSortedListBy(ZookeeperNodeVO::getCreateTime) .toReversed() diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java index cd50f1e..999f29b 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java @@ -8,6 +8,7 @@ import com.eshore.odcp.hudi.connector.entity.SyncState; import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.eshore.odcp.hudi.connector.utils.NameHelper; import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; @@ -18,10 +19,7 @@ import com.lanyuanxiaoyao.service.web.entity.TableVO; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.map.MutableMap; import org.slf4j.Logger; @@ -42,7 +40,6 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("table") public class TableController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(TableController.class); - private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(50); private final InfoService infoService; private final ZookeeperService zookeeperService; private final ObjectMapper mapper; @@ -65,19 +62,7 @@ public class TableController extends BaseController { @RequestParam(value = "filter_run_mode", required = false) List runMode, @RequestParam(value = "filter_compaction_status", required = false) List compactionStatus ) { - MutableMap queryMap = Maps.mutable.empty(); - queryMap.put("page", page); - queryMap.put("count", count); - if (StrUtil.isNotBlank(searchFlinkJobId)) { - queryMap.put("flink_job_id", searchFlinkJobId); - } - if (StrUtil.isNotBlank(searchAlias)) { - queryMap.put("alias", searchAlias); - } - if (StrUtil.isNotBlank(order) && StrUtil.isNotBlank(direction)) { - queryMap.put("order", order); - queryMap.put("direction", direction); - } + MutableMap queryMap = buildQueryMap(page, count, order, direction, searchFlinkJobId, searchAlias); if (ObjectUtil.isNotEmpty(runMode)) { queryMap.put("filter_run_mode", runMode); } @@ -87,13 +72,13 @@ public class TableController extends BaseController { PageResponse pageResponse = infoService.jobIdAndAlias(queryMap); Long total = pageResponse.getTotal(); ImmutableList tableVOS = Lists.immutable.ofAll(pageResponse.getData()) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(item -> { - CompletableFuture flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getId()), EXECUTOR); - CompletableFuture tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), EXECUTOR); - CompletableFuture syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), EXECUTOR); + CompletableFuture flinkJobFuture = CompletableFuture.supplyAsync(() -> infoService.flinkJobDetail(item.getId()), ExecutorProvider.EXECUTORS); + CompletableFuture tableMetaFuture = CompletableFuture.supplyAsync(() -> infoService.tableMetaDetail(item.getId(), item.getAlias()), ExecutorProvider.EXECUTORS); + CompletableFuture syncStateFuture = CompletableFuture.supplyAsync(() -> infoService.syncStateDetail(item.getId(), item.getAlias()), ExecutorProvider.EXECUTORS); CompletableFuture syncRuntime = CompletableFuture - .supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId(), item.getAlias())), EXECUTOR) + .supplyAsync(() -> zookeeperService.existsPath(NameHelper.syncRunningLockPath(item.getId(), item.getAlias())), ExecutorProvider.EXECUTORS) .thenApply(running -> { if (running) { try { @@ -108,7 +93,7 @@ public class TableController extends BaseController { return null; }); CompletableFuture compactionRuntime = CompletableFuture - .supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), EXECUTOR) + .supplyAsync(() -> zookeeperService.existsPath(NameHelper.compactionRunningLockPath(item.getId(), item.getAlias())), ExecutorProvider.EXECUTORS) .thenApply(running -> { if (running) { try { diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java index e8a9a06..d84ce96 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.web.controller; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; @@ -14,8 +15,6 @@ import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil; import java.util.Comparator; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Function; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Maps; @@ -38,7 +37,6 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("yarn") public class YarnController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(YarnController.class); - private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); private static final ImmutableMap> SORT_MAP = Maps.immutable.of( "startedTime", YarnApplication::getStartedTime, @@ -71,7 +69,7 @@ public class YarnController extends BaseController { boolean isSearchName = StrUtil.isNotBlank(searchName); Comparator comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP); ImmutableList applications = Lists.immutable.ofAll(clusters) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .flatCollect(yarnService::jobList) .select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState())) .select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus())) @@ -93,7 +91,7 @@ public class YarnController extends BaseController { @GetMapping("job_current") public AmisResponse jobCurrent(@RequestParam("clusters") List clusters, @RequestParam("name") String name) { Optional currentApp = Lists.immutable.ofAll(clusters) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .flatCollect(cluster -> yarnService.jobListEquals(cluster, name)) .select(app -> ObjectUtil.equals(app.getState(), "RUNNING")) .toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP)) @@ -112,7 +110,7 @@ public class YarnController extends BaseController { boolean isFilterNames = StrUtil.isNotBlank(names); ImmutableList filterNames = Lists.immutable.of(names.split(",")); ImmutableList results = Lists.immutable.ofAll(clusters) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(cluster -> { YarnRootQueue root = yarnService.cluster(cluster); ImmutableList queues = yarnService.queueList(cluster).select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n))); @@ -126,7 +124,7 @@ public class YarnController extends BaseController { @GetMapping("queue_names") public AmisResponse queueNames(@RequestParam("clusters") List clusters) { ImmutableList names = Lists.immutable.ofAll(clusters) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .flatCollect(yarnService::queueList) .collect(YarnQueue::getQueueName) .toList() @@ -137,7 +135,7 @@ public class YarnController extends BaseController { @GetMapping("clusters") public AmisResponse clusters(@RequestParam("clusters") List clusters) { ImmutableList roots = (ImmutableList) Lists.immutable.ofAll(clusters) - .asParallel(EXECUTOR, 1) + .asParallel(ExecutorProvider.EXECUTORS, 1) .collect(yarnService::cluster) .toList() .toImmutable();