From eabe69a30d7a3d158b49447ed57af09cee30e6b4 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Fri, 5 May 2023 18:54:52 +0800 Subject: [PATCH] =?UTF-8?q?feature(web):=20=E6=96=B0=E5=A2=9E=20Flink=20?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E6=9F=A5=E8=AF=A2=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/flink/FlinkVertex.java | 9 +-- .../service/forest/service/FlinkService.java | 45 ++++++------ .../web/controller/FlinkController.java | 55 +++++++++++++++ .../service/web/entity/FlinkOverviewVO.java | 29 ++++++++ .../resources/static/components/common.js | 68 +++++++++++++++++-- 5 files changed, 174 insertions(+), 32 deletions(-) create mode 100644 service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java create mode 100644 service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkOverviewVO.java diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertex.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertex.java index bcc86bf..1f82788 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertex.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertex.java @@ -25,7 +25,8 @@ public class FlinkVertex { private Integer maxParallelism; private Long now; private Timestamps timestamps; - private ImmutableList vertices; + @JsonAlias("vertices") + private ImmutableList children; private Metrics metrics; public String getJid() { @@ -72,8 +73,8 @@ public class FlinkVertex { return timestamps; } - public ImmutableList getVertices() { - return vertices; + public ImmutableList getChildren() { + return children; } public Metrics getMetrics() { @@ -320,7 +321,7 @@ public class FlinkVertex { ", maxParallelism=" + maxParallelism + ", now=" + now + ", timestamps=" + timestamps + - ", vertices=" + vertices + + ", children=" + children + ", metrics=" + metrics + '}'; } diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/FlinkService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/FlinkService.java index e4e176d..4c1ea91 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/FlinkService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/FlinkService.java @@ -4,6 +4,7 @@ import com.dtflys.forest.annotation.BaseRequest; import com.dtflys.forest.annotation.Get; import com.dtflys.forest.annotation.Query; import com.lanyuanxiaoyao.service.configuration.entity.flink.*; +import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.map.ImmutableMap; /** @@ -14,36 +15,36 @@ import org.eclipse.collections.api.map.ImmutableMap; */ @BaseRequest(baseURL = "http://service-flink-query") public interface FlinkService { - @Get("/overview") - public FlinkOverview overview(@Query("url") String url); + @Get("/flink/overview") + FlinkOverview overview(@Query("url") String url); - @Get("/config") - public FlinkConfig config(@Query("url") String url); + @Get("/flink/config") + FlinkConfig config(@Query("url") String url); - @Get("/job_manager_config") - public ImmutableMap jobManagerConfig(@Query("url") String url); + @Get("/flink/job_manager_config") + ImmutableList jobManagerConfig(@Query("url") String url); - @Get("/vertex_overview") - public FlinkVertexOverview vertexOverview(@Query("url") String url); + @Get("/flink/vertex_overview") + FlinkVertexOverview vertexOverview(@Query("url") String url); - @Get("/vertex") - public FlinkVertex vertex(@Query("url") String url, @Query("vertex_id") String vertexId); + @Get("/flink/vertex") + FlinkVertex vertex(@Query("url") String url, @Query("vertex_id") String vertexId); - @Get("/vertex_config") - public FlinkVertexConfig vertexConfig(@Query("url") String url, @Query("vertex_id") String vertexId); + @Get("/flink/vertex_config") + FlinkVertexConfig vertexConfig(@Query("url") String url, @Query("vertex_id") String vertexId); - @Get("/checkpoint_overview") - public FlinkCheckpointOverview checkpointOverview(@Query("url") String url, @Query("vertex_id") String vertexId); + @Get("/flink/checkpoint_overview") + FlinkCheckpointOverview checkpointOverview(@Query("url") String url, @Query("vertex_id") String vertexId); - @Get("/checkpoint") - public FlinkCheckpoint checkpoint(@Query("url") String url, @Query("vertex_id") String vertexId, @Query("checkpoint_id") String checkpointId); + @Get("/flink/checkpoint") + FlinkCheckpoint checkpoint(@Query("url") String url, @Query("vertex_id") String vertexId, @Query("checkpoint_id") String checkpointId); - @Get("/checkpoint_config") - public FlinkCheckpointConfig checkpointConfig(@Query("url") String url, @Query("vertex_id") String vertexId); + @Get("/flink/checkpoint_config") + FlinkCheckpointConfig checkpointConfig(@Query("url") String url, @Query("vertex_id") String vertexId); - @Get("/task_manager_overview") - public FlinkTaskManagerOverview taskManagerOverview(@Query("url") String url); + @Get("/flink/task_manager_overview") + FlinkTaskManagerOverview taskManagerOverview(@Query("url") String url); - @Get("/task_manager") - public FlinkTaskManager taskManager(@Query("url") String url, @Query("task_manager_id") String taskManagerId); + @Get("/flink/task_manager") + FlinkTaskManager taskManager(@Query("url") String url, @Query("task_manager_id") String taskManagerId); } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java new file mode 100644 index 0000000..c772d7e --- /dev/null +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java @@ -0,0 +1,55 @@ +package com.lanyuanxiaoyao.service.web.controller; + +import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex; +import com.lanyuanxiaoyao.service.forest.service.FlinkService; +import com.lanyuanxiaoyao.service.web.entity.FlinkOverviewVO; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Flink 接口 + * + * @author lanyuanxiaoyao + * @date 2023-05-05 + */ +@RestController +@RequestMapping("flink") +public class FlinkController extends BaseController { + private static final Logger logger = LoggerFactory.getLogger(FlinkController.class); + private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20); + + private final FlinkService flinkService; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public FlinkController(FlinkService flinkService) { + this.flinkService = flinkService; + } + + @GetMapping("overview") + public AmisResponse overview(@RequestParam("url") String url) throws ExecutionException, InterruptedException { + CompletableFuture overviewFuture = CompletableFuture.supplyAsync(() -> flinkService.overview(url), EXECUTOR); + CompletableFuture> jobsFuture = CompletableFuture.supplyAsync(() -> flinkService.vertexOverview(url) + .getJobs() + .asParallel(EXECUTOR, 1) + .collect(vertex -> flinkService.vertex(url, vertex.getJid())) + .toList() + .toImmutable(), EXECUTOR); + CompletableFuture.allOf(overviewFuture).get(); + + return responseDetail(new FlinkOverviewVO( + overviewFuture.get(), + jobsFuture.get() + )); + } +} diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkOverviewVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkOverviewVO.java new file mode 100644 index 0000000..60722a0 --- /dev/null +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkOverviewVO.java @@ -0,0 +1,29 @@ +package com.lanyuanxiaoyao.service.web.entity; + +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Flink overview + * + * @author lanyuanxiaoyao + * @date 2023-05-05 + */ +public class FlinkOverviewVO { + private final FlinkOverview overview; + private final ImmutableList jobs; + + public FlinkOverviewVO(FlinkOverview overview, ImmutableList jobs) { + this.overview = overview; + this.jobs = jobs; + } + + public FlinkOverview getOverview() { + return overview; + } + + public ImmutableList getJobs() { + return jobs; + } +} diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index 6d4f9f5..3355211 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -175,20 +175,75 @@ function simpleYarnDialog(cluster, title, filterField) { actions: [], size: 'xl', body: [ - /*{ + { type: 'service', api: { method: 'get', - url: `\${base}/${mode}_yarn/job_current`, + url: '${base}/yarn/job_current', data: { - name: `\${${mode}JobName}`, + clusters: `${cluster}`, + name: `\${${filterField}}`, } }, body: [ { - type: 'iframe', - src: '${current.trackingUrl}', + type: 'wrapper', + size: 'none', visibleOn: '${hasCurrent}', + body: [ + { + type: 'service', + api: { + method: 'get', + url: '${base}/flink/overview', + data: { + url: '${current.trackingUrl}' + } + }, + body: [ + { + type: 'property', + title: 'Flink 基本信息', + column: 4, + items: [ + {label: 'Flink 版本', content: '${detail.overview.flinkVersion}'}, + {label: 'Flink 小版本', content: '${detail.overview.flinkCommit}', span: 3}, + {label: '运行中', content: '${detail.overview.jobsRunning}'}, + {label: '已结束', content: '${detail.overview.jobsFinished}'}, + {label: '已失败', content: '${detail.overview.jobsFailed}'}, + {label: '被取消', content: '${detail.overview.jobsCanceled}'}, + {label: 'Slot (可用/总数)', content: '${detail.overview.slotsAvailable}/${detail.overview.slotsTotal}', span: 4}, + ] + }, + { + type: 'table', + title: '任务详情', + source: '${detail.jobs}', + columns: [ + { + name: 'name', + label: '名称', + width: 2000, + }, + { + name: 'metrics.readRecords', + label: '读记录数', + width: 60, + align: 'center', + fixed: 'right', + }, + { + name: 'metrics.writeRecords', + label: '写记录数', + width: 60, + align: 'center', + fixed: 'right', + }, + ] + } + ] + } + ] }, { type: 'tpl', @@ -196,7 +251,8 @@ function simpleYarnDialog(cluster, title, filterField) { visibleOn: '${!hasCurrent}', }, ], - },*/ + }, + {type: 'divider'}, { type: 'crud', api: {