refactor(web): 优化 Flink 信息查询

This commit is contained in:
2023-05-06 10:53:56 +08:00
parent 71df97203d
commit 92973ee09e
5 changed files with 142 additions and 17 deletions

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.web.controller;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint;
import com.lanyuanxiaoyao.service.forest.service.FlinkService;
@@ -40,12 +41,31 @@ public class FlinkController extends BaseController {
}
@GetMapping("jobs")
public AmisResponse jobs(@RequestParam("url") String url) {
public AmisResponse jobs(
@RequestParam("url") String url,
@RequestParam(value = "mode", required = false) String mode,
@RequestParam(value = "schema", required = false) String schema,
@RequestParam(value = "table", required = false) String table
) {
boolean isAllInOne = StrUtil.isNotBlank(mode) && StrUtil.equals(mode, "ALL_IN_ONE");
boolean isFilterSchema = StrUtil.isNotBlank(schema);
boolean isFilterTable = StrUtil.isNotBlank(table);
ImmutableList<FlinkVertexVO> vertexVOS = flinkService.vertexOverview(url)
.getJobs()
.asParallel(EXECUTOR, 1)
.collect(vertex -> flinkService.vertex(url, vertex.getJid()))
.select(vertex -> !isFilterSchema || StrUtil.contains(vertex.getName(), schema))
.select(vertex -> isAllInOne || (!isFilterTable || StrUtil.contains(vertex.getName(), table)))
.collect(vertex -> {
ImmutableList<FlinkVertexVO.FlinkVertexChildVO> children = vertex.getChildren()
// 过滤名字中包含相关内容的算子
.select(child ->
StrUtil.contains(child.getName(), "compaction_source")
|| StrUtil.contains(child.getName(), "compact_task")
|| StrUtil.contains(child.getName(), "compaction_commit")
|| (!isFilterTable || StrUtil.contains(child.getName(), table))
)
.collect(child -> new FlinkVertexVO.FlinkVertexChildVO(child, StrUtil.format("{}/#/job/{}/overview/{}", url, vertex.getJid(), child.getJid())));
ImmutableList<FlinkCheckpoint> checkpoints = flinkService.checkpointOverview(url, vertex.getJid())
.getHistory()
.asParallel(EXECUTOR, 1)
@@ -53,7 +73,7 @@ public class FlinkController extends BaseController {
.toSortedListBy(FlinkCheckpoint::getId)
.reverseThis()
.toImmutable();
return new FlinkVertexVO(vertex, checkpoints);
return new FlinkVertexVO(vertex, StrUtil.format("{}/#/job/{}", url, vertex.getJid()), children, checkpoints);
})
.toSortedListBy(FlinkVertexVO::getName)
.toImmutable();

View File

@@ -16,30 +16,34 @@ public final class FlinkVertexVO {
private final FlinkVertex vertex;
private final ImmutableList<FlinkCheckpoint> checkpoints;
private final CheckpointMetrics checkpointMetrics;
private final String page;
private final ImmutableList<FlinkVertexChildVO> children;
private Metrics metrics;
private CheckpointMetrics checkpointMetrics;
public FlinkVertexVO(FlinkVertex vertex, ImmutableList<FlinkCheckpoint> checkpoints) {
public FlinkVertexVO(FlinkVertex vertex, String page, ImmutableList<FlinkVertexChildVO> children, ImmutableList<FlinkCheckpoint> checkpoints) {
this.vertex = vertex;
this.page = page;
if (ObjectUtil.isNotNull(vertex.getChildren())) {
this.children = children;
if (ObjectUtil.isNotNull(children)) {
this.metrics = new Metrics(
vertex.getChildren()
children
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getReadBytes()))
.sumOfLong(v -> v.getMetrics().getReadBytes()),
vertex.getChildren()
children
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getReadRecords()))
.sumOfLong(v -> v.getMetrics().getReadRecords()),
vertex.getChildren()
children
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getWriteBytes()))
.sumOfLong(v -> v.getMetrics().getWriteBytes()),
vertex.getChildren()
children
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getWriteRecords()))
@@ -98,8 +102,8 @@ public final class FlinkVertexVO {
return vertex.getTimestamps();
}
public ImmutableList<FlinkVertex> getChildren() {
return vertex.getChildren();
public ImmutableList<FlinkVertexChildVO> getChildren() {
return children;
}
public Metrics getMetrics() {
@@ -114,6 +118,10 @@ public final class FlinkVertexVO {
return checkpointMetrics;
}
public String getPage() {
return page;
}
public static final class Metrics {
private final Long readBytes;
private final Long readRecords;
@@ -167,4 +175,71 @@ public final class FlinkVertexVO {
return failed;
}
}
public static final class FlinkVertexChildVO {
@JsonIgnore
private final FlinkVertex vertex;
private final String page;
public FlinkVertexChildVO(FlinkVertex vertex, String page) {
this.vertex = vertex;
this.page = page;
}
public String getJid() {
return vertex.getJid();
}
public String getName() {
return vertex.getName();
}
public String getState() {
return vertex.getState();
}
public Long getStartTime() {
return vertex.getStartTime();
}
public Long getEndTime() {
return vertex.getEndTime();
}
public Long getDuration() {
return vertex.getDuration();
}
public Long getLastModification() {
return vertex.getLastModification();
}
public FlinkVertex.Tasks getTasks() {
return vertex.getTasks();
}
public Integer getMaxParallelism() {
return vertex.getMaxParallelism();
}
public Long getNow() {
return vertex.getNow();
}
public FlinkVertex.Timestamps getTimestamps() {
return vertex.getTimestamps();
}
public ImmutableList<FlinkVertex> getChildren() {
return vertex.getChildren();
}
public FlinkVertex.Metrics getMetrics() {
return vertex.getMetrics();
}
public String getPage() {
return page;
}
}
}

View File

@@ -173,6 +173,12 @@ function simpleYarnDialog(cluster, title, filterField) {
return {
title: title,
actions: [],
data: {
base: '${base}',
name: `\${${filterField}}`,
flinkJob: '${flinkJob}',
tableMeta: '${tableMeta}',
},
size: 'xl',
body: [
{
@@ -182,7 +188,7 @@ function simpleYarnDialog(cluster, title, filterField) {
url: '${base}/yarn/job_current',
data: {
clusters: `${cluster}`,
name: `\${${filterField}}`,
name: '${name}',
}
},
silentPolling: false,
@@ -201,6 +207,7 @@ function simpleYarnDialog(cluster, title, filterField) {
url: '${current.trackingUrl}'
}
},
silentPolling: false,
body: [
{
@@ -225,7 +232,10 @@ function simpleYarnDialog(cluster, title, filterField) {
method: 'get',
url: '${base}/flink/jobs',
data: {
url: '${current.trackingUrl}'
url: '${current.trackingUrl}',
schema: '${tableMeta.schema}',
table: "${tableMeta.table}",
mode: "${flinkJob.runMode}",
}
},
silentPolling: false,
@@ -263,6 +273,26 @@ function simpleYarnDialog(cluster, title, filterField) {
align: 'center',
fixed: 'right',
},
{
label: '操作',
width: 60,
align: 'center',
fixed: 'right',
type: 'wrapper',
size: 'none',
body: [
{
disabled: true,
type: 'button',
label: '详情',
level: 'link',
size: 'xs',
actionType: 'url',
blank: true,
url: '${page}',
}
],
},
]
}
]
@@ -291,7 +321,7 @@ function simpleYarnDialog(cluster, title, filterField) {
filter_state: '${state|default:undefined}',
filter_final_status: '${finalStatus|default:undefined}',
search_id: '${id|default:undefined}',
search_name: `\${${filterField}}`,
search_name: '${name}',
precise: true,
}
},

View File

@@ -53,9 +53,9 @@
tabs: [
tableTab(),
yarnTab('b5-sync', '同步 b5', undefined, 'Sync'),
yarnTab('b1,b4,b5', '压缩 b1 b4 b5', 'datalake,tyly'),
yarnTab('b1', '压缩 b1', 'datalake,tyly', 'Compaction'),
yarnTab('b4', '压缩 b4'),
yarnTab('b1,b5', '压缩 b1 b5', 'datalake,tyly', 'Compaction'),
// yarnTab('b4', '压缩 b4'),
yarnTab('b5', '压缩 b5'),
cloudTab(),
]

File diff suppressed because one or more lines are too long