refactor(web): 优化 Flink 相关信息查询

This commit is contained in:
2023-05-05 23:05:51 +08:00
parent 1dde155fac
commit 47b3656d73
4 changed files with 224 additions and 52 deletions

View File

@@ -1,11 +1,9 @@
package com.lanyuanxiaoyao.service.web.controller; package com.lanyuanxiaoyao.service.web.controller;
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview; import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex;
import com.lanyuanxiaoyao.service.forest.service.FlinkService; import com.lanyuanxiaoyao.service.forest.service.FlinkService;
import com.lanyuanxiaoyao.service.web.entity.FlinkOverviewVO; import com.lanyuanxiaoyao.service.web.entity.FlinkVertexVO;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@@ -38,18 +36,27 @@ public class FlinkController extends BaseController {
@GetMapping("overview") @GetMapping("overview")
public AmisResponse overview(@RequestParam("url") String url) throws ExecutionException, InterruptedException { public AmisResponse overview(@RequestParam("url") String url) throws ExecutionException, InterruptedException {
CompletableFuture<FlinkOverview> overviewFuture = CompletableFuture.supplyAsync(() -> flinkService.overview(url), EXECUTOR); return responseDetail(flinkService.overview(url));
CompletableFuture<ImmutableList<FlinkVertex>> jobsFuture = CompletableFuture.supplyAsync(() -> flinkService.vertexOverview(url) }
@GetMapping("jobs")
public AmisResponse jobs(@RequestParam("url") String url) {
ImmutableList<FlinkVertexVO> vertexVOS = flinkService.vertexOverview(url)
.getJobs() .getJobs()
.asParallel(EXECUTOR, 1) .asParallel(EXECUTOR, 1)
.collect(vertex -> flinkService.vertex(url, vertex.getJid())) .collect(vertex -> flinkService.vertex(url, vertex.getJid()))
.toList() .collect(vertex -> {
.toImmutable(), EXECUTOR); ImmutableList<FlinkCheckpoint> checkpoints = flinkService.checkpointOverview(url, vertex.getJid())
CompletableFuture.allOf(overviewFuture).get(); .getHistory()
.asParallel(EXECUTOR, 1)
return responseDetail(new FlinkOverviewVO( .collect(checkpoint -> flinkService.checkpoint(url, vertex.getJid(), checkpoint.getId()))
overviewFuture.get(), .toSortedListBy(FlinkCheckpoint::getId)
jobsFuture.get() .reverseThis()
)); .toImmutable();
return new FlinkVertexVO(vertex, checkpoints);
})
.toSortedListBy(FlinkVertexVO::getName)
.toImmutable();
return responseCrudData(vertexVOS, vertexVOS.size());
} }
} }

View File

@@ -1,29 +0,0 @@
package com.lanyuanxiaoyao.service.web.entity;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex;
import org.eclipse.collections.api.list.ImmutableList;
/**
* Flink overview
*
* @author lanyuanxiaoyao
* @date 2023-05-05
*/
public class FlinkOverviewVO {
private final FlinkOverview overview;
private final ImmutableList<FlinkVertex> jobs;
public FlinkOverviewVO(FlinkOverview overview, ImmutableList<FlinkVertex> jobs) {
this.overview = overview;
this.jobs = jobs;
}
public FlinkOverview getOverview() {
return overview;
}
public ImmutableList<FlinkVertex> getJobs() {
return jobs;
}
}

View File

@@ -0,0 +1,170 @@
package com.lanyuanxiaoyao.service.web.entity;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint;
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex;
import org.eclipse.collections.api.list.ImmutableList;
/**
* @author lanyuanxiaoyao
* @date 2023-05-05
*/
public final class FlinkVertexVO {
@JsonIgnore
private final FlinkVertex vertex;
private final ImmutableList<FlinkCheckpoint> checkpoints;
private Metrics metrics;
private CheckpointMetrics checkpointMetrics;
public FlinkVertexVO(FlinkVertex vertex, ImmutableList<FlinkCheckpoint> checkpoints) {
this.vertex = vertex;
if (ObjectUtil.isNotNull(vertex.getChildren())) {
this.metrics = new Metrics(
vertex.getChildren()
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getReadBytes()))
.sumOfLong(v -> v.getMetrics().getReadBytes()),
vertex.getChildren()
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getReadRecords()))
.sumOfLong(v -> v.getMetrics().getReadRecords()),
vertex.getChildren()
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getWriteBytes()))
.sumOfLong(v -> v.getMetrics().getWriteBytes()),
vertex.getChildren()
.reject(ObjectUtil::isNull)
.reject(v -> ObjectUtil.isNull(v.getMetrics()))
.reject(v -> ObjectUtil.isNull(v.getMetrics().getWriteRecords()))
.sumOfLong(v -> v.getMetrics().getWriteRecords())
);
}
this.checkpoints = checkpoints;
this.checkpointMetrics = new CheckpointMetrics(
checkpoints.count(c -> StrUtil.equals("COMPLETED", c.getStatus())),
checkpoints.count(c -> StrUtil.equals("IN_PROGRESS", c.getStatus())),
checkpoints.count(c -> StrUtil.equals("FAILED", c.getStatus()))
);
}
public String getJid() {
return vertex.getJid();
}
public String getName() {
return vertex.getName();
}
public String getState() {
return vertex.getState();
}
public Long getStartTime() {
return vertex.getStartTime();
}
public Long getEndTime() {
return vertex.getEndTime();
}
public Long getDuration() {
return vertex.getDuration();
}
public Long getLastModification() {
return vertex.getLastModification();
}
public FlinkVertex.Tasks getTasks() {
return vertex.getTasks();
}
public Integer getMaxParallelism() {
return vertex.getMaxParallelism();
}
public Long getNow() {
return vertex.getNow();
}
public FlinkVertex.Timestamps getTimestamps() {
return vertex.getTimestamps();
}
public ImmutableList<FlinkVertex> getChildren() {
return vertex.getChildren();
}
public Metrics getMetrics() {
return metrics;
}
public ImmutableList<FlinkCheckpoint> getCheckpoints() {
return checkpoints;
}
public CheckpointMetrics getCheckpointMetrics() {
return checkpointMetrics;
}
public static final class Metrics {
private final Long readBytes;
private final Long readRecords;
private final Long writeBytes;
private final Long writeRecords;
public Metrics(Long readBytes, Long readRecords, Long writeBytes, Long writeRecords) {
this.readBytes = readBytes;
this.readRecords = readRecords;
this.writeBytes = writeBytes;
this.writeRecords = writeRecords;
}
public Long getReadBytes() {
return readBytes;
}
public Long getReadRecords() {
return readRecords;
}
public Long getWriteBytes() {
return writeBytes;
}
public Long getWriteRecords() {
return writeRecords;
}
}
public static final class CheckpointMetrics {
private final Integer complete;
private final Integer inProgress;
private final Integer failed;
public CheckpointMetrics(Integer complete, Integer inProgress, Integer failed) {
this.complete = complete;
this.inProgress = inProgress;
this.failed = failed;
}
public Integer getComplete() {
return complete;
}
public Integer getInProgress() {
return inProgress;
}
public Integer getFailed() {
return failed;
}
}
}

View File

@@ -185,6 +185,7 @@ function simpleYarnDialog(cluster, title, filterField) {
name: `\${${filterField}}`, name: `\${${filterField}}`,
} }
}, },
silentPolling: false,
body: [ body: [
{ {
type: 'wrapper', type: 'wrapper',
@@ -200,31 +201,54 @@ function simpleYarnDialog(cluster, title, filterField) {
url: '${current.trackingUrl}' url: '${current.trackingUrl}'
} }
}, },
silentPolling: false,
body: [ body: [
{ {
type: 'property', type: 'property',
title: 'Flink 基本信息', title: 'Flink 基本信息',
column: 4, column: 4,
items: [ items: [
{label: 'Flink 版本', content: '${detail.overview.flinkVersion}'}, {label: 'Flink 版本', content: '${detail.flinkVersion}'},
{label: 'Flink 小版本', content: '${detail.overview.flinkCommit}', span: 3}, {label: 'Flink 小版本', content: '${detail.flinkCommit}', span: 3},
{label: '运行中', content: '${detail.overview.jobsRunning}'}, {label: '运行中', content: '${detail.jobsRunning}'},
{label: '已结束', content: '${detail.overview.jobsFinished}'}, {label: '已结束', content: '${detail.jobsFinished}'},
{label: '已失败', content: '${detail.overview.jobsFailed}'}, {label: '已失败', content: '${detail.jobsFailed}'},
{label: '被取消', content: '${detail.overview.jobsCanceled}'}, {label: '被取消', content: '${detail.jobsCanceled}'},
{label: 'Slot (可用/总数)', content: '${detail.overview.slotsAvailable}/${detail.overview.slotsTotal}', span: 4}, {label: 'Slot (可用/总数)', content: '${detail.slotsAvailable}/${detail.slotsTotal}', span: 4},
] ]
}, },
]
},
{
type: 'service',
api: {
method: 'get',
url: '${base}/flink/jobs',
data: {
url: '${current.trackingUrl}'
}
},
silentPolling: false,
body: [
{ {
type: 'table', type: 'table',
title: '任务详情', title: '任务详情',
source: '${detail.jobs}', source: '${items}',
affixHeader: false,
columns: [ columns: [
{ {
name: 'name', name: 'name',
label: '名称', label: '名称',
width: 2000, width: 2000,
}, },
{
label: 'Checkpoint',
width: 60,
align: 'center',
fixed: 'right',
type: 'tpl',
tpl: "${IF(checkpointMetrics, checkpointMetrics.complete + '/' + checkpointMetrics.inProgress +'/' + checkpointMetrics.failed, '-')}",
},
{ {
name: 'metrics.readRecords', name: 'metrics.readRecords',
label: '读记录数', label: '读记录数',
@@ -242,7 +266,7 @@ function simpleYarnDialog(cluster, title, filterField) {
] ]
} }
] ]
} },
] ]
}, },
{ {