feature(web): 新增 Flink 相关查询内容
This commit is contained in:
@@ -25,7 +25,8 @@ public class FlinkVertex {
|
|||||||
private Integer maxParallelism;
|
private Integer maxParallelism;
|
||||||
private Long now;
|
private Long now;
|
||||||
private Timestamps timestamps;
|
private Timestamps timestamps;
|
||||||
private ImmutableList<FlinkVertex> vertices;
|
@JsonAlias("vertices")
|
||||||
|
private ImmutableList<FlinkVertex> children;
|
||||||
private Metrics metrics;
|
private Metrics metrics;
|
||||||
|
|
||||||
public String getJid() {
|
public String getJid() {
|
||||||
@@ -72,8 +73,8 @@ public class FlinkVertex {
|
|||||||
return timestamps;
|
return timestamps;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ImmutableList<FlinkVertex> getVertices() {
|
public ImmutableList<FlinkVertex> getChildren() {
|
||||||
return vertices;
|
return children;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Metrics getMetrics() {
|
public Metrics getMetrics() {
|
||||||
@@ -320,7 +321,7 @@ public class FlinkVertex {
|
|||||||
", maxParallelism=" + maxParallelism +
|
", maxParallelism=" + maxParallelism +
|
||||||
", now=" + now +
|
", now=" + now +
|
||||||
", timestamps=" + timestamps +
|
", timestamps=" + timestamps +
|
||||||
", vertices=" + vertices +
|
", children=" + children +
|
||||||
", metrics=" + metrics +
|
", metrics=" + metrics +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import com.dtflys.forest.annotation.BaseRequest;
|
|||||||
import com.dtflys.forest.annotation.Get;
|
import com.dtflys.forest.annotation.Get;
|
||||||
import com.dtflys.forest.annotation.Query;
|
import com.dtflys.forest.annotation.Query;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
import org.eclipse.collections.api.map.ImmutableMap;
|
import org.eclipse.collections.api.map.ImmutableMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -14,36 +15,36 @@ import org.eclipse.collections.api.map.ImmutableMap;
|
|||||||
*/
|
*/
|
||||||
@BaseRequest(baseURL = "http://service-flink-query")
|
@BaseRequest(baseURL = "http://service-flink-query")
|
||||||
public interface FlinkService {
|
public interface FlinkService {
|
||||||
@Get("/overview")
|
@Get("/flink/overview")
|
||||||
public FlinkOverview overview(@Query("url") String url);
|
FlinkOverview overview(@Query("url") String url);
|
||||||
|
|
||||||
@Get("/config")
|
@Get("/flink/config")
|
||||||
public FlinkConfig config(@Query("url") String url);
|
FlinkConfig config(@Query("url") String url);
|
||||||
|
|
||||||
@Get("/job_manager_config")
|
@Get("/flink/job_manager_config")
|
||||||
public ImmutableMap<String, String> jobManagerConfig(@Query("url") String url);
|
ImmutableList<FlinkKeyValue> jobManagerConfig(@Query("url") String url);
|
||||||
|
|
||||||
@Get("/vertex_overview")
|
@Get("/flink/vertex_overview")
|
||||||
public FlinkVertexOverview vertexOverview(@Query("url") String url);
|
FlinkVertexOverview vertexOverview(@Query("url") String url);
|
||||||
|
|
||||||
@Get("/vertex")
|
@Get("/flink/vertex")
|
||||||
public FlinkVertex vertex(@Query("url") String url, @Query("vertex_id") String vertexId);
|
FlinkVertex vertex(@Query("url") String url, @Query("vertex_id") String vertexId);
|
||||||
|
|
||||||
@Get("/vertex_config")
|
@Get("/flink/vertex_config")
|
||||||
public FlinkVertexConfig vertexConfig(@Query("url") String url, @Query("vertex_id") String vertexId);
|
FlinkVertexConfig vertexConfig(@Query("url") String url, @Query("vertex_id") String vertexId);
|
||||||
|
|
||||||
@Get("/checkpoint_overview")
|
@Get("/flink/checkpoint_overview")
|
||||||
public FlinkCheckpointOverview checkpointOverview(@Query("url") String url, @Query("vertex_id") String vertexId);
|
FlinkCheckpointOverview checkpointOverview(@Query("url") String url, @Query("vertex_id") String vertexId);
|
||||||
|
|
||||||
@Get("/checkpoint")
|
@Get("/flink/checkpoint")
|
||||||
public FlinkCheckpoint checkpoint(@Query("url") String url, @Query("vertex_id") String vertexId, @Query("checkpoint_id") String checkpointId);
|
FlinkCheckpoint checkpoint(@Query("url") String url, @Query("vertex_id") String vertexId, @Query("checkpoint_id") String checkpointId);
|
||||||
|
|
||||||
@Get("/checkpoint_config")
|
@Get("/flink/checkpoint_config")
|
||||||
public FlinkCheckpointConfig checkpointConfig(@Query("url") String url, @Query("vertex_id") String vertexId);
|
FlinkCheckpointConfig checkpointConfig(@Query("url") String url, @Query("vertex_id") String vertexId);
|
||||||
|
|
||||||
@Get("/task_manager_overview")
|
@Get("/flink/task_manager_overview")
|
||||||
public FlinkTaskManagerOverview taskManagerOverview(@Query("url") String url);
|
FlinkTaskManagerOverview taskManagerOverview(@Query("url") String url);
|
||||||
|
|
||||||
@Get("/task_manager")
|
@Get("/flink/task_manager")
|
||||||
public FlinkTaskManager taskManager(@Query("url") String url, @Query("task_manager_id") String taskManagerId);
|
FlinkTaskManager taskManager(@Query("url") String url, @Query("task_manager_id") String taskManagerId);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,55 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.web.controller;
|
||||||
|
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.AmisResponse;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex;
|
||||||
|
import com.lanyuanxiaoyao.service.forest.service.FlinkService;
|
||||||
|
import com.lanyuanxiaoyao.service.web.entity.FlinkOverviewVO;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestParam;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flink 接口
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2023-05-05
|
||||||
|
*/
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("flink")
|
||||||
|
public class FlinkController extends BaseController {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(FlinkController.class);
|
||||||
|
private static final ExecutorService EXECUTOR = Executors.newWorkStealingPool(20);
|
||||||
|
|
||||||
|
private final FlinkService flinkService;
|
||||||
|
|
||||||
|
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||||
|
public FlinkController(FlinkService flinkService) {
|
||||||
|
this.flinkService = flinkService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@GetMapping("overview")
|
||||||
|
public AmisResponse overview(@RequestParam("url") String url) throws ExecutionException, InterruptedException {
|
||||||
|
CompletableFuture<FlinkOverview> overviewFuture = CompletableFuture.supplyAsync(() -> flinkService.overview(url), EXECUTOR);
|
||||||
|
CompletableFuture<ImmutableList<FlinkVertex>> jobsFuture = CompletableFuture.supplyAsync(() -> flinkService.vertexOverview(url)
|
||||||
|
.getJobs()
|
||||||
|
.asParallel(EXECUTOR, 1)
|
||||||
|
.collect(vertex -> flinkService.vertex(url, vertex.getJid()))
|
||||||
|
.toList()
|
||||||
|
.toImmutable(), EXECUTOR);
|
||||||
|
CompletableFuture.allOf(overviewFuture).get();
|
||||||
|
|
||||||
|
return responseDetail(new FlinkOverviewVO(
|
||||||
|
overviewFuture.get(),
|
||||||
|
jobsFuture.get()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.web.entity;
|
||||||
|
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkOverview;
|
||||||
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkVertex;
|
||||||
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flink overview
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2023-05-05
|
||||||
|
*/
|
||||||
|
public class FlinkOverviewVO {
|
||||||
|
private final FlinkOverview overview;
|
||||||
|
private final ImmutableList<FlinkVertex> jobs;
|
||||||
|
|
||||||
|
public FlinkOverviewVO(FlinkOverview overview, ImmutableList<FlinkVertex> jobs) {
|
||||||
|
this.overview = overview;
|
||||||
|
this.jobs = jobs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FlinkOverview getOverview() {
|
||||||
|
return overview;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<FlinkVertex> getJobs() {
|
||||||
|
return jobs;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -175,20 +175,75 @@ function simpleYarnDialog(cluster, title, filterField) {
|
|||||||
actions: [],
|
actions: [],
|
||||||
size: 'xl',
|
size: 'xl',
|
||||||
body: [
|
body: [
|
||||||
/*{
|
{
|
||||||
type: 'service',
|
type: 'service',
|
||||||
api: {
|
api: {
|
||||||
method: 'get',
|
method: 'get',
|
||||||
url: `\${base}/${mode}_yarn/job_current`,
|
url: '${base}/yarn/job_current',
|
||||||
data: {
|
data: {
|
||||||
name: `\${${mode}JobName}`,
|
clusters: `${cluster}`,
|
||||||
|
name: `\${${filterField}}`,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
body: [
|
body: [
|
||||||
{
|
{
|
||||||
type: 'iframe',
|
type: 'wrapper',
|
||||||
src: '${current.trackingUrl}',
|
size: 'none',
|
||||||
visibleOn: '${hasCurrent}',
|
visibleOn: '${hasCurrent}',
|
||||||
|
body: [
|
||||||
|
{
|
||||||
|
type: 'service',
|
||||||
|
api: {
|
||||||
|
method: 'get',
|
||||||
|
url: '${base}/flink/overview',
|
||||||
|
data: {
|
||||||
|
url: '${current.trackingUrl}'
|
||||||
|
}
|
||||||
|
},
|
||||||
|
body: [
|
||||||
|
{
|
||||||
|
type: 'property',
|
||||||
|
title: 'Flink 基本信息',
|
||||||
|
column: 4,
|
||||||
|
items: [
|
||||||
|
{label: 'Flink 版本', content: '${detail.overview.flinkVersion}'},
|
||||||
|
{label: 'Flink 小版本', content: '${detail.overview.flinkCommit}', span: 3},
|
||||||
|
{label: '运行中', content: '${detail.overview.jobsRunning}'},
|
||||||
|
{label: '已结束', content: '${detail.overview.jobsFinished}'},
|
||||||
|
{label: '已失败', content: '${detail.overview.jobsFailed}'},
|
||||||
|
{label: '被取消', content: '${detail.overview.jobsCanceled}'},
|
||||||
|
{label: 'Slot (可用/总数)', content: '${detail.overview.slotsAvailable}/${detail.overview.slotsTotal}', span: 4},
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
type: 'table',
|
||||||
|
title: '任务详情',
|
||||||
|
source: '${detail.jobs}',
|
||||||
|
columns: [
|
||||||
|
{
|
||||||
|
name: 'name',
|
||||||
|
label: '名称',
|
||||||
|
width: 2000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'metrics.readRecords',
|
||||||
|
label: '读记录数',
|
||||||
|
width: 60,
|
||||||
|
align: 'center',
|
||||||
|
fixed: 'right',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'metrics.writeRecords',
|
||||||
|
label: '写记录数',
|
||||||
|
width: 60,
|
||||||
|
align: 'center',
|
||||||
|
fixed: 'right',
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
type: 'tpl',
|
type: 'tpl',
|
||||||
@@ -196,7 +251,8 @@ function simpleYarnDialog(cluster, title, filterField) {
|
|||||||
visibleOn: '${!hasCurrent}',
|
visibleOn: '${!hasCurrent}',
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
},*/
|
},
|
||||||
|
{type: 'divider'},
|
||||||
{
|
{
|
||||||
type: 'crud',
|
type: 'crud',
|
||||||
api: {
|
api: {
|
||||||
|
|||||||
Reference in New Issue
Block a user