feature(yarn-query,web): 新增 yarn 多集群查询

可以联合或单独查询 yarn 集群,并且聚合过滤排序等数据筛选内容
This commit is contained in:
2023-05-05 11:52:09 +08:00
parent 49a77a3795
commit 027fa10d50
24 changed files with 348 additions and 269 deletions

View File

@@ -7,6 +7,7 @@ package com.lanyuanxiaoyao.service.configuration.entity.yarn;
* @date 2023-04-19
*/
public final class YarnApplication {
private String cluster;
private String id;
private String user;
private String queue;
@@ -27,6 +28,15 @@ public final class YarnApplication {
private Float queueUsagePercentage;
private Float clusterUsagePercentage;
public String getCluster() {
return cluster;
}
public YarnApplication setCluster(String cluster) {
this.cluster = cluster;
return this;
}
public String getId() {
return id;
}
@@ -106,7 +116,8 @@ public final class YarnApplication {
@Override
public String toString() {
return "YarnApplication{" +
"id='" + id + '\'' +
"cluster='" + cluster + '\'' +
", id='" + id + '\'' +
", user='" + user + '\'' +
", queue='" + queue + '\'' +
", name='" + name + '\'' +

View File

@@ -7,6 +7,7 @@ package com.lanyuanxiaoyao.service.configuration.entity.yarn;
* @date 2023-04-22
*/
public final class YarnQueue {
private String cluster;
private String queueName;
private String state;
private String type;
@@ -38,6 +39,15 @@ public final class YarnQueue {
private ResourcesUsed usedAMResource;
private ResourcesUsed userAMResourceLimit;
public String getCluster() {
return cluster;
}
public YarnQueue setCluster(String cluster) {
this.cluster = cluster;
return this;
}
public String getQueueName() {
return queueName;
}
@@ -161,7 +171,8 @@ public final class YarnQueue {
@Override
public String toString() {
return "YarnQueue{" +
"queueName='" + queueName + '\'' +
"cluster='" + cluster + '\'' +
", queueName='" + queueName + '\'' +
", state='" + state + '\'' +
", type='" + type + '\'' +
", capacity=" + capacity +
@@ -180,11 +191,11 @@ public final class YarnQueue {
", maximumAllocation=" + maximumAllocation +
", queuePriority=" + queuePriority +
", orderingPolicyInfo='" + orderingPolicyInfo + '\'' +
", maxApplications=" + maxApplications +
", maxApplicationsPerUser=" + maxApplicationsPerUser +
", numActiveApplications=" + numActiveApplications +
", numPendingApplications=" + numPendingApplications +
", numContainers=" + numContainers +
", maxApplications=" + maxApplications +
", maxApplicationsPerUser=" + maxApplicationsPerUser +
", userLimit=" + userLimit +
", userLimitFactor=" + userLimitFactor +
", configuredMaxAMResourceLimit=" + configuredMaxAMResourceLimit +

View File

@@ -7,12 +7,22 @@ package com.lanyuanxiaoyao.service.configuration.entity.yarn;
* @date 2023-04-22
*/
public final class YarnRootQueue {
private String cluster;
private String queueName;
private String type;
private Float capacity;
private Float usedCapacity;
private Float maxCapacity;
public String getCluster() {
return cluster;
}
public YarnRootQueue setCluster(String cluster) {
this.cluster = cluster;
return this;
}
public String getQueueName() {
return queueName;
}
@@ -36,7 +46,8 @@ public final class YarnRootQueue {
@Override
public String toString() {
return "YarnRootQueue{" +
"queueName='" + queueName + '\'' +
"cluster='" + cluster + '\'' +
", queueName='" + queueName + '\'' +
", type='" + type + '\'' +
", capacity=" + capacity +
", usedCapacity=" + usedCapacity +

View File

@@ -9,5 +9,5 @@ import com.dtflys.forest.annotation.BaseRequest;
* @date 2023-04-21
*/
@BaseRequest(baseURL = "http://service-yarn-query-b1e11")
public interface CompactionYarnService extends YarnService {
public interface YarnB1Service extends YarnService {
}

View File

@@ -0,0 +1,13 @@
package com.lanyuanxiaoyao.service.forest.service;
import com.dtflys.forest.annotation.BaseRequest;
/**
* Yarn 信息
*
* @author lanyuanxiaoyao
* @date 2023-04-21
*/
@BaseRequest(baseURL = "http://service-yarn-query-b4")
public interface YarnB4Service extends YarnService {
}

View File

@@ -0,0 +1,13 @@
package com.lanyuanxiaoyao.service.forest.service;
import com.dtflys.forest.annotation.BaseRequest;
/**
* Yarn 信息
*
* @author lanyuanxiaoyao
* @date 2023-04-21
*/
@BaseRequest(baseURL = "http://service-yarn-query-b5")
public interface YarnB5Service extends YarnService {
}

View File

@@ -9,5 +9,5 @@ import com.dtflys.forest.annotation.BaseRequest;
* @date 2023-04-21
*/
@BaseRequest(baseURL = "http://service-yarn-query-b2s119")
public interface SyncYarnService extends YarnService {
public interface YarnB5SyncService extends YarnService {
}

View File

@@ -1,23 +1,9 @@
package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO;
import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
/**
* 放一些 Controller 的辅助方法
@@ -28,12 +14,6 @@ import org.eclipse.collections.api.map.ImmutableMap;
public class BaseController {
private static final int SUCCESS_STATUS = 0;
private static final String SUCCESS_MESSAGE = "OK";
private static final ImmutableMap<String, Function<YarnApplication, Long>> SORT_MAP = Maps.immutable.of(
"startedTime",
YarnApplication::getStartedTime,
"finishedTime",
YarnApplication::getFinishedTime
);
protected AmisResponse responseData() {
return AmisResponse.builder()
@@ -85,73 +65,4 @@ public class BaseController {
.build())
.build();
}
protected AmisResponse jobList(
YarnService yarnService,
Integer page,
Integer count,
String order,
String direction,
String filterState,
String filterFinalStatus,
String searchId,
String searchName,
// 是否使用精确查询
Boolean precise
) {
boolean isFilterState = StrUtil.isNotBlank(filterState);
boolean isFilterFinalStatus = StrUtil.isNotBlank(filterFinalStatus);
boolean isSearchId = StrUtil.isNotBlank(searchId);
boolean isSearchName = StrUtil.isNotBlank(searchName);
Comparator<YarnApplication> comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP);
ImmutableList<YarnApplication> applications = yarnService.jobList()
.select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState()))
.select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus()))
.select(app -> !isSearchId || (precise ? StrUtil.equals(app.getId(), searchId) : StrUtil.contains(app.getId(), searchId)))
.select(app -> !isSearchName || (precise ? StrUtil.equals(app.getName(), searchName) : StrUtil.contains(app.getName(), searchName)))
.toSortedList(comparator)
.toImmutable();
int running = applications.count(app -> ObjectUtil.equal(app.getState(), "RUNNING"));
int unRunning = applications.count(app -> ObjectUtil.notEqual(app.getState(), "RUNNING"));
ImmutableList<YarnApplicationVO> result = applications
.drop(Math.max(page - 1, 0) * count)
.take(count)
.collect(YarnApplicationVO::new);
return responseCrudData(result, applications.size())
.withData("running", running)
.withData("unRunning", unRunning);
}
protected AmisResponse jobCurrent(YarnService yarnService, String name) {
Optional<YarnApplication> currentApp = yarnService.jobList()
.select(app -> ObjectUtil.equals(app.getName(), name))
.select(app -> ObjectUtil.equals(app.getState(), "RUNNING"))
.toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP))
.getFirstOptional();
if (currentApp.isPresent()) {
return responseData()
.withData("hasCurrent", true)
.withData("current", currentApp.get());
} else {
return responseData().withData("hasCurrent", false);
}
}
protected AmisResponse queueList(YarnService yarnService, String names) {
boolean isFilterNames = StrUtil.isNotBlank(names);
ImmutableList<String> filterNames = Lists.immutable.of(names.split(","));
ImmutableList<YarnQueue> queues = yarnService.queueList();
ImmutableList<YarnQueue> result = queues.select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n)));
return responseCrudData(result, result.size())
.withData("queueNames", queues.collect(YarnQueue::getQueueName))
.withData("cluster", yarnService.cluster());
}
protected AmisResponse queueNames(YarnService yarnService) {
return responseData(MapUtil.of("queueNames", yarnService.queueList().collect(YarnQueue::getQueueName)));
}
protected AmisResponse cluster(YarnService yarnService) {
return responseData(MapUtil.of("cluster", yarnService.cluster()));
}
}

View File

@@ -1,64 +0,0 @@
package com.lanyuanxiaoyao.service.web.controller;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.forest.service.CompactionYarnService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Sync Yarn
*
* @author lanyuanxiaoyao
* @date 2023-04-21
*/
@RestController
@RequestMapping("compaction_yarn")
public class CompactionYarnController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(CompactionYarnController.class);
private final CompactionYarnService compactionYarnService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public CompactionYarnController(CompactionYarnService compactionYarnService) {
this.compactionYarnService = compactionYarnService;
}
@GetMapping("job_list")
public AmisResponse jobList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "order", defaultValue = "startedTime") String order,
@RequestParam(value = "direction", defaultValue = "desc") String direction,
@RequestParam(value = "filter_state", defaultValue = "") String filterState,
@RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus,
@RequestParam(value = "search_id", defaultValue = "") String searchId,
@RequestParam(value = "search_name", defaultValue = "") String searchName,
@RequestParam(value = "precise", defaultValue = "false") Boolean precise
) {
return jobList(compactionYarnService, page, count, order, direction, filterState, filterFinalStatus, searchId, searchName, precise);
}
@GetMapping("job_current")
public AmisResponse jobCurrent(@RequestParam("name") String name) {
return jobCurrent(compactionYarnService, name);
}
@GetMapping("queue_list")
public AmisResponse queueList(@RequestParam(value = "names", defaultValue = "") String names) {
return queueList(compactionYarnService, names);
}
@GetMapping("queue_names")
public AmisResponse queueNames() {
return queueNames(compactionYarnService);
}
@GetMapping("cluster")
public AmisResponse cluster() {
return cluster(compactionYarnService);
}
}

View File

@@ -1,64 +0,0 @@
package com.lanyuanxiaoyao.service.web.controller;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.forest.service.SyncYarnService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Sync Yarn
*
* @author lanyuanxiaoyao
* @date 2023-04-21
*/
@RestController
@RequestMapping("sync_yarn")
public class SyncYarnController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(SyncYarnController.class);
private final SyncYarnService syncYarnService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public SyncYarnController(SyncYarnService syncYarnService) {
this.syncYarnService = syncYarnService;
}
@GetMapping("job_list")
public AmisResponse jobList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "order", defaultValue = "startedTime") String order,
@RequestParam(value = "direction", defaultValue = "desc") String direction,
@RequestParam(value = "filter_state", defaultValue = "") String filterState,
@RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus,
@RequestParam(value = "search_id", defaultValue = "") String searchId,
@RequestParam(value = "search_name", defaultValue = "") String searchName,
@RequestParam(value = "precise", defaultValue = "false") Boolean precise
) {
return jobList(syncYarnService, page, count, order, direction, filterState, filterFinalStatus, searchId, searchName, precise);
}
@GetMapping("job_current")
public AmisResponse jobCurrent(@RequestParam("name") String name) {
return jobCurrent(syncYarnService, name);
}
@GetMapping("queue_list")
public AmisResponse queueList(@RequestParam(value = "names", defaultValue = "") String names) {
return queueList(syncYarnService, names);
}
@GetMapping("queue_names")
public AmisResponse queueNames() {
return queueNames(syncYarnService);
}
@GetMapping("cluster")
public AmisResponse cluster() {
return cluster(syncYarnService);
}
}

View File

@@ -8,9 +8,9 @@ import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias;
import com.lanyuanxiaoyao.service.forest.service.CompactionYarnService;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import com.lanyuanxiaoyao.service.forest.service.SyncYarnService;
import com.lanyuanxiaoyao.service.forest.service.YarnB1Service;
import com.lanyuanxiaoyao.service.forest.service.YarnB5SyncService;
import com.lanyuanxiaoyao.service.web.entity.SyncStateVO;
import com.lanyuanxiaoyao.service.web.entity.TableVO;
import java.util.List;
@@ -41,14 +41,14 @@ public class TableController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(TableController.class);
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
private final InfoService infoService;
private final SyncYarnService syncYarnService;
private final CompactionYarnService compactionYarnService;
private final YarnB5SyncService yarnB5SyncService;
private final YarnB1Service yarnB1Service;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public TableController(InfoService infoService, SyncYarnService syncYarnService, CompactionYarnService compactionYarnService) {
public TableController(InfoService infoService, YarnB5SyncService yarnB5SyncService, YarnB1Service yarnB1Service) {
this.infoService = infoService;
this.syncYarnService = syncYarnService;
this.compactionYarnService = compactionYarnService;
this.yarnB5SyncService = yarnB5SyncService;
this.yarnB1Service = yarnB1Service;
}
@GetMapping("list")

View File

@@ -0,0 +1,152 @@
package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
import com.lanyuanxiaoyao.service.forest.service.*;
import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO;
import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO;
import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Sync Yarn
*
* @author lanyuanxiaoyao
* @date 2023-04-21
*/
@RestController
@RequestMapping("yarn")
public class YarnController extends BaseController {
private static final Logger logger = LoggerFactory.getLogger(YarnController.class);
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
private static final ImmutableMap<String, Function<YarnApplication, Long>> SORT_MAP = Maps.immutable.of(
"startedTime",
YarnApplication::getStartedTime,
"finishedTime",
YarnApplication::getFinishedTime
);
private final ImmutableMap<String, YarnService> yarnServices;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public YarnController(YarnB1Service yarnB1Service, YarnB5SyncService yarnB5SyncService, YarnB4Service yarnB4Service, YarnB5Service yarnB5Service) {
yarnServices = Maps.immutable.ofAll(MapUtil.<String, YarnService>builder()
.put("b1", yarnB1Service)
.put("b5-sync", yarnB5SyncService)
.put("b4", yarnB4Service)
.put("b5", yarnB5Service)
.build());
}
private YarnService getService(String cluster) {
if (!yarnServices.containsKey(cluster)) {
throw new RuntimeException(StrUtil.format("Cluster {} not found", cluster));
}
return yarnServices.get(cluster);
}
private ImmutableList<YarnService> getServices(List<String> clusters) {
return Lists.immutable.ofAll(clusters).collect(this::getService);
}
@GetMapping("job_list")
public AmisResponse jobList(
@RequestParam("clusters") List<String> clusters,
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "count", defaultValue = "10") Integer count,
@RequestParam(value = "order", defaultValue = "startedTime") String order,
@RequestParam(value = "direction", defaultValue = "desc") String direction,
@RequestParam(value = "filter_state", defaultValue = "") String filterState,
@RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus,
@RequestParam(value = "search_id", defaultValue = "") String searchId,
@RequestParam(value = "search_name", defaultValue = "") String searchName,
@RequestParam(value = "precise", defaultValue = "false") Boolean precise
) {
boolean isFilterState = StrUtil.isNotBlank(filterState);
boolean isFilterFinalStatus = StrUtil.isNotBlank(filterFinalStatus);
boolean isSearchId = StrUtil.isNotBlank(searchId);
boolean isSearchName = StrUtil.isNotBlank(searchName);
Comparator<YarnApplication> comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP);
ImmutableList<YarnApplication> applications = getServices(clusters)
.asParallel(EXECUTOR, 1)
.flatCollect(YarnService::jobList)
.select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState()))
.select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus()))
.select(app -> !isSearchId || (precise ? StrUtil.equals(app.getId(), searchId) : StrUtil.contains(app.getId(), searchId)))
.select(app -> !isSearchName || (precise ? StrUtil.equals(app.getName(), searchName) : StrUtil.contains(app.getName(), searchName)))
.toSortedList(comparator)
.toImmutable();
int running = applications.count(app -> ObjectUtil.equal(app.getState(), "RUNNING"));
int unRunning = applications.count(app -> ObjectUtil.notEqual(app.getState(), "RUNNING"));
ImmutableList<YarnApplicationVO> result = applications
.drop(Math.max(page - 1, 0) * count)
.take(count)
.collect(YarnApplicationVO::new);
return responseCrudData(result, applications.size())
.withData("running", running)
.withData("unRunning", unRunning);
}
@GetMapping("job_current")
public AmisResponse jobCurrent(@RequestParam("clusters") List<String> clusters, @RequestParam("name") String name) {
Optional<YarnApplication> currentApp = getServices(clusters)
.asParallel(EXECUTOR, 1)
.flatCollect(YarnService::jobList)
.select(app -> ObjectUtil.equals(app.getName(), name))
.select(app -> ObjectUtil.equals(app.getState(), "RUNNING"))
.toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP))
.getFirstOptional();
if (currentApp.isPresent()) {
return responseData()
.withData("hasCurrent", true)
.withData("current", currentApp.get());
} else {
return responseData().withData("hasCurrent", false);
}
}
@GetMapping("queue_list")
public AmisResponse queueList(@RequestParam("clusters") List<String> clusters, @RequestParam(value = "names", defaultValue = "") String names) {
boolean isFilterNames = StrUtil.isNotBlank(names);
ImmutableList<String> filterNames = Lists.immutable.of(names.split(","));
ImmutableList<YarnClusterVO> results = getServices(clusters)
.asParallel(EXECUTOR, 1)
.collect(yarnService -> {
YarnRootQueue cluster = yarnService.cluster();
ImmutableList<YarnQueue> queues = yarnService.queueList().select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n)));
return new YarnClusterVO(cluster, queues);
})
.toList()
.toImmutable();
return responseCrudData(results, results.size());
}
@GetMapping("queue_names")
public AmisResponse queueNames(@RequestParam("clusters") List<String> clusters) {
return responseData(MapUtil.of("queueNames", getServices(clusters).flatCollect(YarnService::queueList).collect(YarnQueue::getQueueName)));
}
@GetMapping("clusters")
public AmisResponse clusters(@RequestParam("clusters") List<String> clusters) {
return responseData(MapUtil.of("cluster", getServices(clusters).collect(YarnService::cluster)));
}
}

View File

@@ -35,6 +35,10 @@ public class YarnApplicationVO {
}
}
public String getCluster() {
return yarnApplication.getCluster();
}
public String getId() {
return yarnApplication.getId();
}

View File

@@ -0,0 +1,62 @@
package com.lanyuanxiaoyao.service.web.entity;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
import org.eclipse.collections.api.list.ImmutableList;
/**
* @author lanyuanxiaoyao
* @date 2023-05-05
*/
public class YarnClusterVO {
@JsonIgnore
private final YarnRootQueue yarnRootQueue;
private final ImmutableList<YarnQueue> children;
public YarnClusterVO(YarnRootQueue yarnRootQueue, ImmutableList<YarnQueue> children) {
this.yarnRootQueue = yarnRootQueue;
this.children = children;
}
public Boolean isRoot() {
return true;
}
public String getCluster() {
return yarnRootQueue.getCluster();
}
public String getQueueName() {
return StrUtil.format("{} ({})", yarnRootQueue.getQueueName(), yarnRootQueue.getCluster());
}
public String getType() {
return yarnRootQueue.getType();
}
public Float getAbsoluteCapacity() {
return yarnRootQueue.getCapacity();
}
public Float getAbsoluteUsedCapacity() {
return yarnRootQueue.getUsedCapacity();
}
public Float getAbsoluteMaxCapacity() {
return yarnRootQueue.getMaxCapacity();
}
public ImmutableList<YarnQueue> getChildren() {
return children;
}
@Override
public String toString() {
return "YarnClusterVO{" +
"yarnRootQueue=" + yarnRootQueue +
", yarnQueues=" + children +
'}';
}
}

View File

@@ -20,7 +20,9 @@ function cloudTab() {
type: 'crud',
title: '服务列表',
api: '${base}/cloud/list',
source: '$items',
interval: 2000,
syncLocation: false,
silentPolling: true,
headerToolbar: ['reload'],
columns: [
{name: 'name', label: '名称'},

View File

@@ -11,7 +11,7 @@ function yarnCrudColumns() {
{
name: 'id',
label: 'ID',
width: 240,
width: 250,
fixed: 'left',
...copyField('id')
},
@@ -21,12 +21,19 @@ function yarnCrudColumns() {
fixed: 'left',
...copyField('name')
},
{
name: 'cluster',
label: '集群',
width: 50,
align: 'center',
},
{
label: '用户',
width: 80,
type: 'tooltip-wrapper',
body: '${TRUNCATE(user, 8)}',
content: '${user}',
align: 'center',
},
{
name: 'startedTime',
@@ -162,7 +169,7 @@ function yarnCrudColumns() {
]
}
function simpleYarnDialog(mode, title) {
function simpleYarnDialog(cluster, title, filterField) {
return {
title: title,
actions: [],
@@ -194,8 +201,9 @@ function simpleYarnDialog(mode, title) {
type: 'crud',
api: {
method: 'get',
url: `\${base}/${mode}_yarn/job_list`,
url: `\${base}/yarn/job_list`,
data: {
clusters: `${cluster}`,
page: '${page|default:undefined}',
count: '${perPage|default:undefined}',
order: '${orderBy|default:undefined}',
@@ -203,7 +211,7 @@ function simpleYarnDialog(mode, title) {
filter_state: '${state|default:undefined}',
filter_final_status: '${finalStatus|default:undefined}',
search_id: '${id|default:undefined}',
search_name: `\${${mode}JobName}`,
search_name: `\${${filterField}}`,
precise: true,
}
},
@@ -286,7 +294,7 @@ function flinkJobDialog() {
type: 'action',
label: '打开同步详情',
actionType: 'dialog',
dialog: simpleYarnDialog('sync', '同步详情')
dialog: simpleYarnDialog('b5-sync', '同步详情', 'syncJobName')
},
{type: 'divider'},
{
@@ -386,14 +394,14 @@ function tableMetaDialog() {
type: 'action',
icon: 'fa fa-arrows-rotate',
actionType: 'dialog',
dialog: simpleYarnDialog('sync', '同步详情')
dialog: simpleYarnDialog('b5-sync', '同步详情', 'syncJobName')
},
{
label: '压缩情况',
type: 'action',
icon: 'fa fa-minimize',
actionType: 'dialog',
dialog: simpleYarnDialog('compaction', '压缩详情')
dialog: simpleYarnDialog('b1', '压缩详情', 'compactionJobName')
},
{
type: 'button',

View File

@@ -1,29 +1,25 @@
function yarnTab(name, title, queueNames = 'default', searchName = undefined) {
function yarnTab(cluster, title, queueNames = 'default', searchName = undefined) {
return {
title: `${title} 集群`,
tab: [
{
id: `${name}-yarn-service`,
name: `${name}-yarn-service`,
id: `${cluster}-yarn-service`,
name: `${cluster}-yarn-service`,
type: 'service',
body: [
{
type: 'tpl',
tpl: '<span class="font-bold text-xl">集群资源</span>',
// className: 'mb-2 block',
},
{
type: 'crud',
api: {
method: 'get',
url: `\${base}/${name}_yarn/queue_list`,
url: '${base}/yarn/queue_list',
data: {
clusters: `${cluster}`,
names: '${queueName|default:undefined}'
},
responseData: {
'&': '$$',
clusterUsage: '${ROUND((cluster.usedCapacity * 100 / cluster.maxCapacity), 0)}'
},
},
defaultParams: {
queueName: queueNames,
@@ -33,10 +29,6 @@ function yarnTab(name, title, queueNames = 'default', searchName = undefined) {
silentPolling: true,
headerToolbar: [
"reload",
{
type: "tpl",
tpl: "集群总资源 <span class='ml-1 font-extrabold text-xl ${IF(clusterUsage > 0, IF(clusterUsage > 30, IF(clusterUsage > 90, 'text-danger', 'text-info'), 'text-info'), 'text-success')}'>${clusterUsage}%</span>"
},
],
columns: [
{
@@ -86,6 +78,7 @@ function yarnTab(name, title, queueNames = 'default', searchName = undefined) {
type: "button",
level: "link",
tooltip: '查看队列详情',
visibleOn: '${!root}',
actionType: 'dialog',
dialog: {
closeOnEsc: true,
@@ -141,8 +134,9 @@ function yarnTab(name, title, queueNames = 'default', searchName = undefined) {
type: 'crud',
api: {
method: 'get',
url: `\${base}/${name}_yarn/job_list`,
url: '${base}/yarn/job_list',
data: {
clusters: `${cluster}`,
page: '${page|default:undefined}',
count: '${perPage|default:undefined}',
order: '${orderBy|default:undefined}',

View File

@@ -12,11 +12,11 @@
/>
<title>Hudi 服务页面</title>
<link href="sdk/sdk-icon.css" rel="stylesheet"/>
<!-- <link href="sdk/default.css" rel="stylesheet"/>-->
<!-- <link href="sdk/fontawesome.css" rel="stylesheet"/>-->
<!-- <link href="sdk/default.css" rel="stylesheet"/>-->
<!-- <link href="sdk/fontawesome.css" rel="stylesheet"/>-->
<link href="sdk/ang.css" rel="stylesheet"/>
<!-- <link href="sdk/cxd.css" rel="stylesheet"/>-->
<!-- <link href="sdk/antd.css" rel="stylesheet"/>-->
<!-- <link href="sdk/cxd.css" rel="stylesheet"/>-->
<!-- <link href="sdk/antd.css" rel="stylesheet"/>-->
<link href="sdk/helper.css" rel="stylesheet"/>
<link href="sdk/iconfont.css" rel="stylesheet"/>
<link href="sdk/fontawesome.css" rel="stylesheet"/>
@@ -52,8 +52,11 @@
tabsMode: 'strong',
tabs: [
tableTab(),
yarnTab('sync', 'Sync'),
yarnTab('compaction', 'Compaction', 'datalake,tyly', 'Compaction'),
yarnTab('b5-sync', '同步 b5', undefined, 'Sync'),
yarnTab(' b1,b4,b5', '压缩 b1 b4 b5', 'datalake,tyly'),
yarnTab(' b1', '压缩 b1', 'datalake,tyly', 'Compaction'),
yarnTab(' b4', '压缩 b4'),
yarnTab(' b5', '压缩 b5'),
cloudTab(),
]
}

View File

@@ -12,8 +12,17 @@ import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("yarn")
public class YarnConfiguration {
private String cluster;
private String webUrl;
public String getCluster() {
return cluster;
}
public void setCluster(String cluster) {
this.cluster = cluster;
}
public String getWebUrl() {
return webUrl;
}
@@ -25,7 +34,8 @@ public class YarnConfiguration {
@Override
public String toString() {
return "YarnConfiguration{" +
"webUrl='" + webUrl + '\'' +
"cluster='" + cluster + '\'' +
", webUrl='" + webUrl + '\'' +
'}';
}
}

View File

@@ -40,6 +40,6 @@ public class ClusterServiceImpl implements ClusterService {
public YarnRootQueue info() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo();
return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo().setCluster(yarnConfiguration.getCluster());
}
}

View File

@@ -41,7 +41,7 @@ public class JobAutoRefreshServiceImpl implements JobService {
public void refresh() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
ImmutableList<YarnApplication> apps = mapper.readValue(body, ApplicationsListResponse.class).getApps().getApp();
ImmutableList<YarnApplication> apps = mapper.readValue(body, ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster()));
CACHE.set(apps);
}

View File

@@ -44,7 +44,7 @@ public class JobServiceImpl implements JobService {
public ImmutableList<YarnApplication> list() throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps");
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp();
return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster()));
}
}
@@ -52,14 +52,14 @@ public class JobServiceImpl implements JobService {
@Retryable(Throwable.class)
@Override
public ImmutableList<YarnApplication> listEquals(String name) throws JsonProcessingException {
return list().select(app -> StrUtil.equals(app.getName(), name));
return list().select(app -> StrUtil.equals(app.getName(), name)).tap(app -> app.setCluster(yarnConfiguration.getCluster()));
}
@Cacheable(value = "job-list", sync = true, key = "#{methodName+name}")
@Retryable(Throwable.class)
@Override
public ImmutableList<YarnApplication> listLike(String name) throws JsonProcessingException {
return list().select(app -> StrUtil.contains(app.getName(), name));
return list().select(app -> StrUtil.contains(app.getName(), name)).tap(app -> app.setCluster(yarnConfiguration.getCluster()));
}
@Override
@@ -72,7 +72,7 @@ public class JobServiceImpl implements JobService {
public YarnApplication detail(String applicationId) throws JsonProcessingException {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps/" + applicationId);
try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) {
return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp();
return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(yarnConfiguration.getCluster());
}
}
}

View File

@@ -42,7 +42,8 @@ public class QueueAutoRefreshServiceImpl implements QueueService {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
QueueListResponse response = mapper.readValue(body, QueueListResponse.class);
ImmutableList<YarnQueue> queues = Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue());
ImmutableList<YarnQueue> queues = Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue())
.tap(q -> q.setCluster(yarnConfiguration.getCluster()));
CACHE.set(queues);
}

View File

@@ -44,7 +44,7 @@ public class QueueServiceImpl implements QueueService {
String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler");
String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body();
QueueListResponse response = mapper.readValue(body, QueueListResponse.class);
return Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue());
return Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue()).tap(q -> q.setCluster(yarnConfiguration.getCluster()));
}
@Cacheable(value = "queue-detail", sync = true, key = "#name")
@@ -54,6 +54,7 @@ public class QueueServiceImpl implements QueueService {
return list()
.select(q -> StrUtil.equals(q.getQueueName(), name))
.getFirstOptional()
.orElseThrow(() -> new Exception("cannot found " + name));
.orElseThrow(() -> new Exception("cannot found " + name))
.setCluster(yarnConfiguration.getCluster());
}
}