feature(forest): 增加 YarnService 的集群模式操作
This commit is contained in:
@@ -0,0 +1,167 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.forest.service;
|
||||||
|
|
||||||
|
import cn.hutool.core.lang.Pair;
|
||||||
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
|
import com.eshore.odcp.hudi.connector.Constants;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
|
import org.eclipse.collections.api.factory.Maps;
|
||||||
|
import org.eclipse.collections.api.factory.Sets;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
import org.eclipse.collections.api.map.ImmutableMap;
|
||||||
|
import org.eclipse.collections.api.map.MutableMap;
|
||||||
|
import org.eclipse.collections.api.set.ImmutableSet;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 聚合 Yarn 查询
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2023-05-11
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class YarnClusterService {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(YarnClusterService.class);
|
||||||
|
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
|
||||||
|
|
||||||
|
private final ImmutableMap<String, YarnService> servicesMap;
|
||||||
|
|
||||||
|
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||||
|
public YarnClusterService(
|
||||||
|
YarnB1Service yarnB1Service,
|
||||||
|
YarnB5Service yarnB5Service,
|
||||||
|
YarnB5SyncService yarnB5SyncService,
|
||||||
|
YarnB4Service yarnB4Service
|
||||||
|
) {
|
||||||
|
MutableMap<String, YarnService> servicesMap = Maps.mutable.empty();
|
||||||
|
servicesMap.put(Constants.CLUSTER_B1, yarnB1Service);
|
||||||
|
servicesMap.put(Constants.CLUSTER_B5, yarnB5Service);
|
||||||
|
servicesMap.put(Constants.CLUSTER_B5_SYNC, yarnB5SyncService);
|
||||||
|
servicesMap.put(Constants.CLUSTER_B4, yarnB4Service);
|
||||||
|
this.servicesMap = servicesMap.toImmutable();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<String> clusters() {
|
||||||
|
return servicesMap.keysView().toList().toImmutable();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<YarnService> services() {
|
||||||
|
return servicesMap.valuesView().toList().toImmutable();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableMap<String, YarnService> servicesMap() {
|
||||||
|
return servicesMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public @Nullable YarnService service(String cluster) {
|
||||||
|
return servicesMap.get(cluster);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<YarnApplication> jobList(String cluster) {
|
||||||
|
return jobList(Sets.immutable.of(cluster));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<YarnApplication> jobList(ImmutableSet<String> clusters) {
|
||||||
|
return list(clusters, YarnService::jobList);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<YarnApplication> jobListEquals(String cluster, String name) {
|
||||||
|
return jobListEquals(Sets.immutable.of(cluster), name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<YarnApplication> jobListEquals(ImmutableSet<String> clusters, String name) {
|
||||||
|
return list(clusters, yarnService -> yarnService.jobListEquals(name));
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<YarnApplication> jobListLike(String cluster, String text) {
|
||||||
|
return jobListLike(Sets.immutable.of(cluster), text);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<YarnApplication> jobListLike(ImmutableSet<String> clusters, String text) {
|
||||||
|
return list(clusters, yarnService -> yarnService.jobListLike(text));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ImmutableList<YarnApplication> list(ImmutableSet<String> clusters, Function<YarnService, ImmutableList<YarnApplication>> getter) {
|
||||||
|
if (ObjectUtil.isEmpty(clusters)) {
|
||||||
|
return Lists.immutable.empty();
|
||||||
|
}
|
||||||
|
return clusters.collect(this::service)
|
||||||
|
.asParallel(EXECUTOR, 1)
|
||||||
|
.reject(ObjectUtil::isNull)
|
||||||
|
.flatCollect(getter::apply)
|
||||||
|
.toList()
|
||||||
|
.toImmutable();
|
||||||
|
}
|
||||||
|
|
||||||
|
public YarnApplication jobDetail(String cluster, String applicationId) {
|
||||||
|
YarnService service = service(cluster);
|
||||||
|
return ObjectUtil.isNull(service) ? null : service.jobDetail(applicationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
public ImmutableList<YarnQueue> queueList(String cluster) {
|
||||||
|
return servicesMap.containsKey(cluster) ? service(cluster).queueList() : Lists.immutable.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableMap<String, ImmutableList<YarnQueue>> queueList(ImmutableSet<String> clusters) {
|
||||||
|
return clusters
|
||||||
|
.select(servicesMap::containsKey)
|
||||||
|
.asParallel(EXECUTOR, 1)
|
||||||
|
.collect(cluster -> {
|
||||||
|
YarnService service = service(cluster);
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
ImmutableList<YarnQueue> queues = service.queueList();
|
||||||
|
return new Pair<>(cluster, queues);
|
||||||
|
})
|
||||||
|
.toMap(Pair::getKey, Pair::getValue)
|
||||||
|
.toImmutable();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
public YarnQueue queueDetail(String cluster, String name) {
|
||||||
|
return servicesMap.containsKey(cluster) ? service(cluster).queueDetail(name) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableMap<String, YarnQueue> queueDetail(ImmutableSet<String> clusters, String name) {
|
||||||
|
return clusters
|
||||||
|
.select(servicesMap::containsKey)
|
||||||
|
.asParallel(EXECUTOR, 1)
|
||||||
|
.collect(cluster -> {
|
||||||
|
YarnService service = service(cluster);
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
YarnQueue queue = service.queueDetail(name);
|
||||||
|
return new Pair<>(cluster, queue);
|
||||||
|
})
|
||||||
|
.toMap(Pair::getKey, Pair::getValue)
|
||||||
|
.toImmutable();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
public YarnRootQueue cluster(String cluster) {
|
||||||
|
return servicesMap.containsKey(cluster) ? service(cluster).cluster() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableMap<String, YarnRootQueue> cluster(ImmutableSet<String> clusters) {
|
||||||
|
return clusters
|
||||||
|
.select(servicesMap::containsKey)
|
||||||
|
.asParallel(EXECUTOR, 1)
|
||||||
|
.collect(cluster -> {
|
||||||
|
YarnService service = service(cluster);
|
||||||
|
@SuppressWarnings("DataFlowIssue")
|
||||||
|
YarnRootQueue queues = service.cluster();
|
||||||
|
return new Pair<>(cluster, queues);
|
||||||
|
})
|
||||||
|
.toMap(Pair::getKey, Pair::getValue)
|
||||||
|
.toImmutable();
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user