From 8bfb274dde444cb382c93a224d7ef3f9e625fb79 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 31 May 2023 21:37:57 +0800 Subject: [PATCH] =?UTF-8?q?refactor(web):=20=E4=BC=98=E5=8C=96=20yarn=20?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E7=9A=84=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../forest/service/YarnClusterService.java | 40 ++++++------------- .../web/controller/YarnController.java | 30 ++++---------- 2 files changed, 21 insertions(+), 49 deletions(-) diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java index 0835c1b..5eee7ba 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java @@ -96,8 +96,8 @@ public class YarnClusterService { return Lists.immutable.empty(); } return clusters.collect(this::service) - .asParallel(EXECUTOR, 1) .reject(ObjectUtil::isNull) + .asParallel(EXECUTOR, 1) .flatCollect(getter::apply) .toList() .toImmutable(); @@ -113,19 +113,14 @@ public class YarnClusterService { return servicesMap.containsKey(cluster) ? service(cluster).queueList() : Lists.immutable.empty(); } - public ImmutableMap> queueList(ImmutableSet clusters) { + public ImmutableList queueList(ImmutableSet clusters) { + //noinspection DataFlowIssue return clusters .select(servicesMap::containsKey) .asParallel(EXECUTOR, 1) - .collect(cluster -> { - YarnService service = service(cluster); - @SuppressWarnings("DataFlowIssue") - ImmutableList queues = service.queueList(); - return new Pair<>(cluster, queues); - }) - .toMap(Pair::getKey, Pair::getValue) + .flatCollect(cluster -> service(cluster).queueList()) + .toList() .toImmutable(); - } @SuppressWarnings("DataFlowIssue") @@ -133,17 +128,13 @@ public class YarnClusterService { return servicesMap.containsKey(cluster) ? service(cluster).queueDetail(name) : null; } - public ImmutableMap queueDetail(ImmutableSet clusters, String name) { + public ImmutableList queueDetail(ImmutableSet clusters, String name) { + //noinspection DataFlowIssue return clusters .select(servicesMap::containsKey) .asParallel(EXECUTOR, 1) - .collect(cluster -> { - YarnService service = service(cluster); - @SuppressWarnings("DataFlowIssue") - YarnQueue queue = service.queueDetail(name); - return new Pair<>(cluster, queue); - }) - .toMap(Pair::getKey, Pair::getValue) + .collect(cluster -> service(cluster).queueDetail(name)) + .toList() .toImmutable(); } @@ -152,18 +143,13 @@ public class YarnClusterService { return servicesMap.containsKey(cluster) ? service(cluster).cluster() : null; } - public ImmutableMap cluster(ImmutableSet clusters) { + public ImmutableList cluster(ImmutableSet clusters) { + //noinspection DataFlowIssue return clusters .select(servicesMap::containsKey) .asParallel(EXECUTOR, 1) - .collect(cluster -> { - YarnService service = service(cluster); - @SuppressWarnings("DataFlowIssue") - YarnRootQueue queues = service.cluster(); - return new Pair<>(cluster, queues); - }) - .toMap(Pair::getKey, Pair::getValue) + .collect(cluster -> service(cluster).cluster()) + .toList() .toImmutable(); - } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java index 5617e0b..71ce288 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java @@ -8,7 +8,6 @@ import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; import com.lanyuanxiaoyao.service.forest.service.YarnClusterService; -import com.lanyuanxiaoyao.service.forest.service.YarnService; import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO; import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO; import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil; @@ -20,6 +19,7 @@ import java.util.concurrent.Executors; import java.util.function.Function; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.factory.Sets; import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.map.ImmutableMap; import org.slf4j.Logger; @@ -46,21 +46,10 @@ public class YarnController extends BaseController { "finishedTime", YarnApplication::getFinishedTime ); - private final ImmutableMap yarnServices; + private final YarnClusterService yarnClusterService; public YarnController(YarnClusterService yarnClusterService) { - this.yarnServices = yarnClusterService.servicesMap(); - } - - private YarnService getService(String cluster) { - if (!yarnServices.containsKey(cluster)) { - throw new RuntimeException(StrUtil.format("Cluster {} not found", cluster)); - } - return yarnServices.get(cluster); - } - - private ImmutableList getServices(List clusters) { - return Lists.immutable.ofAll(clusters).collect(this::getService); + this.yarnClusterService = yarnClusterService; } @GetMapping("job_list") @@ -81,9 +70,8 @@ public class YarnController extends BaseController { boolean isSearchId = StrUtil.isNotBlank(searchId); boolean isSearchName = StrUtil.isNotBlank(searchName); Comparator comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP); - ImmutableList applications = getServices(clusters) + ImmutableList applications = yarnClusterService.jobList(Sets.immutable.ofAll(clusters)) .asParallel(EXECUTOR, 1) - .flatCollect(YarnService::jobList) .select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState())) .select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus())) .select(app -> !isSearchId || (precise ? StrUtil.equals(app.getId(), searchId) : StrUtil.contains(app.getId(), searchId))) @@ -103,10 +91,8 @@ public class YarnController extends BaseController { @GetMapping("job_current") public AmisResponse jobCurrent(@RequestParam("clusters") List clusters, @RequestParam("name") String name) { - Optional currentApp = getServices(clusters) + Optional currentApp = yarnClusterService.jobListEquals(Sets.immutable.ofAll(clusters), name) .asParallel(EXECUTOR, 1) - .flatCollect(YarnService::jobList) - .select(app -> ObjectUtil.equals(app.getName(), name)) .select(app -> ObjectUtil.equals(app.getState(), "RUNNING")) .toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP)) .getFirstOptional(); @@ -123,7 +109,7 @@ public class YarnController extends BaseController { public AmisResponse queueList(@RequestParam("clusters") List clusters, @RequestParam(value = "names", defaultValue = "") String names) { boolean isFilterNames = StrUtil.isNotBlank(names); ImmutableList filterNames = Lists.immutable.of(names.split(",")); - ImmutableList results = getServices(clusters) + ImmutableList results = yarnClusterService.services() .asParallel(EXECUTOR, 1) .collect(yarnService -> { YarnRootQueue cluster = yarnService.cluster(); @@ -137,11 +123,11 @@ public class YarnController extends BaseController { @GetMapping("queue_names") public AmisResponse queueNames(@RequestParam("clusters") List clusters) { - return responseData(MapUtil.of("queueNames", getServices(clusters).flatCollect(YarnService::queueList).collect(YarnQueue::getQueueName))); + return responseData(MapUtil.of("queueNames", yarnClusterService.queueList(Sets.immutable.ofAll(clusters)).collect(YarnQueue::getQueueName))); } @GetMapping("clusters") public AmisResponse clusters(@RequestParam("clusters") List clusters) { - return responseData(MapUtil.of("cluster", getServices(clusters).collect(YarnService::cluster))); + return responseData(MapUtil.of("cluster", yarnClusterService.cluster(Sets.immutable.ofAll(clusters)))); } }