diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java index c772d7e..daf36b3 100644 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/controller/FlinkController.java @@ -1,11 +1,9 @@ package com.lanyuanxiaoyao.service.web.controller; import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse; -import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview; -import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint; import com.lanyuanxiaoyao.service.forest.service.FlinkService; -import com.lanyuanxiaoyao.service.web.entity.FlinkOverviewVO; -import java.util.concurrent.CompletableFuture; +import com.lanyuanxiaoyao.service.web.entity.FlinkVertexVO; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -38,18 +36,27 @@ public class FlinkController extends BaseController { @GetMapping("overview") public AmisResponse overview(@RequestParam("url") String url) throws ExecutionException, InterruptedException { - CompletableFuture overviewFuture = CompletableFuture.supplyAsync(() -> flinkService.overview(url), EXECUTOR); - CompletableFuture> jobsFuture = CompletableFuture.supplyAsync(() -> flinkService.vertexOverview(url) + return responseDetail(flinkService.overview(url)); + } + + @GetMapping("jobs") + public AmisResponse jobs(@RequestParam("url") String url) { + ImmutableList vertexVOS = flinkService.vertexOverview(url) .getJobs() .asParallel(EXECUTOR, 1) .collect(vertex -> flinkService.vertex(url, vertex.getJid())) - .toList() - .toImmutable(), EXECUTOR); - CompletableFuture.allOf(overviewFuture).get(); - - return responseDetail(new FlinkOverviewVO( - overviewFuture.get(), - jobsFuture.get() - )); + .collect(vertex -> { + ImmutableList checkpoints = flinkService.checkpointOverview(url, vertex.getJid()) + .getHistory() + .asParallel(EXECUTOR, 1) + .collect(checkpoint -> flinkService.checkpoint(url, vertex.getJid(), checkpoint.getId())) + .toSortedListBy(FlinkCheckpoint::getId) + .reverseThis() + .toImmutable(); + return new FlinkVertexVO(vertex, checkpoints); + }) + .toSortedListBy(FlinkVertexVO::getName) + .toImmutable(); + return responseCrudData(vertexVOS, vertexVOS.size()); } } diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkOverviewVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkOverviewVO.java deleted file mode 100644 index 60722a0..0000000 --- a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkOverviewVO.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.lanyuanxiaoyao.service.web.entity; - -import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview; -import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex; -import org.eclipse.collections.api.list.ImmutableList; - -/** - * Flink overview - * - * @author lanyuanxiaoyao - * @date 2023-05-05 - */ -public class FlinkOverviewVO { - private final FlinkOverview overview; - private final ImmutableList jobs; - - public FlinkOverviewVO(FlinkOverview overview, ImmutableList jobs) { - this.overview = overview; - this.jobs = jobs; - } - - public FlinkOverview getOverview() { - return overview; - } - - public ImmutableList getJobs() { - return jobs; - } -} diff --git a/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkVertexVO.java b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkVertexVO.java new file mode 100644 index 0000000..60a781c --- /dev/null +++ b/service-web/src/main/java/com/lanyuanxiaoyao/service/web/entity/FlinkVertexVO.java @@ -0,0 +1,170 @@ +package com.lanyuanxiaoyao.service.web.entity; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkCheckpoint; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-05 + */ +public final class FlinkVertexVO { + @JsonIgnore + private final FlinkVertex vertex; + + private final ImmutableList checkpoints; + private Metrics metrics; + private CheckpointMetrics checkpointMetrics; + + public FlinkVertexVO(FlinkVertex vertex, ImmutableList checkpoints) { + this.vertex = vertex; + + if (ObjectUtil.isNotNull(vertex.getChildren())) { + this.metrics = new Metrics( + vertex.getChildren() + .reject(ObjectUtil::isNull) + .reject(v -> ObjectUtil.isNull(v.getMetrics())) + .reject(v -> ObjectUtil.isNull(v.getMetrics().getReadBytes())) + .sumOfLong(v -> v.getMetrics().getReadBytes()), + vertex.getChildren() + .reject(ObjectUtil::isNull) + .reject(v -> ObjectUtil.isNull(v.getMetrics())) + .reject(v -> ObjectUtil.isNull(v.getMetrics().getReadRecords())) + .sumOfLong(v -> v.getMetrics().getReadRecords()), + vertex.getChildren() + .reject(ObjectUtil::isNull) + .reject(v -> ObjectUtil.isNull(v.getMetrics())) + .reject(v -> ObjectUtil.isNull(v.getMetrics().getWriteBytes())) + .sumOfLong(v -> v.getMetrics().getWriteBytes()), + vertex.getChildren() + .reject(ObjectUtil::isNull) + .reject(v -> ObjectUtil.isNull(v.getMetrics())) + .reject(v -> ObjectUtil.isNull(v.getMetrics().getWriteRecords())) + .sumOfLong(v -> v.getMetrics().getWriteRecords()) + ); + } + this.checkpoints = checkpoints; + this.checkpointMetrics = new CheckpointMetrics( + checkpoints.count(c -> StrUtil.equals("COMPLETED", c.getStatus())), + checkpoints.count(c -> StrUtil.equals("IN_PROGRESS", c.getStatus())), + checkpoints.count(c -> StrUtil.equals("FAILED", c.getStatus())) + ); + } + + public String getJid() { + return vertex.getJid(); + } + + public String getName() { + return vertex.getName(); + } + + public String getState() { + return vertex.getState(); + } + + public Long getStartTime() { + return vertex.getStartTime(); + } + + public Long getEndTime() { + return vertex.getEndTime(); + } + + public Long getDuration() { + return vertex.getDuration(); + } + + public Long getLastModification() { + return vertex.getLastModification(); + } + + public FlinkVertex.Tasks getTasks() { + return vertex.getTasks(); + } + + public Integer getMaxParallelism() { + return vertex.getMaxParallelism(); + } + + public Long getNow() { + return vertex.getNow(); + } + + public FlinkVertex.Timestamps getTimestamps() { + return vertex.getTimestamps(); + } + + public ImmutableList getChildren() { + return vertex.getChildren(); + } + + public Metrics getMetrics() { + return metrics; + } + + public ImmutableList getCheckpoints() { + return checkpoints; + } + + public CheckpointMetrics getCheckpointMetrics() { + return checkpointMetrics; + } + + public static final class Metrics { + private final Long readBytes; + private final Long readRecords; + private final Long writeBytes; + private final Long writeRecords; + + public Metrics(Long readBytes, Long readRecords, Long writeBytes, Long writeRecords) { + this.readBytes = readBytes; + this.readRecords = readRecords; + this.writeBytes = writeBytes; + this.writeRecords = writeRecords; + } + + public Long getReadBytes() { + return readBytes; + } + + public Long getReadRecords() { + return readRecords; + } + + public Long getWriteBytes() { + return writeBytes; + } + + public Long getWriteRecords() { + return writeRecords; + } + } + + public static final class CheckpointMetrics { + private final Integer complete; + private final Integer inProgress; + private final Integer failed; + + public CheckpointMetrics(Integer complete, Integer inProgress, Integer failed) { + this.complete = complete; + this.inProgress = inProgress; + this.failed = failed; + } + + public Integer getComplete() { + return complete; + } + + public Integer getInProgress() { + return inProgress; + } + + public Integer getFailed() { + return failed; + } + } +} diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index 3355211..204fe96 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -185,6 +185,7 @@ function simpleYarnDialog(cluster, title, filterField) { name: `\${${filterField}}`, } }, + silentPolling: false, body: [ { type: 'wrapper', @@ -200,31 +201,54 @@ function simpleYarnDialog(cluster, title, filterField) { url: '${current.trackingUrl}' } }, + silentPolling: false, body: [ { type: 'property', title: 'Flink 基本信息', column: 4, items: [ - {label: 'Flink 版本', content: '${detail.overview.flinkVersion}'}, - {label: 'Flink 小版本', content: '${detail.overview.flinkCommit}', span: 3}, - {label: '运行中', content: '${detail.overview.jobsRunning}'}, - {label: '已结束', content: '${detail.overview.jobsFinished}'}, - {label: '已失败', content: '${detail.overview.jobsFailed}'}, - {label: '被取消', content: '${detail.overview.jobsCanceled}'}, - {label: 'Slot (可用/总数)', content: '${detail.overview.slotsAvailable}/${detail.overview.slotsTotal}', span: 4}, + {label: 'Flink 版本', content: '${detail.flinkVersion}'}, + {label: 'Flink 小版本', content: '${detail.flinkCommit}', span: 3}, + {label: '运行中', content: '${detail.jobsRunning}'}, + {label: '已结束', content: '${detail.jobsFinished}'}, + {label: '已失败', content: '${detail.jobsFailed}'}, + {label: '被取消', content: '${detail.jobsCanceled}'}, + {label: 'Slot (可用/总数)', content: '${detail.slotsAvailable}/${detail.slotsTotal}', span: 4}, ] }, + ] + }, + { + type: 'service', + api: { + method: 'get', + url: '${base}/flink/jobs', + data: { + url: '${current.trackingUrl}' + } + }, + silentPolling: false, + body: [ { type: 'table', title: '任务详情', - source: '${detail.jobs}', + source: '${items}', + affixHeader: false, columns: [ { name: 'name', label: '名称', width: 2000, }, + { + label: 'Checkpoint', + width: 60, + align: 'center', + fixed: 'right', + type: 'tpl', + tpl: "${IF(checkpointMetrics, checkpointMetrics.complete + '/' + checkpointMetrics.inProgress +'/' + checkpointMetrics.failed, '-')}", + }, { name: 'metrics.readRecords', label: '读记录数', @@ -242,7 +266,7 @@ function simpleYarnDialog(cluster, title, filterField) { ] } ] - } + }, ] }, {