From 27d2fae82ae34b66559dc20e0cb4f81d0e1d2189 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Thu, 11 May 2023 23:45:38 +0800 Subject: [PATCH] =?UTF-8?q?feature(forest):=20=E5=A2=9E=E5=8A=A0=20YarnSer?= =?UTF-8?q?vice=20=E7=9A=84=E9=9B=86=E7=BE=A4=E6=A8=A1=E5=BC=8F=E6=93=8D?= =?UTF-8?q?=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../forest/service/YarnClusterService.java | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java new file mode 100644 index 0000000..526d0b1 --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnClusterService.java @@ -0,0 +1,167 @@ +package com.lanyuanxiaoyao.service.forest.service; + +import cn.hutool.core.lang.Pair; +import cn.hutool.core.util.ObjectUtil; +import com.eshore.odcp.hudi.connector.Constants; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; +import javax.annotation.Nullable; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.factory.Sets; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; +import org.eclipse.collections.api.map.MutableMap; +import org.eclipse.collections.api.set.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +/** + * 聚合 Yarn 查询 + * + * @author lanyuanxiaoyao + * @date 2023-05-11 + */ +@Service +public class YarnClusterService { + private static final Logger logger = LoggerFactory.getLogger(YarnClusterService.class); + private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); + + private final ImmutableMap servicesMap; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public YarnClusterService( + YarnB1Service yarnB1Service, + YarnB5Service yarnB5Service, + YarnB5SyncService yarnB5SyncService, + YarnB4Service yarnB4Service + ) { + MutableMap servicesMap = Maps.mutable.empty(); + servicesMap.put(Constants.CLUSTER_B1, yarnB1Service); + servicesMap.put(Constants.CLUSTER_B5, yarnB5Service); + servicesMap.put(Constants.CLUSTER_B5_SYNC, yarnB5SyncService); + servicesMap.put(Constants.CLUSTER_B4, yarnB4Service); + this.servicesMap = servicesMap.toImmutable(); + } + + public ImmutableList clusters() { + return servicesMap.keysView().toList().toImmutable(); + } + + public ImmutableList services() { + return servicesMap.valuesView().toList().toImmutable(); + } + + public ImmutableMap servicesMap() { + return servicesMap; + } + + public @Nullable YarnService service(String cluster) { + return servicesMap.get(cluster); + } + + public ImmutableList jobList(String cluster) { + return jobList(Sets.immutable.of(cluster)); + } + + public ImmutableList jobList(ImmutableSet clusters) { + return list(clusters, YarnService::jobList); + } + + public ImmutableList jobListEquals(String cluster, String name) { + return jobListEquals(Sets.immutable.of(cluster), name); + } + + public ImmutableList jobListEquals(ImmutableSet clusters, String name) { + return list(clusters, yarnService -> yarnService.jobListEquals(name)); + } + + public ImmutableList jobListLike(String cluster, String text) { + return jobListLike(Sets.immutable.of(cluster), text); + } + + public ImmutableList jobListLike(ImmutableSet clusters, String text) { + return list(clusters, yarnService -> yarnService.jobListLike(text)); + } + + private ImmutableList list(ImmutableSet clusters, Function> getter) { + if (ObjectUtil.isEmpty(clusters)) { + return Lists.immutable.empty(); + } + return clusters.collect(this::service) + .asParallel(EXECUTOR, 1) + .reject(ObjectUtil::isNull) + .flatCollect(getter::apply) + .toList() + .toImmutable(); + } + + public YarnApplication jobDetail(String cluster, String applicationId) { + YarnService service = service(cluster); + return ObjectUtil.isNull(service) ? null : service.jobDetail(applicationId); + } + + @SuppressWarnings("DataFlowIssue") + public ImmutableList queueList(String cluster) { + return servicesMap.containsKey(cluster) ? service(cluster).queueList() : Lists.immutable.empty(); + } + + public ImmutableMap> queueList(ImmutableSet clusters) { + return clusters + .select(servicesMap::containsKey) + .asParallel(EXECUTOR, 1) + .collect(cluster -> { + YarnService service = service(cluster); + @SuppressWarnings("DataFlowIssue") + ImmutableList queues = service.queueList(); + return new Pair<>(cluster, queues); + }) + .toMap(Pair::getKey, Pair::getValue) + .toImmutable(); + + } + + @SuppressWarnings("DataFlowIssue") + public YarnQueue queueDetail(String cluster, String name) { + return servicesMap.containsKey(cluster) ? service(cluster).queueDetail(name) : null; + } + + public ImmutableMap queueDetail(ImmutableSet clusters, String name) { + return clusters + .select(servicesMap::containsKey) + .asParallel(EXECUTOR, 1) + .collect(cluster -> { + YarnService service = service(cluster); + @SuppressWarnings("DataFlowIssue") + YarnQueue queue = service.queueDetail(name); + return new Pair<>(cluster, queue); + }) + .toMap(Pair::getKey, Pair::getValue) + .toImmutable(); + } + + @SuppressWarnings("DataFlowIssue") + public YarnRootQueue cluster(String cluster) { + return servicesMap.containsKey(cluster) ? service(cluster).cluster() : null; + } + + public ImmutableMap cluster(ImmutableSet clusters) { + return clusters + .select(servicesMap::containsKey) + .asParallel(EXECUTOR, 1) + .collect(cluster -> { + YarnService service = service(cluster); + @SuppressWarnings("DataFlowIssue") + YarnRootQueue queues = service.cluster(); + return new Pair<>(cluster, queues); + }) + .toMap(Pair::getKey, Pair::getValue) + .toImmutable(); + + } +}