diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnApplication.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnApplication.java index 0809755..f6166cc 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnApplication.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnApplication.java @@ -7,6 +7,7 @@ package com.lanyuanxiaoyao.service.configuration.entity.yarn; * @date 2023-04-19 */ public final class YarnApplication { + private String cluster; private String id; private String user; private String queue; @@ -27,6 +28,15 @@ public final class YarnApplication { private Float queueUsagePercentage; private Float clusterUsagePercentage; + public String getCluster() { + return cluster; + } + + public YarnApplication setCluster(String cluster) { + this.cluster = cluster; + return this; + } + public String getId() { return id; } @@ -106,7 +116,8 @@ public final class YarnApplication { @Override public String toString() { return "YarnApplication{" + - "id='" + id + '\'' + + "cluster='" + cluster + '\'' + + ", id='" + id + '\'' + ", user='" + user + '\'' + ", queue='" + queue + '\'' + ", name='" + name + '\'' + diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java index 82e1244..058b3d7 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnQueue.java @@ -7,6 +7,7 @@ package com.lanyuanxiaoyao.service.configuration.entity.yarn; * @date 2023-04-22 */ public final class YarnQueue { + private String cluster; private String queueName; private String state; private String type; @@ -38,6 +39,15 @@ public final class YarnQueue { private ResourcesUsed usedAMResource; private ResourcesUsed userAMResourceLimit; + public String getCluster() { + return cluster; + } + + public YarnQueue setCluster(String cluster) { + this.cluster = cluster; + return this; + } + public String getQueueName() { return queueName; } @@ -161,7 +171,8 @@ public final class YarnQueue { @Override public String toString() { return "YarnQueue{" + - "queueName='" + queueName + '\'' + + "cluster='" + cluster + '\'' + + ", queueName='" + queueName + '\'' + ", state='" + state + '\'' + ", type='" + type + '\'' + ", capacity=" + capacity + @@ -180,11 +191,11 @@ public final class YarnQueue { ", maximumAllocation=" + maximumAllocation + ", queuePriority=" + queuePriority + ", orderingPolicyInfo='" + orderingPolicyInfo + '\'' + + ", maxApplications=" + maxApplications + + ", maxApplicationsPerUser=" + maxApplicationsPerUser + ", numActiveApplications=" + numActiveApplications + ", numPendingApplications=" + numPendingApplications + ", numContainers=" + numContainers + - ", maxApplications=" + maxApplications + - ", maxApplicationsPerUser=" + maxApplicationsPerUser + ", userLimit=" + userLimit + ", userLimitFactor=" + userLimitFactor + ", configuredMaxAMResourceLimit=" + configuredMaxAMResourceLimit + diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnRootQueue.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnRootQueue.java index d23cfbb..e982218 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnRootQueue.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/yarn/YarnRootQueue.java @@ -7,12 +7,22 @@ package com.lanyuanxiaoyao.service.configuration.entity.yarn; * @date 2023-04-22 */ public final class YarnRootQueue { + private String cluster; private String queueName; private String type; private Float capacity; private Float usedCapacity; private Float maxCapacity; + public String getCluster() { + return cluster; + } + + public YarnRootQueue setCluster(String cluster) { + this.cluster = cluster; + return this; + } + public String getQueueName() { return queueName; } @@ -36,7 +46,8 @@ public final class YarnRootQueue { @Override public String toString() { return "YarnRootQueue{" + - "queueName='" + queueName + '\'' + + "cluster='" + cluster + '\'' + + ", queueName='" + queueName + '\'' + ", type='" + type + '\'' + ", capacity=" + capacity + ", usedCapacity=" + usedCapacity + diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/CompactionYarnService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB1Service.java similarity index 79% rename from service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/CompactionYarnService.java rename to service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB1Service.java index e610ab8..51909b8 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/CompactionYarnService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB1Service.java @@ -9,5 +9,5 @@ import com.dtflys.forest.annotation.BaseRequest; * @date 2023-04-21 */ @BaseRequest(baseURL = "http://service-yarn-query-b1e11") -public interface CompactionYarnService extends YarnService { +public interface YarnB1Service extends YarnService { } diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB4Service.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB4Service.java new file mode 100644 index 0000000..f8d1630 --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB4Service.java @@ -0,0 +1,13 @@ +package com.lanyuanxiaoyao.service.forest.service; + +import com.dtflys.forest.annotation.BaseRequest; + +/** + * Yarn 信息 + * + * @author lanyuanxiaoyao + * @date 2023-04-21 + */ +@BaseRequest(baseURL = "http://service-yarn-query-b4") +public interface YarnB4Service extends YarnService { +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5Service.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5Service.java new file mode 100644 index 0000000..824fec4 --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5Service.java @@ -0,0 +1,13 @@ +package com.lanyuanxiaoyao.service.forest.service; + +import com.dtflys.forest.annotation.BaseRequest; + +/** + * Yarn 信息 + * + * @author lanyuanxiaoyao + * @date 2023-04-21 + */ +@BaseRequest(baseURL = "http://service-yarn-query-b5") +public interface YarnB5Service extends YarnService { +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/SyncYarnService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5SyncService.java similarity index 80% rename from service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/SyncYarnService.java rename to service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5SyncService.java index f09dcb8..e65ae41 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/SyncYarnService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/YarnB5SyncService.java @@ -9,5 +9,5 @@ import com.dtflys.forest.annotation.BaseRequest; * @date 2023-04-21 */ @BaseRequest(baseURL = "http://service-yarn-query-b2s119") -public interface SyncYarnService extends YarnService { +public interface YarnB5SyncService extends YarnService { } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/BaseController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/BaseController.java index 52bac28..0e1495d 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/BaseController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/BaseController.java @@ -1,23 +1,9 @@ package com.lanyuanxiaoyao.service.web.controller; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; -import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; -import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; -import com.lanyuanxiaoyao.service.forest.service.YarnService; -import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO; -import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil; -import java.util.Comparator; import java.util.HashMap; import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.factory.Maps; -import org.eclipse.collections.api.list.ImmutableList; -import org.eclipse.collections.api.map.ImmutableMap; /** * 放一些 Controller 的辅助方法 @@ -28,12 +14,6 @@ import org.eclipse.collections.api.map.ImmutableMap; public class BaseController { private static final int SUCCESS_STATUS = 0; private static final String SUCCESS_MESSAGE = "OK"; - private static final ImmutableMap> SORT_MAP = Maps.immutable.of( - "startedTime", - YarnApplication::getStartedTime, - "finishedTime", - YarnApplication::getFinishedTime - ); protected AmisResponse responseData() { return AmisResponse.builder() @@ -85,73 +65,4 @@ public class BaseController { .build()) .build(); } - - protected AmisResponse jobList( - YarnService yarnService, - Integer page, - Integer count, - String order, - String direction, - String filterState, - String filterFinalStatus, - String searchId, - String searchName, - // 是否使用精确查询 - Boolean precise - ) { - boolean isFilterState = StrUtil.isNotBlank(filterState); - boolean isFilterFinalStatus = StrUtil.isNotBlank(filterFinalStatus); - boolean isSearchId = StrUtil.isNotBlank(searchId); - boolean isSearchName = StrUtil.isNotBlank(searchName); - Comparator comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP); - ImmutableList applications = yarnService.jobList() - .select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState())) - .select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus())) - .select(app -> !isSearchId || (precise ? StrUtil.equals(app.getId(), searchId) : StrUtil.contains(app.getId(), searchId))) - .select(app -> !isSearchName || (precise ? StrUtil.equals(app.getName(), searchName) : StrUtil.contains(app.getName(), searchName))) - .toSortedList(comparator) - .toImmutable(); - int running = applications.count(app -> ObjectUtil.equal(app.getState(), "RUNNING")); - int unRunning = applications.count(app -> ObjectUtil.notEqual(app.getState(), "RUNNING")); - ImmutableList result = applications - .drop(Math.max(page - 1, 0) * count) - .take(count) - .collect(YarnApplicationVO::new); - return responseCrudData(result, applications.size()) - .withData("running", running) - .withData("unRunning", unRunning); - } - - protected AmisResponse jobCurrent(YarnService yarnService, String name) { - Optional currentApp = yarnService.jobList() - .select(app -> ObjectUtil.equals(app.getName(), name)) - .select(app -> ObjectUtil.equals(app.getState(), "RUNNING")) - .toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP)) - .getFirstOptional(); - if (currentApp.isPresent()) { - return responseData() - .withData("hasCurrent", true) - .withData("current", currentApp.get()); - } else { - return responseData().withData("hasCurrent", false); - } - } - - protected AmisResponse queueList(YarnService yarnService, String names) { - boolean isFilterNames = StrUtil.isNotBlank(names); - ImmutableList filterNames = Lists.immutable.of(names.split(",")); - ImmutableList queues = yarnService.queueList(); - ImmutableList result = queues.select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n))); - return responseCrudData(result, result.size()) - .withData("queueNames", queues.collect(YarnQueue::getQueueName)) - .withData("cluster", yarnService.cluster()); - } - - protected AmisResponse queueNames(YarnService yarnService) { - return responseData(MapUtil.of("queueNames", yarnService.queueList().collect(YarnQueue::getQueueName))); - } - - protected AmisResponse cluster(YarnService yarnService) { - return responseData(MapUtil.of("cluster", yarnService.cluster())); - } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/CompactionYarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/CompactionYarnController.java deleted file mode 100644 index 08ea416..0000000 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/CompactionYarnController.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.lanyuanxiaoyao.service.web.controller; - -import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; -import com.lanyuanxiaoyao.service.forest.service.CompactionYarnService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -/** - * Sync Yarn - * - * @author lanyuanxiaoyao - * @date 2023-04-21 - */ -@RestController -@RequestMapping("compaction_yarn") -public class CompactionYarnController extends BaseController { - private static final Logger logger = LoggerFactory.getLogger(CompactionYarnController.class); - - private final CompactionYarnService compactionYarnService; - - @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public CompactionYarnController(CompactionYarnService compactionYarnService) { - this.compactionYarnService = compactionYarnService; - } - - @GetMapping("job_list") - public AmisResponse jobList( - @RequestParam(value = "page", defaultValue = "1") Integer page, - @RequestParam(value = "count", defaultValue = "10") Integer count, - @RequestParam(value = "order", defaultValue = "startedTime") String order, - @RequestParam(value = "direction", defaultValue = "desc") String direction, - @RequestParam(value = "filter_state", defaultValue = "") String filterState, - @RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus, - @RequestParam(value = "search_id", defaultValue = "") String searchId, - @RequestParam(value = "search_name", defaultValue = "") String searchName, - @RequestParam(value = "precise", defaultValue = "false") Boolean precise - ) { - return jobList(compactionYarnService, page, count, order, direction, filterState, filterFinalStatus, searchId, searchName, precise); - } - - @GetMapping("job_current") - public AmisResponse jobCurrent(@RequestParam("name") String name) { - return jobCurrent(compactionYarnService, name); - } - - @GetMapping("queue_list") - public AmisResponse queueList(@RequestParam(value = "names", defaultValue = "") String names) { - return queueList(compactionYarnService, names); - } - - @GetMapping("queue_names") - public AmisResponse queueNames() { - return queueNames(compactionYarnService); - } - - @GetMapping("cluster") - public AmisResponse cluster() { - return cluster(compactionYarnService); - } -} diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/SyncYarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/SyncYarnController.java deleted file mode 100644 index 08e9151..0000000 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/SyncYarnController.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.lanyuanxiaoyao.service.web.controller; - -import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; -import com.lanyuanxiaoyao.service.forest.service.SyncYarnService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -/** - * Sync Yarn - * - * @author lanyuanxiaoyao - * @date 2023-04-21 - */ -@RestController -@RequestMapping("sync_yarn") -public class SyncYarnController extends BaseController { - private static final Logger logger = LoggerFactory.getLogger(SyncYarnController.class); - - private final SyncYarnService syncYarnService; - - @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public SyncYarnController(SyncYarnService syncYarnService) { - this.syncYarnService = syncYarnService; - } - - @GetMapping("job_list") - public AmisResponse jobList( - @RequestParam(value = "page", defaultValue = "1") Integer page, - @RequestParam(value = "count", defaultValue = "10") Integer count, - @RequestParam(value = "order", defaultValue = "startedTime") String order, - @RequestParam(value = "direction", defaultValue = "desc") String direction, - @RequestParam(value = "filter_state", defaultValue = "") String filterState, - @RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus, - @RequestParam(value = "search_id", defaultValue = "") String searchId, - @RequestParam(value = "search_name", defaultValue = "") String searchName, - @RequestParam(value = "precise", defaultValue = "false") Boolean precise - ) { - return jobList(syncYarnService, page, count, order, direction, filterState, filterFinalStatus, searchId, searchName, precise); - } - - @GetMapping("job_current") - public AmisResponse jobCurrent(@RequestParam("name") String name) { - return jobCurrent(syncYarnService, name); - } - - @GetMapping("queue_list") - public AmisResponse queueList(@RequestParam(value = "names", defaultValue = "") String names) { - return queueList(syncYarnService, names); - } - - @GetMapping("queue_names") - public AmisResponse queueNames() { - return queueNames(syncYarnService); - } - - @GetMapping("cluster") - public AmisResponse cluster() { - return cluster(syncYarnService); - } -} diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java index 037f6e0..b547365 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/TableController.java @@ -8,9 +8,9 @@ import com.eshore.odcp.hudi.connector.entity.TableMeta; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; -import com.lanyuanxiaoyao.service.forest.service.CompactionYarnService; import com.lanyuanxiaoyao.service.forest.service.InfoService; -import com.lanyuanxiaoyao.service.forest.service.SyncYarnService; +import com.lanyuanxiaoyao.service.forest.service.YarnB1Service; +import com.lanyuanxiaoyao.service.forest.service.YarnB5SyncService; import com.lanyuanxiaoyao.service.web.entity.SyncStateVO; import com.lanyuanxiaoyao.service.web.entity.TableVO; import java.util.List; @@ -41,14 +41,14 @@ public class TableController extends BaseController { private static final Logger logger = LoggerFactory.getLogger(TableController.class); private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); private final InfoService infoService; - private final SyncYarnService syncYarnService; - private final CompactionYarnService compactionYarnService; + private final YarnB5SyncService yarnB5SyncService; + private final YarnB1Service yarnB1Service; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public TableController(InfoService infoService, SyncYarnService syncYarnService, CompactionYarnService compactionYarnService) { + public TableController(InfoService infoService, YarnB5SyncService yarnB5SyncService, YarnB1Service yarnB1Service) { this.infoService = infoService; - this.syncYarnService = syncYarnService; - this.compactionYarnService = compactionYarnService; + this.yarnB5SyncService = yarnB5SyncService; + this.yarnB1Service = yarnB1Service; } @GetMapping("list") diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java new file mode 100644 index 0000000..cb857f4 --- /dev/null +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java @@ -0,0 +1,152 @@ +package com.lanyuanxiaoyao.service.web.controller; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; +import com.lanyuanxiaoyao.service.forest.service.*; +import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO; +import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO; +import com.lanyuanxiaoyao.service.web.utils.ComparatorUtil; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Sync Yarn + * + * @author lanyuanxiaoyao + * @date 2023-04-21 + */ +@RestController +@RequestMapping("yarn") +public class YarnController extends BaseController { + private static final Logger logger = LoggerFactory.getLogger(YarnController.class); + private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); + private static final ImmutableMap> SORT_MAP = Maps.immutable.of( + "startedTime", + YarnApplication::getStartedTime, + "finishedTime", + YarnApplication::getFinishedTime + ); + private final ImmutableMap yarnServices; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public YarnController(YarnB1Service yarnB1Service, YarnB5SyncService yarnB5SyncService, YarnB4Service yarnB4Service, YarnB5Service yarnB5Service) { + yarnServices = Maps.immutable.ofAll(MapUtil.builder() + .put("b1", yarnB1Service) + .put("b5-sync", yarnB5SyncService) + .put("b4", yarnB4Service) + .put("b5", yarnB5Service) + .build()); + } + + private YarnService getService(String cluster) { + if (!yarnServices.containsKey(cluster)) { + throw new RuntimeException(StrUtil.format("Cluster {} not found", cluster)); + } + return yarnServices.get(cluster); + } + + private ImmutableList getServices(List clusters) { + return Lists.immutable.ofAll(clusters).collect(this::getService); + } + + @GetMapping("job_list") + public AmisResponse jobList( + @RequestParam("clusters") List clusters, + @RequestParam(value = "page", defaultValue = "1") Integer page, + @RequestParam(value = "count", defaultValue = "10") Integer count, + @RequestParam(value = "order", defaultValue = "startedTime") String order, + @RequestParam(value = "direction", defaultValue = "desc") String direction, + @RequestParam(value = "filter_state", defaultValue = "") String filterState, + @RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus, + @RequestParam(value = "search_id", defaultValue = "") String searchId, + @RequestParam(value = "search_name", defaultValue = "") String searchName, + @RequestParam(value = "precise", defaultValue = "false") Boolean precise + ) { + boolean isFilterState = StrUtil.isNotBlank(filterState); + boolean isFilterFinalStatus = StrUtil.isNotBlank(filterFinalStatus); + boolean isSearchId = StrUtil.isNotBlank(searchId); + boolean isSearchName = StrUtil.isNotBlank(searchName); + Comparator comparator = ComparatorUtil.longComparator(order, direction, SORT_MAP); + ImmutableList applications = getServices(clusters) + .asParallel(EXECUTOR, 1) + .flatCollect(YarnService::jobList) + .select(app -> !isFilterState || ObjectUtil.contains(filterState, app.getState())) + .select(app -> !isFilterFinalStatus || ObjectUtil.contains(filterFinalStatus, app.getFinalStatus())) + .select(app -> !isSearchId || (precise ? StrUtil.equals(app.getId(), searchId) : StrUtil.contains(app.getId(), searchId))) + .select(app -> !isSearchName || (precise ? StrUtil.equals(app.getName(), searchName) : StrUtil.contains(app.getName(), searchName))) + .toSortedList(comparator) + .toImmutable(); + int running = applications.count(app -> ObjectUtil.equal(app.getState(), "RUNNING")); + int unRunning = applications.count(app -> ObjectUtil.notEqual(app.getState(), "RUNNING")); + ImmutableList result = applications + .drop(Math.max(page - 1, 0) * count) + .take(count) + .collect(YarnApplicationVO::new); + return responseCrudData(result, applications.size()) + .withData("running", running) + .withData("unRunning", unRunning); + } + + @GetMapping("job_current") + public AmisResponse jobCurrent(@RequestParam("clusters") List clusters, @RequestParam("name") String name) { + Optional currentApp = getServices(clusters) + .asParallel(EXECUTOR, 1) + .flatCollect(YarnService::jobList) + .select(app -> ObjectUtil.equals(app.getName(), name)) + .select(app -> ObjectUtil.equals(app.getState(), "RUNNING")) + .toSortedList(ComparatorUtil.longComparator("startedTime", ComparatorUtil.DESC, SORT_MAP)) + .getFirstOptional(); + if (currentApp.isPresent()) { + return responseData() + .withData("hasCurrent", true) + .withData("current", currentApp.get()); + } else { + return responseData().withData("hasCurrent", false); + } + } + + @GetMapping("queue_list") + public AmisResponse queueList(@RequestParam("clusters") List clusters, @RequestParam(value = "names", defaultValue = "") String names) { + boolean isFilterNames = StrUtil.isNotBlank(names); + ImmutableList filterNames = Lists.immutable.of(names.split(",")); + ImmutableList results = getServices(clusters) + .asParallel(EXECUTOR, 1) + .collect(yarnService -> { + YarnRootQueue cluster = yarnService.cluster(); + ImmutableList queues = yarnService.queueList().select(queue -> !isFilterNames || filterNames.anySatisfy(n -> StrUtil.equals(queue.getQueueName(), n))); + return new YarnClusterVO(cluster, queues); + }) + .toList() + .toImmutable(); + return responseCrudData(results, results.size()); + } + + @GetMapping("queue_names") + public AmisResponse queueNames(@RequestParam("clusters") List clusters) { + return responseData(MapUtil.of("queueNames", getServices(clusters).flatCollect(YarnService::queueList).collect(YarnQueue::getQueueName))); + } + + @GetMapping("clusters") + public AmisResponse clusters(@RequestParam("clusters") List clusters) { + return responseData(MapUtil.of("cluster", getServices(clusters).collect(YarnService::cluster))); + } +} diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java index 10fa130..dcfbc89 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java @@ -35,6 +35,10 @@ public class YarnApplicationVO { } } + public String getCluster() { + return yarnApplication.getCluster(); + } + public String getId() { return yarnApplication.getId(); } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnClusterVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnClusterVO.java new file mode 100644 index 0000000..4faac7e --- /dev/null +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnClusterVO.java @@ -0,0 +1,62 @@ +package com.lanyuanxiaoyao.service.web.entity; + +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-05 + */ +public class YarnClusterVO { + @JsonIgnore + private final YarnRootQueue yarnRootQueue; + private final ImmutableList children; + + public YarnClusterVO(YarnRootQueue yarnRootQueue, ImmutableList children) { + this.yarnRootQueue = yarnRootQueue; + this.children = children; + } + + public Boolean isRoot() { + return true; + } + + public String getCluster() { + return yarnRootQueue.getCluster(); + } + + public String getQueueName() { + return StrUtil.format("{} ({})", yarnRootQueue.getQueueName(), yarnRootQueue.getCluster()); + } + + public String getType() { + return yarnRootQueue.getType(); + } + + public Float getAbsoluteCapacity() { + return yarnRootQueue.getCapacity(); + } + + public Float getAbsoluteUsedCapacity() { + return yarnRootQueue.getUsedCapacity(); + } + + public Float getAbsoluteMaxCapacity() { + return yarnRootQueue.getMaxCapacity(); + } + + public ImmutableList getChildren() { + return children; + } + + @Override + public String toString() { + return "YarnClusterVO{" + + "yarnRootQueue=" + yarnRootQueue + + ", yarnQueues=" + children + + '}'; + } +} diff --git a/service-web/src/main/resources/static/components/cloud-tab.js b/service-web/src/main/resources/static/components/cloud-tab.js index 7075270..5f04737 100644 --- a/service-web/src/main/resources/static/components/cloud-tab.js +++ b/service-web/src/main/resources/static/components/cloud-tab.js @@ -20,7 +20,9 @@ function cloudTab() { type: 'crud', title: '服务列表', api: '${base}/cloud/list', - source: '$items', + interval: 2000, + syncLocation: false, + silentPolling: true, headerToolbar: ['reload'], columns: [ {name: 'name', label: '名称'}, diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index f3398dc..6d4f9f5 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -11,7 +11,7 @@ function yarnCrudColumns() { { name: 'id', label: 'ID', - width: 240, + width: 250, fixed: 'left', ...copyField('id') }, @@ -21,12 +21,19 @@ function yarnCrudColumns() { fixed: 'left', ...copyField('name') }, + { + name: 'cluster', + label: '集群', + width: 50, + align: 'center', + }, { label: '用户', width: 80, type: 'tooltip-wrapper', body: '${TRUNCATE(user, 8)}', content: '${user}', + align: 'center', }, { name: 'startedTime', @@ -162,7 +169,7 @@ function yarnCrudColumns() { ] } -function simpleYarnDialog(mode, title) { +function simpleYarnDialog(cluster, title, filterField) { return { title: title, actions: [], @@ -194,8 +201,9 @@ function simpleYarnDialog(mode, title) { type: 'crud', api: { method: 'get', - url: `\${base}/${mode}_yarn/job_list`, + url: `\${base}/yarn/job_list`, data: { + clusters: `${cluster}`, page: '${page|default:undefined}', count: '${perPage|default:undefined}', order: '${orderBy|default:undefined}', @@ -203,7 +211,7 @@ function simpleYarnDialog(mode, title) { filter_state: '${state|default:undefined}', filter_final_status: '${finalStatus|default:undefined}', search_id: '${id|default:undefined}', - search_name: `\${${mode}JobName}`, + search_name: `\${${filterField}}`, precise: true, } }, @@ -286,7 +294,7 @@ function flinkJobDialog() { type: 'action', label: '打开同步详情', actionType: 'dialog', - dialog: simpleYarnDialog('sync', '同步详情') + dialog: simpleYarnDialog('b5-sync', '同步详情', 'syncJobName') }, {type: 'divider'}, { @@ -386,14 +394,14 @@ function tableMetaDialog() { type: 'action', icon: 'fa fa-arrows-rotate', actionType: 'dialog', - dialog: simpleYarnDialog('sync', '同步详情') + dialog: simpleYarnDialog('b5-sync', '同步详情', 'syncJobName') }, { label: '压缩情况', type: 'action', icon: 'fa fa-minimize', actionType: 'dialog', - dialog: simpleYarnDialog('compaction', '压缩详情') + dialog: simpleYarnDialog('b1', '压缩详情', 'compactionJobName') }, { type: 'button', diff --git a/service-web/src/main/resources/static/components/yarn-tab.js b/service-web/src/main/resources/static/components/yarn-tab.js index 00d24d5..9f695c6 100644 --- a/service-web/src/main/resources/static/components/yarn-tab.js +++ b/service-web/src/main/resources/static/components/yarn-tab.js @@ -1,29 +1,25 @@ -function yarnTab(name, title, queueNames = 'default', searchName = undefined) { +function yarnTab(cluster, title, queueNames = 'default', searchName = undefined) { return { title: `${title} 集群`, tab: [ { - id: `${name}-yarn-service`, - name: `${name}-yarn-service`, + id: `${cluster}-yarn-service`, + name: `${cluster}-yarn-service`, type: 'service', body: [ { type: 'tpl', tpl: '集群资源', - // className: 'mb-2 block', }, { type: 'crud', api: { method: 'get', - url: `\${base}/${name}_yarn/queue_list`, + url: '${base}/yarn/queue_list', data: { + clusters: `${cluster}`, names: '${queueName|default:undefined}' }, - responseData: { - '&': '$$', - clusterUsage: '${ROUND((cluster.usedCapacity * 100 / cluster.maxCapacity), 0)}' - }, }, defaultParams: { queueName: queueNames, @@ -33,10 +29,6 @@ function yarnTab(name, title, queueNames = 'default', searchName = undefined) { silentPolling: true, headerToolbar: [ "reload", - { - type: "tpl", - tpl: "集群总资源 ${clusterUsage}%" - }, ], columns: [ { @@ -86,6 +78,7 @@ function yarnTab(name, title, queueNames = 'default', searchName = undefined) { type: "button", level: "link", tooltip: '查看队列详情', + visibleOn: '${!root}', actionType: 'dialog', dialog: { closeOnEsc: true, @@ -141,8 +134,9 @@ function yarnTab(name, title, queueNames = 'default', searchName = undefined) { type: 'crud', api: { method: 'get', - url: `\${base}/${name}_yarn/job_list`, + url: '${base}/yarn/job_list', data: { + clusters: `${cluster}`, page: '${page|default:undefined}', count: '${perPage|default:undefined}', order: '${orderBy|default:undefined}', diff --git a/service-web/src/main/resources/static/index.html b/service-web/src/main/resources/static/index.html index 40dd435..23313da 100644 --- a/service-web/src/main/resources/static/index.html +++ b/service-web/src/main/resources/static/index.html @@ -12,11 +12,11 @@ /> Hudi 服务页面 - - + + - - + + @@ -52,8 +52,11 @@ tabsMode: 'strong', tabs: [ tableTab(), - yarnTab('sync', 'Sync'), - yarnTab('compaction', 'Compaction', 'datalake,tyly', 'Compaction'), + yarnTab('b5-sync', '同步 b5', undefined, 'Sync'), + yarnTab(' b1,b4,b5', '压缩 b1 b4 b5', 'datalake,tyly'), + yarnTab(' b1', '压缩 b1', 'datalake,tyly', 'Compaction'), + yarnTab(' b4', '压缩 b4'), + yarnTab(' b5', '压缩 b5'), cloudTab(), ] } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java index 2ea71b3..ebabf14 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/configuration/YarnConfiguration.java @@ -12,8 +12,17 @@ import org.springframework.stereotype.Component; @Component @ConfigurationProperties("yarn") public class YarnConfiguration { + private String cluster; private String webUrl; + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + public String getWebUrl() { return webUrl; } @@ -25,7 +34,8 @@ public class YarnConfiguration { @Override public String toString() { return "YarnConfiguration{" + - "webUrl='" + webUrl + '\'' + + "cluster='" + cluster + '\'' + + ", webUrl='" + webUrl + '\'' + '}'; } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java index 791d1db..ea03170 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/ClusterServiceImpl.java @@ -40,6 +40,6 @@ public class ClusterServiceImpl implements ClusterService { public YarnRootQueue info() throws JsonProcessingException { String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler"); String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); - return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo(); + return mapper.readValue(body, ClusterInfoResponse.class).getScheduler().getSchedulerInfo().setCluster(yarnConfiguration.getCluster()); } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java index 9464f0d..5138ade 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobAutoRefreshServiceImpl.java @@ -41,7 +41,7 @@ public class JobAutoRefreshServiceImpl implements JobService { public void refresh() throws JsonProcessingException { String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps"); String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); - ImmutableList apps = mapper.readValue(body, ApplicationsListResponse.class).getApps().getApp(); + ImmutableList apps = mapper.readValue(body, ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster())); CACHE.set(apps); } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java index 212483e..72c61c7 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/JobServiceImpl.java @@ -44,7 +44,7 @@ public class JobServiceImpl implements JobService { public ImmutableList list() throws JsonProcessingException { String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps"); try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) { - return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp(); + return mapper.readValue(response.body(), ApplicationsListResponse.class).getApps().getApp().tap(app -> app.setCluster(yarnConfiguration.getCluster())); } } @@ -52,14 +52,14 @@ public class JobServiceImpl implements JobService { @Retryable(Throwable.class) @Override public ImmutableList listEquals(String name) throws JsonProcessingException { - return list().select(app -> StrUtil.equals(app.getName(), name)); + return list().select(app -> StrUtil.equals(app.getName(), name)).tap(app -> app.setCluster(yarnConfiguration.getCluster())); } @Cacheable(value = "job-list", sync = true, key = "#{methodName+name}") @Retryable(Throwable.class) @Override public ImmutableList listLike(String name) throws JsonProcessingException { - return list().select(app -> StrUtil.contains(app.getName(), name)); + return list().select(app -> StrUtil.contains(app.getName(), name)).tap(app -> app.setCluster(yarnConfiguration.getCluster())); } @Override @@ -72,7 +72,7 @@ public class JobServiceImpl implements JobService { public YarnApplication detail(String applicationId) throws JsonProcessingException { String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/apps/" + applicationId); try (HttpResponse response = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute()) { - return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp(); + return mapper.readValue(response.body(), ApplicationDetailResponse.class).getApp().setCluster(yarnConfiguration.getCluster()); } } } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java index 32a7f43..5c81a89 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueAutoRefreshServiceImpl.java @@ -42,7 +42,8 @@ public class QueueAutoRefreshServiceImpl implements QueueService { String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler"); String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); QueueListResponse response = mapper.readValue(body, QueueListResponse.class); - ImmutableList queues = Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue()); + ImmutableList queues = Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue()) + .tap(q -> q.setCluster(yarnConfiguration.getCluster())); CACHE.set(queues); } diff --git a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java index 0610773..4453fa6 100644 --- a/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java +++ b/service-yarn-query/src/main/java/com/lanyuanxiaoyao/service/yarn/service/impl/QueueServiceImpl.java @@ -44,7 +44,7 @@ public class QueueServiceImpl implements QueueService { String queryUrl = URLUtil.completeUrl(yarnConfiguration.getWebUrl(), "/ws/v1/cluster/scheduler"); String body = HttpUtil.createGet(queryUrl).setMaxRedirectCount(10).execute().body(); QueueListResponse response = mapper.readValue(body, QueueListResponse.class); - return Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue()); + return Lists.immutable.ofAll(response.getScheduler().getSchedulerInfo().getQueues().getQueue()).tap(q -> q.setCluster(yarnConfiguration.getCluster())); } @Cacheable(value = "queue-detail", sync = true, key = "#name") @@ -54,6 +54,7 @@ public class QueueServiceImpl implements QueueService { return list() .select(q -> StrUtil.equals(q.getQueueName(), name)) .getFirstOptional() - .orElseThrow(() -> new Exception("cannot found " + name)); + .orElseThrow(() -> new Exception("cannot found " + name)) + .setCluster(yarnConfiguration.getCluster()); } }