diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index b907290..2622278 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -122,6 +122,9 @@ public interface InfoService { @Get("/info/non_exists_table_by_hdfs") Boolean nonExistsTableByHdfs(@Query("hdfs") String hdfs); + @Get("/info/all_flink_job_id_and_alias") + ImmutableList allFlinkJobIdAndAlias(); + @Get("/info/all_flink_job_id") ImmutableList allFlinkJobId(); diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java index 9c7923f..46aa0a3 100644 --- a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/InfoController.java @@ -63,6 +63,12 @@ public class InfoController { return infoService.jobAndMetas(); } + @GetMapping("/all_flink_job_id_and_alias") + public ImmutableList allFlinkJobIdAndAlias() { + return infoService.allTableInfoSearchCache() + .collect(cache -> new JobIdAndAlias(cache.getFlinkJobId(), cache.getAlias())); + } + @GetMapping("/all_flink_job_id") public ImmutableList allFlinkJobId( @RequestParam(value = "key", required = false) String key, diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java index c31e89b..caa32f4 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/OverviewController.java @@ -1,14 +1,13 @@ package com.lanyuanxiaoyao.service.web.controller; import cn.hutool.core.util.StrUtil; +import com.eshore.odcp.hudi.connector.utils.NameHelper; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.info.JobIdAndAlias; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; -import com.lanyuanxiaoyao.service.forest.service.InfoService; -import com.lanyuanxiaoyao.service.forest.service.QueueService; -import com.lanyuanxiaoyao.service.forest.service.ScheduleService; -import com.lanyuanxiaoyao.service.forest.service.YarnService; +import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode; +import com.lanyuanxiaoyao.service.forest.service.*; import com.lanyuanxiaoyao.service.web.controller.base.AmisCrudResponse; import com.lanyuanxiaoyao.service.web.controller.base.AmisMapResponse; import com.lanyuanxiaoyao.service.web.controller.base.AmisResponse; @@ -18,8 +17,10 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.list.MutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; @@ -42,13 +43,15 @@ public class OverviewController extends BaseController { private final YarnService yarnService; private final QueueService queueService; private final ScheduleService scheduleService; + private final ZookeeperService zookeeperService; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public OverviewController(InfoService infoService, YarnService yarnService, QueueService queueService, ScheduleService scheduleService) { + public OverviewController(InfoService infoService, YarnService yarnService, QueueService queueService, ScheduleService scheduleService, ZookeeperService zookeeperService) { this.infoService = infoService; this.yarnService = yarnService; this.queueService = queueService; this.scheduleService = scheduleService; + this.zookeeperService = zookeeperService; } @GetMapping("") @@ -143,4 +146,44 @@ public class OverviewController extends BaseController { public AmisCrudResponse scheduleJobs() { return AmisResponse.responseCrudData(scheduleService.scheduleJobs()); } + + @GetMapping("sync_running_status") + public AmisMapResponse syncRunningStatus() { + ImmutableList locks = zookeeperService.getChildren(NameHelper.ZK_SYNC_RUNNING_LOCK_PATH).collect(ZookeeperNode::getPath); + MutableList runningJob = Lists.mutable.empty(); + MutableList unRunningJob = Lists.mutable.empty(); + MutableList runningTable = Lists.mutable.empty(); + MutableList unRunningTable = Lists.mutable.empty(); + ImmutableList idAliases = infoService.allFlinkJobIdAndAlias(); + ImmutableList ids = idAliases.collect(JobIdAndAlias::getId).distinct(); + ids + .forEach(id -> { + String lockPath = NameHelper.syncRunningLockPath(id); + if (locks.contains(lockPath)) { + runningJob.add(new JobIdAndAlias(id, "")); + } else { + unRunningJob.add(new JobIdAndAlias(id, "")); + } + }); + idAliases + .forEach(ia -> { + String lockPath = NameHelper.syncRunningLockPath(ia.getId(), ia.getAlias()); + if (locks.contains(lockPath)) { + runningTable.add(ia); + } else { + unRunningTable.add(ia); + } + }); + return AmisResponse.responseMapData() + .setData("totalJob", ids.size()) + .setData("runningJob", runningJob.size()) + .setData("runningJobList", runningJob) + .setData("unRunningJob", unRunningJob.size()) + .setData("unRunningJobList", unRunningJob) + .setData("totalTable", idAliases.size()) + .setData("runningTable", runningTable.size()) + .setData("runningTableList", runningTable) + .setData("unRunningTable", unRunningTable.size()) + .setData("unRunningTableList", unRunningTable); + } } diff --git a/service-web/src/main/resources/static/components/overview-tab.js b/service-web/src/main/resources/static/components/overview-tab.js index 2b5dc10..29e0a20 100644 --- a/service-web/src/main/resources/static/components/overview-tab.js +++ b/service-web/src/main/resources/static/components/overview-tab.js @@ -94,6 +94,123 @@ function versionDetailDialog(variable, target) { } } +function jobDetailDialog(variable, targetList) { + return { + disabledOn: `${variable} === 0`, + type: 'action', + label: '详情', + level: 'link', + size: 'sm', + actionType: 'dialog', + dialog: { + title: '详情', + size: 'sm', + ...readOnlyDialogOptions(), + body: [ + { + type: "table", + source: `\${${targetList}}`, + affixHeader: false, + columns: [ + { + label: 'Flink job id', + fixed: 'left', + type: 'wrapper', + size: 'none', + body: [ + { + type: 'tpl', + tpl: '${id}', + }, + { + type: 'action', + level: 'link', + label: '', + icon: 'fa fa-copy', + size: 'xs', + actionType: 'copy', + content: '${id}', + tooltip: '复制 ID', + } + ], + }, + ] + } + ] + } + } +} + +function tableDetailDialog(variable, targetList) { + return { + disabledOn: `${variable} === 0`, + type: 'action', + label: '详情', + level: 'link', + size: 'sm', + actionType: 'dialog', + dialog: { + title: '详情', + size: 'md', + ...readOnlyDialogOptions(), + body: [ + { + type: "table", + source: `\${${targetList}}`, + affixHeader: false, + columns: [ + { + label: 'Flink job id', + fixed: 'left', + type: 'wrapper', + size: 'none', + body: [ + { + type: 'tpl', + tpl: '${id}', + }, + { + type: 'action', + level: 'link', + label: '', + icon: 'fa fa-copy', + size: 'xs', + actionType: 'copy', + content: '${id}', + tooltip: '复制 ID', + } + ], + }, + { + label: '别名', + type: 'wrapper', + fixed: 'left', + size: 'none', + className: 'nowrap', + body: [ + { + type: 'tpl', + tpl: '${alias}', + }, + { + type: 'action', + level: 'link', + label: '', + icon: 'fa fa-copy', + size: 'xs', + actionType: 'copy', + content: '${alias}', + tooltip: '复制别名', + }, + ], + }, + ] + } + ] + } + } +} + function overviewYarnJob(cluster, search, queue, yarnQueue) { return { className: 'font-mono', @@ -220,6 +337,42 @@ function overviewTab() { ] }, {type: 'divider'}, + { + type: 'service', + api: '${base}/overview/sync_running_status', + body: [ + { + type: 'tpl', + tpl: '任务数${totalJob}' + }, + { + className: 'mx-2', + type: 'tpl', + tpl: '运行中${PADSTART(runningJob, 3)}' + }, + { + type: 'tpl', + tpl: '已停止${PADSTART(unRunningJob, 3)}' + }, + jobDetailDialog('unRunningJob', 'unRunningJobList'), + '
', + { + type: 'tpl', + tpl: '总表数${totalTable}' + }, + { + className: 'mx-2', + type: 'tpl', + tpl: '运行中${PADSTART(runningTable, 3)}' + }, + { + type: 'tpl', + tpl: '已停止${PADSTART(unRunningTable, 3)}' + }, + tableDetailDialog('unRunningTable', 'unRunningTableList'), + ] + }, + {type: 'divider'}, '集群 (集群总资源使用,队列资源使用)(调度中任务数,运行中任务数)', overviewYarnJob('b12', 'Sync', undefined, 'default'), {type: 'divider'}, diff --git a/test/test.http b/test/test.http index 7c0992e..d6110e5 100644 --- a/test/test.http +++ b/test/test.http @@ -73,3 +73,6 @@ GET {{web-url}}/cloud/deploy_plan ### Get info query services GET {{services-url}}/cloud/services?service_name=center-gateway-new + +### Sync status +GET {{web-url}}/overview/sync_running_status