From cd676367c667f7aac3bb6af251ce2ca1cdc97c38 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Fri, 2 Jun 2023 09:09:26 +0800 Subject: [PATCH] =?UTF-8?q?feature(yarn-query):=20=E4=BC=98=E5=8C=96=20yar?= =?UTF-8?q?n-query=20=E4=B8=BA=E9=9B=86=E7=BE=A4=E5=85=B1=E4=BA=AB?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 原本一个 yarn-query 对应一个集群,集群多了之后,比较浪费,改为一个组件可以查询任意集群的信息,减少部署的复杂性 --- .../src/main/resources/application.yml | 40 +---- .../service/forest/service/YarnA4Service.java | 13 -- .../service/forest/service/YarnB1Service.java | 13 -- .../service/forest/service/YarnB4Service.java | 13 -- .../service/forest/service/YarnB5Service.java | 13 -- .../forest/service/YarnB5SyncService.java | 13 -- .../forest/service/YarnClusterService.java | 158 ------------------ .../service/forest/service/YarnService.java | 19 +-- .../web/controller/YarnController.java | 40 +++-- .../yarn/configuration/YarnConfiguration.java | 28 ++-- .../yarn/controller/ClusterController.java | 5 +- .../yarn/controller/JobController.java | 21 +-- .../yarn/controller/QueueController.java | 8 +- .../service/yarn/service/ClusterService.java | 2 +- .../service/yarn/service/JobService.java | 10 +- .../service/yarn/service/QueueService.java | 4 +- .../yarn/service/impl/ClusterServiceImpl.java | 9 +- .../impl/JobAutoRefreshServiceImpl.java | 74 -------- .../yarn/service/impl/JobServiceImpl.java | 32 ++-- .../impl/QueueAutoRefreshServiceImpl.java | 67 -------- .../yarn/service/impl/QueueServiceImpl.java | 14 +- .../src/main/resources/application.yml | 7 + 22 files changed, 112 insertions(+), 491 deletions(-) delete mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnA4Service.java delete mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB1Service.java delete mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB4Service.java delete mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5Service.java delete mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5SyncService.java delete mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java delete mode 100644 service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java delete mode 100644 service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java diff --git a/service-cli/service-cli-runner/src/main/resources/application.yml b/service-cli/service-cli-runner/src/main/resources/application.yml index ae5338b..895f1bf 100644 --- a/service-cli/service-cli-runner/src/main/resources/application.yml +++ b/service-cli/service-cli-runner/src/main/resources/application.yml @@ -101,41 +101,15 @@ deploy: replicas: 3 arguments: loki_host: ${deploy.runtime.loki-base-url} - - name: service-yarn-query-a4 + - name: service-yarn-query source-jar: service-yarn-query-1.0.0-SNAPSHOT.jar - replicas: 4 + replicas: 10 arguments: - yarn_cluster: a4 - yarn_web-url: http://132.121.107.91:8088 - spring_application_name: service-yarn-query-a4 - - name: service-yarn-query-b1 - source-jar: service-yarn-query-1.0.0-SNAPSHOT.jar - replicas: 4 - arguments: - yarn_cluster: b1 - yarn_web-url: http://132.122.98.13:8088 - spring_application_name: service-yarn-query-b1 - - name: service-yarn-query-b4 - source-jar: service-yarn-query-1.0.0-SNAPSHOT.jar - replicas: 4 - arguments: - yarn_cluster: b4 - yarn_web-url: http://132.122.112.30:8088 - spring_application_name: service-yarn-query-b4 - - name: service-yarn-query-b5 - source-jar: service-yarn-query-1.0.0-SNAPSHOT.jar - replicas: 4 - arguments: - yarn_cluster: b5 - yarn_web-url: http://132.122.116.12:8088 - spring_application_name: service-yarn-query-b5 - - name: service-yarn-query-b5-sync - source-jar: service-yarn-query-1.0.0-SNAPSHOT.jar - replicas: 4 - arguments: - yarn_cluster: b5-sync - yarn_web-url: http://132.122.116.143:8088 - spring_application_name: service-yarn-query-b5-sync + yarn_web-urls_a4: http://132.121.107.91:8088 + yarn_web-urls_b1: http://132.122.98.13:8088 + yarn_web-urls_b4: http://132.122.112.30:8088 + yarn_web-urls_b5: http://132.122.116.12:8088 + yarn_web-urls_b5-sync: http://132.122.116.143:8088 - name: service-pulsar-query source-jar: service-pulsar-query-1.0.0-SNAPSHOT.jar replicas: 4 diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnA4Service.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnA4Service.java deleted file mode 100644 index eb254e7..0000000 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnA4Service.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.lanyuanxiaoyao.service.forest.service; - -import com.dtflys.forest.annotation.BaseRequest; - -/** - * Yarn 信息 - * - * @author lanyuanxiaoyao - * @date 2023-04-21 - */ -@BaseRequest(baseURL = "http://service-yarn-query-a4") -public interface YarnA4Service extends YarnService { -} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB1Service.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB1Service.java deleted file mode 100644 index c98daf9..0000000 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB1Service.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.lanyuanxiaoyao.service.forest.service; - -import com.dtflys.forest.annotation.BaseRequest; - -/** - * Yarn 信息 - * - * @author lanyuanxiaoyao - * @date 2023-04-21 - */ -@BaseRequest(baseURL = "http://service-yarn-query-b1") -public interface YarnB1Service extends YarnService { -} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB4Service.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB4Service.java deleted file mode 100644 index f8d1630..0000000 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB4Service.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.lanyuanxiaoyao.service.forest.service; - -import com.dtflys.forest.annotation.BaseRequest; - -/** - * Yarn 信息 - * - * @author lanyuanxiaoyao - * @date 2023-04-21 - */ -@BaseRequest(baseURL = "http://service-yarn-query-b4") -public interface YarnB4Service extends YarnService { -} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5Service.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5Service.java deleted file mode 100644 index 824fec4..0000000 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5Service.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.lanyuanxiaoyao.service.forest.service; - -import com.dtflys.forest.annotation.BaseRequest; - -/** - * Yarn 信息 - * - * @author lanyuanxiaoyao - * @date 2023-04-21 - */ -@BaseRequest(baseURL = "http://service-yarn-query-b5") -public interface YarnB5Service extends YarnService { -} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5SyncService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5SyncService.java deleted file mode 100644 index f49d98c..0000000 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5SyncService.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.lanyuanxiaoyao.service.forest.service; - -import com.dtflys.forest.annotation.BaseRequest; - -/** - * Yarn 信息 - * - * @author lanyuanxiaoyao - * @date 2023-04-21 - */ -@BaseRequest(baseURL = "http://service-yarn-query-b5-sync") -public interface YarnB5SyncService extends YarnService { -} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java deleted file mode 100644 index dade157..0000000 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.lanyuanxiaoyao.service.forest.service; - -import cn.hutool.core.util.ObjectUtil; -import com.eshore.odcp.hudi.connector.Constants; -import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; -import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; -import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Function; -import javax.annotation.Nullable; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.factory.Maps; -import org.eclipse.collections.api.list.ImmutableList; -import org.eclipse.collections.api.map.ImmutableMap; -import org.eclipse.collections.api.map.MutableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -/** - * 聚合 Yarn 查询 - * - * @author lanyuanxiaoyao - * @date 2023-05-11 - */ -@Service -public class YarnClusterService { - private static final Logger logger = LoggerFactory.getLogger(YarnClusterService.class); - private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); - - private final ImmutableMap servicesMap; - - @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public YarnClusterService( - YarnB1Service yarnB1Service, - YarnB5Service yarnB5Service, - YarnB5SyncService yarnB5SyncService, - YarnB4Service yarnB4Service, - YarnA4Service yarnA4Service - ) { - MutableMap servicesMap = Maps.mutable.empty(); - servicesMap.put(Constants.CLUSTER_B1, yarnB1Service); - servicesMap.put(Constants.CLUSTER_B5, yarnB5Service); - servicesMap.put(Constants.CLUSTER_B5_SYNC, yarnB5SyncService); - servicesMap.put(Constants.CLUSTER_B4, yarnB4Service); - servicesMap.put(Constants.CLUSTER_A4, yarnA4Service); - this.servicesMap = servicesMap.toImmutable(); - } - - public ImmutableList clusters() { - return servicesMap.keysView().toList().toImmutable(); - } - - public ImmutableList services() { - return servicesMap.valuesView().toList().toImmutable(); - } - - public ImmutableList services(ImmutableList clusters) { - return clusters.collect(this::service); - } - - public ImmutableMap servicesMap() { - return servicesMap; - } - - public @Nullable YarnService service(String cluster) { - return servicesMap.get(cluster); - } - - public ImmutableList jobList(String cluster) { - return jobList(Lists.immutable.of(cluster)); - } - - public ImmutableList jobList(ImmutableList clusters) { - return list(clusters, YarnService::jobList); - } - - public ImmutableList jobListEquals(String cluster, String name) { - return jobListEquals(Lists.immutable.of(cluster), name); - } - - public ImmutableList jobListEquals(ImmutableList clusters, String name) { - return list(clusters, yarnService -> yarnService.jobListEquals(name)); - } - - public ImmutableList jobListLike(String cluster, String text) { - return jobListLike(Lists.immutable.of(cluster), text); - } - - public ImmutableList jobListLike(ImmutableList clusters, String text) { - return list(clusters, yarnService -> yarnService.jobListLike(text)); - } - - private ImmutableList list(ImmutableList clusters, Function> getter) { - if (ObjectUtil.isEmpty(clusters)) { - return Lists.immutable.empty(); - } - return clusters.toList() - .collect(this::service) - .reject(Objects::isNull) - .asParallel(EXECUTOR, 1) - .flatCollect(getter::apply) - .toList() - .toImmutable(); - } - - public YarnApplication jobDetail(String cluster, String applicationId) { - YarnService service = service(cluster); - return ObjectUtil.isNull(service) ? null : service.jobDetail(applicationId); - } - - @SuppressWarnings("DataFlowIssue") - public ImmutableList queueList(String cluster) { - return servicesMap.containsKey(cluster) ? service(cluster).queueList() : Lists.immutable.empty(); - } - - public ImmutableList queueList(ImmutableList clusters) { - //noinspection DataFlowIssue - return clusters.toList() - .select(servicesMap::containsKey) - .asParallel(EXECUTOR, 1) - .flatCollect(cluster -> service(cluster).queueList()) - .toList() - .toImmutable(); - } - - @SuppressWarnings("DataFlowIssue") - public YarnQueue queueDetail(String cluster, String name) { - return servicesMap.containsKey(cluster) ? service(cluster).queueDetail(name) : null; - } - - public ImmutableList queueDetail(ImmutableList clusters, String name) { - //noinspection DataFlowIssue - return clusters.toList() - .select(servicesMap::containsKey) - .asParallel(EXECUTOR, 1) - .collect(cluster -> service(cluster).queueDetail(name)) - .toList() - .toImmutable(); - } - - @SuppressWarnings("DataFlowIssue") - public YarnRootQueue cluster(String cluster) { - return servicesMap.containsKey(cluster) ? service(cluster).cluster() : null; - } - - public ImmutableList cluster(ImmutableList clusters) { - //noinspection DataFlowIssue - return clusters.toList() - .select(servicesMap::containsKey) - .asParallel(EXECUTOR, 1) - .collect(cluster -> service(cluster).cluster()) - .toList() - .toImmutable(); - } -} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnService.java index f602bdb..7f07af9 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnService.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.forest.service; +import com.dtflys.forest.annotation.BaseRequest; import com.dtflys.forest.annotation.Get; import com.dtflys.forest.annotation.Query; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; @@ -11,28 +12,26 @@ import org.eclipse.collections.api.list.ImmutableList; * @author lanyuanxiaoyao * @date 2023-04-23 */ +@BaseRequest(baseURL = "http://service-yarn-query") public interface YarnService { @Get("/job/list") - ImmutableList jobList(); + ImmutableList jobList(@Query("cluster") String cluster); @Get("/job/list") - ImmutableList jobListEquals(@Query("name") String name); + ImmutableList jobListEquals(@Query("cluster") String cluster, @Query("name") String name); @Get("/job/list") - ImmutableList jobListLike(@Query("text") String text); - - @Get("/job/kill") - void jobKill(@Query("application_id") String applicationId); + ImmutableList jobListLike(@Query("cluster") String cluster, @Query("text") String text); @Get("/job/detail") - YarnApplication jobDetail(@Query("application_id") String applicationId); + YarnApplication jobDetail(@Query("cluster") String cluster, @Query("application_id") String applicationId); @Get("/queue/list") - ImmutableList queueList(); + ImmutableList queueList(@Query("cluster") String cluster); @Get("/queue/detail") - YarnQueue queueDetail(@Query("name") String name); + YarnQueue queueDetail(@Query("cluster") String cluster, @Query("name") String name); @Get("/cluster/info") - YarnRootQueue cluster(); + YarnRootQueue cluster(@Query("cluster") String cluster); } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java index 168b10a..e8a9a06 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java @@ -7,7 +7,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; -import com.lanyuanxiaoyao.service.forest.service.YarnClusterService; +import com.lanyuanxiaoyao.service.forest.service.YarnService; import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO; import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO; import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil; @@ -45,10 +45,11 @@ public class YarnController extends BaseController { "finishedTime", YarnApplication::getFinishedTime ); - private final YarnClusterService yarnClusterService; + private final YarnService yarnService; - public YarnController(YarnClusterService yarnClusterService) { - this.yarnClusterService = yarnClusterService; + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public YarnController(YarnService yarnService) { + this.yarnService = yarnService; } @GetMapping("job_list") @@ -69,8 +70,9 @@ public class YarnController extends BaseController { boolean isSearchId = StrUtil.isNotBlank(searchId); boolean isSearchName = StrUtil.isNotBlank(searchName); Comparator comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP); - ImmutableList applications = yarnClusterService.jobList(Lists.immutable.ofAll(clusters)) + ImmutableList applications = Lists.immutable.ofAll(clusters) .asParallel(EXECUTOR, 1) + .flatCollect(yarnService::jobList) .select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState())) .select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus())) .select(app -> !isSearchId || (precise ? StrUtil.equals(app.getId(), searchId) : StrUtil.contains(app.getId(), searchId))) @@ -90,8 +92,9 @@ public class YarnController extends BaseController { @GetMapping("job_current") public AmisResponse jobCurrent(@RequestParam("clusters") List clusters, @RequestParam("name") String name) { - Optional currentApp = yarnClusterService.jobListEquals(Lists.immutable.ofAll(clusters), name) + Optional currentApp = Lists.immutable.ofAll(clusters) .asParallel(EXECUTOR, 1) + .flatCollect(cluster -> yarnService.jobListEquals(cluster, name)) .select(app -> ObjectUtil.equals(app.getState(), "RUNNING")) .toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP)) .getFirstOptional(); @@ -108,12 +111,12 @@ public class YarnController extends BaseController { public AmisResponse queueList(@RequestParam("clusters") List clusters, @RequestParam(value = "names", defaultValue = "") String names) { boolean isFilterNames = StrUtil.isNotBlank(names); ImmutableList filterNames = Lists.immutable.of(names.split(",")); - ImmutableList results = yarnClusterService.services(Lists.immutable.ofAll(clusters)) + ImmutableList results = Lists.immutable.ofAll(clusters) .asParallel(EXECUTOR, 1) - .collect(yarnService -> { - YarnRootQueue cluster = yarnService.cluster(); - ImmutableList queues = yarnService.queueList().select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n))); - return new YarnClusterVO(cluster, queues); + .collect(cluster -> { + YarnRootQueue root = yarnService.cluster(cluster); + ImmutableList queues = yarnService.queueList(cluster).select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n))); + return new YarnClusterVO(root, queues); }) .toList() .toImmutable(); @@ -122,11 +125,22 @@ public class YarnController extends BaseController { @GetMapping("queue_names") public AmisResponse queueNames(@RequestParam("clusters") List clusters) { - return responseData(MapUtil.of("queueNames", yarnClusterService.queueList(Lists.immutable.ofAll(clusters)).collect(YarnQueue::getQueueName))); + ImmutableList names = Lists.immutable.ofAll(clusters) + .asParallel(EXECUTOR, 1) + .flatCollect(yarnService::queueList) + .collect(YarnQueue::getQueueName) + .toList() + .toImmutable(); + return responseData(MapUtil.of("queueNames", names)); } @GetMapping("clusters") public AmisResponse clusters(@RequestParam("clusters") List clusters) { - return responseData(MapUtil.of("cluster", yarnClusterService.cluster(Lists.immutable.ofAll(clusters)))); + ImmutableList roots = (ImmutableList) Lists.immutable.ofAll(clusters) + .asParallel(EXECUTOR, 1) + .collect(yarnService::cluster) + .toList() + .toImmutable(); + return responseData(MapUtil.of("cluster", clusters)); } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java index ebabf14..2ada95f 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java @@ -1,5 +1,8 @@ package com.lanyuanxiaoyao.service.yarn.configuration; +import java.util.Map; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.map.ImmutableMap; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @@ -12,30 +15,27 @@ import org.springframework.stereotype.Component; @Component @ConfigurationProperties("yarn") public class YarnConfiguration { - private String cluster; - private String webUrl; + private ImmutableMap webUrls; - public String getCluster() { - return cluster; + public ImmutableMap getWebUrls() { + return webUrls; } - public void setCluster(String cluster) { - this.cluster = cluster; + public String getWebUrl(String cluster) { + if (!webUrls.containsKey(cluster)) { + throw new RuntimeException("Cluster not found"); + } + return webUrls.get(cluster); } - public String getWebUrl() { - return webUrl; - } - - public void setWebUrl(String webUrl) { - this.webUrl = webUrl; + public void setWebUrls(Map webUrls) { + this.webUrls = Maps.immutable.ofAll(webUrls); } @Override public String toString() { return "YarnConfiguration{" + - "cluster='" + cluster + '\'' + - ", webUrl='" + webUrl + '\'' + + "clusters=" + webUrls + '}'; } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/ClusterController.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/ClusterController.java index e18d3a1..6befcba 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/ClusterController.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/ClusterController.java @@ -7,6 +7,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -27,7 +28,7 @@ public class ClusterController { } @GetMapping("info") - public YarnRootQueue info() throws JsonProcessingException { - return clusterService.info(); + public YarnRootQueue info(@RequestParam("cluster") String cluster) throws JsonProcessingException { + return clusterService.info(cluster); } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/JobController.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/JobController.java index 85215a2..3c9bb24 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/JobController.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/JobController.java @@ -30,27 +30,22 @@ public class JobController { } @GetMapping("list") - public ImmutableList list() throws JsonProcessingException { - return jobService.list(); + public ImmutableList list(@RequestParam("cluster") String cluster) throws JsonProcessingException { + return jobService.list(cluster); } @GetMapping("list_equals") - public ImmutableList listEquals(@RequestParam("name") String name) throws JsonProcessingException { - return jobService.listEquals(name); + public ImmutableList listEquals(@RequestParam("cluster") String cluster, @RequestParam("name") String name) throws JsonProcessingException { + return jobService.listEquals(cluster, name); } @GetMapping("list_like") - public ImmutableList listLike(@RequestParam("text") String text) throws JsonProcessingException { - return jobService.listLike(text); - } - - @GetMapping("kill") - public void kill(@RequestParam("application_id") String applicationId) { - jobService.kill(applicationId); + public ImmutableList listLike(@RequestParam("cluster") String cluster, @RequestParam("text") String text) throws JsonProcessingException { + return jobService.listLike(cluster, text); } @GetMapping("detail") - public YarnApplication detail(@RequestParam("application_id") String applicationId) throws Exception { - return jobService.detail(applicationId); + public YarnApplication detail(@RequestParam("cluster") String cluster, @RequestParam("application_id") String applicationId) throws Exception { + return jobService.detail(cluster, applicationId); } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/QueueController.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/QueueController.java index 1464d2d..b34ccb0 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/QueueController.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/controller/QueueController.java @@ -30,12 +30,12 @@ public class QueueController { } @GetMapping("list") - public ImmutableList list() throws JsonProcessingException { - return queueService.list(); + public ImmutableList list(@RequestParam("cluster") String cluster) throws JsonProcessingException { + return queueService.list(cluster); } @GetMapping("detail") - public YarnQueue detail(@RequestParam("name") String name) throws Exception { - return queueService.detail(name); + public YarnQueue detail(@RequestParam("cluster") String cluster, @RequestParam("name") String name) throws Exception { + return queueService.detail(cluster, name); } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/ClusterService.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/ClusterService.java index d1a3f54..c089a37 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/ClusterService.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/ClusterService.java @@ -10,5 +10,5 @@ import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; * @date 2023-04-23 */ public interface ClusterService { - YarnRootQueue info() throws JsonProcessingException; + YarnRootQueue info(String name) throws JsonProcessingException; } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/JobService.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/JobService.java index a078655..2278f00 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/JobService.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/JobService.java @@ -11,13 +11,11 @@ import org.eclipse.collections.api.list.ImmutableList; * @date 2023-04-23 */ public interface JobService { - ImmutableList list() throws JsonProcessingException; + ImmutableList list(String cluster) throws JsonProcessingException; - ImmutableList listEquals(String name) throws JsonProcessingException; + ImmutableList listEquals(String cluster, String name) throws JsonProcessingException; - ImmutableList listLike(String text) throws JsonProcessingException; + ImmutableList listLike(String cluster, String text) throws JsonProcessingException; - void kill(String applicationId); - - YarnApplication detail(String applicationId) throws Exception; + YarnApplication detail(String cluster, String applicationId) throws Exception; } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/QueueService.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/QueueService.java index 6c9aad1..d92f69f 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/QueueService.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/QueueService.java @@ -18,9 +18,9 @@ import org.eclipse.collections.api.list.MutableList; * @date 2023-04-23 */ public interface QueueService { - ImmutableList list() throws JsonProcessingException; + ImmutableList list(String cluster) throws JsonProcessingException; - YarnQueue detail(String name) throws Exception; + YarnQueue detail(String cluster, String name) throws Exception; default Function> flatChildren() { return queue -> { diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java index 0aa34fc..a5d2c44 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java @@ -39,11 +39,12 @@ public class ClusterServiceImpl implements ClusterService { @Cacheable(value = "cluster-info", sync = true) @Retryable(Throwable.class) @Override - public YarnRootQueue info() throws JsonProcessingException { - String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler"); + public YarnRootQueue info(String cluster) throws JsonProcessingException { + String webUrl = yarnConfiguration.getWebUrl(cluster); + String queryUrl = URLUtil.completeUrl(webUrl, "/ws/v1/cluster/scheduler"); String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo() - .setCluster(yarnConfiguration.getCluster()) - .setWebUrl(URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/cluster/scheduler")); + .setCluster(cluster) + .setWebUrl(URLUtil.completeUrl(webUrl, "/cluster/scheduler")); } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java deleted file mode 100644 index c7b4b8e..0000000 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.lanyuanxiaoyao.service.yarn.service.impl; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.core.util.URLUtil; -import cn.hutool.http.HttpUtil; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; -import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration; -import com.lanyuanxiaoyao.service.yarn.response.ApplicationsListResponse; -import com.lanyuanxiaoyao.service.yarn.service.JobService; -import java.util.concurrent.atomic.AtomicReference; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.list.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; - -/** - * Job - * - * @author lanyuanxiaoyao - * @date 2023-04-23 - */ -@Service("JobAutoRefreshService") -public class JobAutoRefreshServiceImpl implements JobService { - private static final Logger logger = LoggerFactory.getLogger(JobAutoRefreshServiceImpl.class); - - private final ObjectMapper mapper; - private final YarnConfiguration yarnConfiguration; - - public JobAutoRefreshServiceImpl( YarnConfiguration yarnConfiguration, Jackson2ObjectMapperBuilder builder) { - this.yarnConfiguration = yarnConfiguration; - - this.mapper = builder.build(); - } - - private static final AtomicReference> CACHE = new AtomicReference<>(Lists.immutable.empty()); - - @Scheduled(fixedRate = 50000) - public void refresh() throws JsonProcessingException { - String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps"); - String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); - ImmutableList apps = mapper.readValue(body, ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster())); - CACHE.set(apps); - } - - @Override - public ImmutableList list() { - return CACHE.get(); - } - - @Override - public ImmutableList listEquals(String name) { - return list().select(app -> StrUtil.equals(app.getName(), name)); - } - - @Override - public ImmutableList listLike(String name) { - return list().select(app -> StrUtil.contains(app.getName(), name)); - } - - @Override - public void kill(String applicationId) { - } - - @Override - public YarnApplication detail(String applicationId) throws Exception { - return list().select(app -> StrUtil.equals(app.getId(), applicationId)) - .getFirstOptional().orElseThrow(() -> new Exception("cannot found " + applicationId)); - } -} diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java index 3bfefe8..1c4fb6c 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java @@ -40,41 +40,37 @@ public class JobServiceImpl implements JobService { this.mapper = builder.build(); } - @Cacheable(value = "job-list", sync = true) + @Cacheable(value = "job-list", sync = true, key = "#cluster") @Retryable(Throwable.class) @Override - public ImmutableList list() throws JsonProcessingException { - String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps"); + public ImmutableList list(String cluster) throws JsonProcessingException { + String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/apps"); try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) { - return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster())); + return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(cluster)); } } - @Cacheable(value = "job-list", sync = true, key = "#{methodName+name}") + @Cacheable(value = "job-list", sync = true, key = "#{methodName+cluster+name}") @Retryable(Throwable.class) @Override - public ImmutableList listEquals(String name) throws JsonProcessingException { - return list().select(app -> StrUtil.equals(app.getName(), name)); + public ImmutableList listEquals(String cluster, String name) throws JsonProcessingException { + return list(cluster).select(app -> StrUtil.equals(app.getName(), name)); } - @Cacheable(value = "job-list", sync = true, key = "#{methodName+name}") + @Cacheable(value = "job-list", sync = true, key = "#{methodName+cluster+name}") @Retryable(Throwable.class) @Override - public ImmutableList listLike(String name) throws JsonProcessingException { - return list().select(app -> StrUtil.contains(app.getName(), name)); + public ImmutableList listLike(String cluster, String name) throws JsonProcessingException { + return list(cluster).select(app -> StrUtil.contains(app.getName(), name)); } - @Override - public void kill(String applicationId) { - } - - @Cacheable(value = "job-detail", sync = true, key = "#applicationId") + @Cacheable(value = "job-detail", sync = true) @Retryable(Throwable.class) @Override - public YarnApplication detail(String applicationId) throws JsonProcessingException { - String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps/" + applicationId); + public YarnApplication detail(String cluster, String applicationId) throws JsonProcessingException { + String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/apps/" + applicationId); try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) { - return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(yarnConfiguration.getCluster()); + return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(cluster); } } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java deleted file mode 100644 index a9ad008..0000000 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.lanyuanxiaoyao.service.yarn.service.impl; - -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.core.util.URLUtil; -import cn.hutool.http.HttpUtil; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; -import com.lanyuanxiaoyao.service.yarn.configuration.YarnConfiguration; -import com.lanyuanxiaoyao.service.yarn.response.QueueListResponse; -import com.lanyuanxiaoyao.service.yarn.service.QueueService; -import java.util.concurrent.atomic.AtomicReference; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.list.ImmutableList; -import org.eclipse.collections.api.list.MutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; - -/** - * Queue - * - * @author lanyuanxiaoyao - * @date 2023-04-23 - */ -@Service("QueueAutoRefreshService") -public class QueueAutoRefreshServiceImpl implements QueueService { - private static final Logger logger = LoggerFactory.getLogger(QueueAutoRefreshServiceImpl.class); - - private final ObjectMapper mapper; - private final YarnConfiguration yarnConfiguration; - - public QueueAutoRefreshServiceImpl(YarnConfiguration yarnConfiguration, Jackson2ObjectMapperBuilder builder) { - this.yarnConfiguration = yarnConfiguration; - - this.mapper = builder.build(); - } - - private static final AtomicReference> CACHE = new AtomicReference<>(Lists.immutable.empty()); - - @Scheduled(fixedRate = 50000) - public void refresh() throws JsonProcessingException { - String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler"); - String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); - QueueListResponse response = mapper.readValue(body, QueueListResponse.class); - ImmutableList queues = response.getScheduler().getSchedulerInfo().getQueues().getQueue() - .flatCollect(flatChildren()) - .tap(q -> q.setCluster(yarnConfiguration.getCluster())); - CACHE.set(queues); - } - - @Override - public ImmutableList list() throws JsonProcessingException { - return CACHE.get(); - } - - @Override - public YarnQueue detail(String name) throws Exception { - return list() - .select(q -> StrUtil.equals(q.getQueueName(), name)) - .getFirstOptional() - .orElseThrow(() -> new Exception("cannot found " + name)); - } -} diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java index 5b76547..1277889 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java @@ -41,23 +41,23 @@ public class QueueServiceImpl implements QueueService { @Cacheable(value = "queue-list", sync = true) @Retryable(Throwable.class) @Override - public ImmutableList list() throws JsonProcessingException { - String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler"); + public ImmutableList list(String cluster) throws JsonProcessingException { + String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(cluster), "/ws/v1/cluster/scheduler"); String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); QueueListResponse response = mapper.readValue(body, QueueListResponse.class); return response.getScheduler().getSchedulerInfo().getQueues().getQueue() .flatCollect(flatChildren()) - .tap(q -> q.setCluster(yarnConfiguration.getCluster())); + .tap(q -> q.setCluster(cluster)); } - @Cacheable(value = "queue-detail", sync = true, key = "#name") + @Cacheable(value = "queue-detail", sync = true) @Retryable(Throwable.class) @Override - public YarnQueue detail(String name) throws Exception { - return list() + public YarnQueue detail(String cluster, String name) throws Exception { + return list(cluster) .select(q -> StrUtil.equals(q.getQueueName(), name)) .getFirstOptional() .orElseThrow(() -> new Exception("cannot found " + name)) - .setCluster(yarnConfiguration.getCluster()); + .setCluster(cluster); } } diff --git a/service-yarn-query/src/main/resources/application.yml b/service-yarn-query/src/main/resources/application.yml index 7f61bb9..0ac8a9e 100644 --- a/service-yarn-query/src/main/resources/application.yml +++ b/service-yarn-query/src/main/resources/application.yml @@ -3,3 +3,10 @@ spring: name: service-yarn-query profiles: include: random-port,common,eureka,metrics +yarn: + web-urls: + a4: http://132.121.107.91:8088 + b1: http://132.122.98.13:8088 + b4: http://132.122.112.30:8088 + b5: http://132.122.116.12:8088 + b5-sync: http://132.122.116.143:8088