diff --git a/.gitignore b/.gitignore index 13d1d74..5835221 100644 --- a/.gitignore +++ b/.gitignore @@ -31,22 +31,7 @@ buildNumber.properties !.vscode/*.code-snippets .history/ *.vsix -.idea/**/workspace.xml -.idea/**/tasks.xml -.idea/**/usage.statistics.xml -.idea/**/dictionaries -.idea/**/shelf -.idea/**/aws.xml -.idea/**/contentModel.xml -.idea/**/dataSources/ -.idea/**/dataSources.ids -.idea/**/dataSources.local.xml -.idea/**/sqlDataSources.xml -.idea/**/dynamic.xml -.idea/**/uiDesigner.xml -.idea/**/dbnavigator.xml -.idea/**/gradle.xml -.idea/**/libraries +.idea/** cmake-build-*/ .idea/**/mongoSettings.xml *.iws diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java index d84ce96..61ca866 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/YarnController.java @@ -5,9 +5,12 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertexOverview; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; +import com.lanyuanxiaoyao.service.forest.service.FlinkService; import com.lanyuanxiaoyao.service.forest.service.YarnService; import com.lanyuanxiaoyao.service.web.entity.YarnApplicationVO; import com.lanyuanxiaoyao.service.web.entity.YarnClusterVO; @@ -44,10 +47,12 @@ public class YarnController extends BaseController { YarnApplication::getFinishedTime ); private final YarnService yarnService; + private final FlinkService flinkService; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") - public YarnController(YarnService yarnService) { + public YarnController(YarnService yarnService, FlinkService flinkService) { this.yarnService = yarnService; + this.flinkService = flinkService; } @GetMapping("job_list") @@ -61,7 +66,8 @@ public class YarnController extends BaseController { @RequestParam(value = "filter_final_status", defaultValue = "") String filterFinalStatus, @RequestParam(value = "search_id", defaultValue = "") String searchId, @RequestParam(value = "search_name", defaultValue = "") String searchName, - @RequestParam(value = "precise", defaultValue = "false") Boolean precise + @RequestParam(value = "precise", defaultValue = "false") Boolean precise, + @RequestParam(value = "completion", defaultValue = "false") Boolean completion ) { boolean isFilterState = StrUtil.isNotBlank(filterState); boolean isFilterFinalStatus = StrUtil.isNotBlank(filterFinalStatus); @@ -82,7 +88,36 @@ public class YarnController extends BaseController { ImmutableList result = applications .drop(Math.max(page - 1, 0) * count) .take(count) - .collect(YarnApplicationVO::new); + .asParallel(ExecutorProvider.EXECUTORS, 1) + .collect(yarnApplication -> { + YarnApplicationVO vo = new YarnApplicationVO(yarnApplication); + if (!completion) { + return vo; + } + try { + if (vo.getCompactionApplication() + && StrUtil.equals(yarnApplication.getState(), "RUNNING") + && StrUtil.isNotBlank(yarnApplication.getTrackingUrl())) { + FlinkVertexOverview vertexOverview = flinkService.vertexOverview(yarnApplication.getTrackingUrl()); + if (ObjectUtil.isNotNull(vertexOverview) && ObjectUtil.isNotEmpty(vertexOverview.getJobs())) { + Optional vertex = vertexOverview.getJobs().getFirstOptional(); + if (vertex.isPresent() && ObjectUtil.isNotNull(vertex.get().getTasks())) { + FlinkVertex.Tasks tasks = vertex.get().getTasks(); + vo.setCompactionCompletionRatio( + ObjectUtil.isNotNull(tasks.getTotal()) && tasks.getTotal() != 0 + ? tasks.getRunning() * 1.0 / tasks.getTotal() + : 0 + ); + } + } + } + } catch (Throwable e) { + logger.error("Get ratio failure", e); + } + return vo; + }) + .toList() + .toImmutable(); return responseCrudData(result, applications.size()) .withData("running", running) .withData("unRunning", unRunning); diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java index a73d7af..e0c2fe0 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/YarnApplicationVO.java @@ -28,6 +28,8 @@ public class YarnApplicationVO { private String flinkJobName; private String alias; + private Double compactionCompletionRatio = 0.0; + public YarnApplicationVO(YarnApplication yarnApplication) { this.yarnApplication = yarnApplication; @@ -164,4 +166,12 @@ public class YarnApplicationVO { public String getAlias() { return alias; } + + public Double getCompactionCompletionRatio() { + return compactionCompletionRatio; + } + + public void setCompactionCompletionRatio(Double compactionCompletionRatio) { + this.compactionCompletionRatio = compactionCompletionRatio; + } } diff --git a/test/test.http b/test/test.http index e117961..ca15255 100644 --- a/test/test.http +++ b/test/test.http @@ -12,7 +12,7 @@ GET {{queue-url}}/queue/names GET {{queue-url}}/queue/all/compaction-queue-b1 ### 新增 -POST {{queue-url}}/queue/add/compaction-queue-test +POST {{queue-url}}/queue/add/compaction-queue-b1 Content-Type: application/json { @@ -20,8 +20,8 @@ Content-Type: application/json "priority": 1, "data": { "id": "{{$guid}}", - "flinkJobId": "1542097983881048064", - "alias": "crm_cfguse_area_code", + "flinkJobId": "1542097996099055616", + "alias": "acct_acct_item_fs", "batch": "ojvfodao_hj", "status": "SCHEDULE", "comment": "Comment" diff --git a/web/components/common.js b/web/components/common.js index 7804537..6502a50 100644 --- a/web/components/common.js +++ b/web/components/common.js @@ -214,6 +214,15 @@ function yarnCrudColumns() { className: 'nowrap', type: 'tpl', tpl: "${IF(syncApplication, 'S', IF(compactionApplication, 'C', ''))}${IF(hudiApplication, '', '')}${IF(syncApplication, flinkJobName, IF(compactionApplication, alias, name))}", + backgroundScale: { + min: 0.001, + max: 1.000, + source: '${compactionCompletionRatio}', + colors: [ + '#FFFFFF', + '#DD4150', + ] + } }, { name: 'cluster', diff --git a/web/components/yarn-tab.js b/web/components/yarn-tab.js index 31c39eb..545edd7 100644 --- a/web/components/yarn-tab.js +++ b/web/components/yarn-tab.js @@ -32,6 +32,7 @@ function yarnTab(cluster, title, queueNames = 'root', searchName = undefined) { filter_final_status: '${finalStatus|default:undefined}', search_id: '${id|default:undefined}', search_name: '${name|default:undefined}', + completion: 'true', } }, defaultParams: {