refactor(web): 优化 yarn 信息的获取
This commit is contained in:
@@ -96,8 +96,8 @@ public class YarnClusterService {
|
||||
return Lists.immutable.empty();
|
||||
}
|
||||
return clusters.collect(this::service)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.reject(ObjectUtil::isNull)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.flatCollect(getter::apply)
|
||||
.toList()
|
||||
.toImmutable();
|
||||
@@ -113,19 +113,14 @@ public class YarnClusterService {
|
||||
return servicesMap.containsKey(cluster) ? service(cluster).queueList() : Lists.immutable.empty();
|
||||
}
|
||||
|
||||
public ImmutableMap<String, ImmutableList<YarnQueue>> queueList(ImmutableSet<String> clusters) {
|
||||
public ImmutableList<YarnQueue> queueList(ImmutableSet<String> clusters) {
|
||||
//noinspection DataFlowIssue
|
||||
return clusters
|
||||
.select(servicesMap::containsKey)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.collect(cluster -> {
|
||||
YarnService service = service(cluster);
|
||||
@SuppressWarnings("DataFlowIssue")
|
||||
ImmutableList<YarnQueue> queues = service.queueList();
|
||||
return new Pair<>(cluster, queues);
|
||||
})
|
||||
.toMap(Pair::getKey, Pair::getValue)
|
||||
.flatCollect(cluster -> service(cluster).queueList())
|
||||
.toList()
|
||||
.toImmutable();
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("DataFlowIssue")
|
||||
@@ -133,17 +128,13 @@ public class YarnClusterService {
|
||||
return servicesMap.containsKey(cluster) ? service(cluster).queueDetail(name) : null;
|
||||
}
|
||||
|
||||
public ImmutableMap<String, YarnQueue> queueDetail(ImmutableSet<String> clusters, String name) {
|
||||
public ImmutableList<YarnQueue> queueDetail(ImmutableSet<String> clusters, String name) {
|
||||
//noinspection DataFlowIssue
|
||||
return clusters
|
||||
.select(servicesMap::containsKey)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.collect(cluster -> {
|
||||
YarnService service = service(cluster);
|
||||
@SuppressWarnings("DataFlowIssue")
|
||||
YarnQueue queue = service.queueDetail(name);
|
||||
return new Pair<>(cluster, queue);
|
||||
})
|
||||
.toMap(Pair::getKey, Pair::getValue)
|
||||
.collect(cluster -> service(cluster).queueDetail(name))
|
||||
.toList()
|
||||
.toImmutable();
|
||||
}
|
||||
|
||||
@@ -152,18 +143,13 @@ public class YarnClusterService {
|
||||
return servicesMap.containsKey(cluster) ? service(cluster).cluster() : null;
|
||||
}
|
||||
|
||||
public ImmutableMap<String, YarnRootQueue> cluster(ImmutableSet<String> clusters) {
|
||||
public ImmutableList<YarnRootQueue> cluster(ImmutableSet<String> clusters) {
|
||||
//noinspection DataFlowIssue
|
||||
return clusters
|
||||
.select(servicesMap::containsKey)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.collect(cluster -> {
|
||||
YarnService service = service(cluster);
|
||||
@SuppressWarnings("DataFlowIssue")
|
||||
YarnRootQueue queues = service.cluster();
|
||||
return new Pair<>(cluster, queues);
|
||||
})
|
||||
.toMap(Pair::getKey, Pair::getValue)
|
||||
.collect(cluster -> service(cluster).cluster())
|
||||
.toList()
|
||||
.toImmutable();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
||||
import com.lanyuanxiaoyao.service.forest.service.YarnClusterService;
|
||||
import com.lanyuanxiaoyao.service.forest.service.YarnService;
|
||||
import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO;
|
||||
import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO;
|
||||
import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil;
|
||||
@@ -20,6 +19,7 @@ import java.util.concurrent.Executors;
|
||||
import java.util.function.Function;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.factory.Maps;
|
||||
import org.eclipse.collections.api.factory.Sets;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.map.ImmutableMap;
|
||||
import org.slf4j.Logger;
|
||||
@@ -46,21 +46,10 @@ public class YarnController extends BaseController {
|
||||
"finishedTime",
|
||||
YarnApplication::getFinishedTime
|
||||
);
|
||||
private final ImmutableMap<String, YarnService> yarnServices;
|
||||
private final YarnClusterService yarnClusterService;
|
||||
|
||||
public YarnController(YarnClusterService yarnClusterService) {
|
||||
this.yarnServices = yarnClusterService.servicesMap();
|
||||
}
|
||||
|
||||
private YarnService getService(String cluster) {
|
||||
if (!yarnServices.containsKey(cluster)) {
|
||||
throw new RuntimeException(StrUtil.format("Cluster {} not found", cluster));
|
||||
}
|
||||
return yarnServices.get(cluster);
|
||||
}
|
||||
|
||||
private ImmutableList<YarnService> getServices(List<String> clusters) {
|
||||
return Lists.immutable.ofAll(clusters).collect(this::getService);
|
||||
this.yarnClusterService = yarnClusterService;
|
||||
}
|
||||
|
||||
@GetMapping("job_list")
|
||||
@@ -81,9 +70,8 @@ public class YarnController extends BaseController {
|
||||
boolean isSearchId = StrUtil.isNotBlank(searchId);
|
||||
boolean isSearchName = StrUtil.isNotBlank(searchName);
|
||||
Comparator<YarnApplication> comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP);
|
||||
ImmutableList<YarnApplication> applications = getServices(clusters)
|
||||
ImmutableList<YarnApplication> applications = yarnClusterService.jobList(Sets.immutable.ofAll(clusters))
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.flatCollect(YarnService::jobList)
|
||||
.select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState()))
|
||||
.select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus()))
|
||||
.select(app -> !isSearchId || (precise ? StrUtil.equals(app.getId(), searchId) : StrUtil.contains(app.getId(), searchId)))
|
||||
@@ -103,10 +91,8 @@ public class YarnController extends BaseController {
|
||||
|
||||
@GetMapping("job_current")
|
||||
public AmisResponse jobCurrent(@RequestParam("clusters") List<String> clusters, @RequestParam("name") String name) {
|
||||
Optional<YarnApplication> currentApp = getServices(clusters)
|
||||
Optional<YarnApplication> currentApp = yarnClusterService.jobListEquals(Sets.immutable.ofAll(clusters), name)
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.flatCollect(YarnService::jobList)
|
||||
.select(app -> ObjectUtil.equals(app.getName(), name))
|
||||
.select(app -> ObjectUtil.equals(app.getState(), "RUNNING"))
|
||||
.toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP))
|
||||
.getFirstOptional();
|
||||
@@ -123,7 +109,7 @@ public class YarnController extends BaseController {
|
||||
public AmisResponse queueList(@RequestParam("clusters") List<String> clusters, @RequestParam(value = "names", defaultValue = "") String names) {
|
||||
boolean isFilterNames = StrUtil.isNotBlank(names);
|
||||
ImmutableList<String> filterNames = Lists.immutable.of(names.split(","));
|
||||
ImmutableList<YarnClusterVO> results = getServices(clusters)
|
||||
ImmutableList<YarnClusterVO> results = yarnClusterService.services()
|
||||
.asParallel(EXECUTOR, 1)
|
||||
.collect(yarnService -> {
|
||||
YarnRootQueue cluster = yarnService.cluster();
|
||||
@@ -137,11 +123,11 @@ public class YarnController extends BaseController {
|
||||
|
||||
@GetMapping("queue_names")
|
||||
public AmisResponse queueNames(@RequestParam("clusters") List<String> clusters) {
|
||||
return responseData(MapUtil.of("queueNames", getServices(clusters).flatCollect(YarnService::queueList).collect(YarnQueue::getQueueName)));
|
||||
return responseData(MapUtil.of("queueNames", yarnClusterService.queueList(Sets.immutable.ofAll(clusters)).collect(YarnQueue::getQueueName)));
|
||||
}
|
||||
|
||||
@GetMapping("clusters")
|
||||
public AmisResponse clusters(@RequestParam("clusters") List<String> clusters) {
|
||||
return responseData(MapUtil.of("cluster", getServices(clusters).collect(YarnService::cluster)));
|
||||
return responseData(MapUtil.of("cluster", yarnClusterService.cluster(Sets.immutable.ofAll(clusters))));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user