feature(flink-query): 新增 Flink 相关查询接口

This commit is contained in:
2023-05-05 00:13:13 +08:00
parent 88c1714238
commit 22dd6df3f3
19 changed files with 1753 additions and 6 deletions

View File

@@ -0,0 +1,95 @@
package com.lanyuanxiaoyao.service.flink.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
import com.lanyuanxiaoyao.service.flink.service.FlinkService;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author lanyuanxiaoyao
* @date 2023-05-04
*/
@RestController
@RequestMapping("flink")
public class FlinkController implements FlinkService {
private static final Logger logger = LoggerFactory.getLogger(FlinkController.class);
private final FlinkService flinkService;
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
public FlinkController(FlinkService flinkService) {
this.flinkService = flinkService;
}
@GetMapping("/overview")
@Override
public FlinkOverview overview(@RequestParam("url") String url) throws JsonProcessingException {
return flinkService.overview(url);
}
@GetMapping("/config")
@Override
public FlinkConfig config(@RequestParam("url") String url) throws JsonProcessingException {
return flinkService.config(url);
}
@GetMapping("/job_manager_config")
@Override
public ImmutableMap<String, String> jobManagerConfig(@RequestParam("url") String url) throws JsonProcessingException {
return flinkService.jobManagerConfig(url);
}
@GetMapping("/vertex_overview")
@Override
public FlinkVertexOverview vertexOverview(@RequestParam("url") String url) throws JsonProcessingException {
return flinkService.vertexOverview(url);
}
@GetMapping("/vertex")
@Override
public FlinkVertex vertex(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
return flinkService.vertex(url, vertexId);
}
@GetMapping("/vertex_config")
@Override
public FlinkVertexConfig vertexConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
return flinkService.vertexConfig(url, vertexId);
}
@GetMapping("/checkpoint_overview")
@Override
public FlinkCheckpointOverview checkpointOverview(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
return flinkService.checkpointOverview(url, vertexId);
}
@GetMapping("/checkpoint")
@Override
public FlinkCheckpoint checkpoint(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId, @RequestParam("checkpoint_id") String checkpointId) throws JsonProcessingException {
return flinkService.checkpoint(url, vertexId, checkpointId);
}
@GetMapping("/checkpoint_config")
@Override
public FlinkCheckpointConfig checkpointConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
return flinkService.checkpointConfig(url, vertexId);
}
@GetMapping("/task_manager_overview")
@Override
public FlinkTaskManagerOverview taskManagerOverview(@RequestParam("url") String url) throws JsonProcessingException {
return flinkService.taskManagerOverview(url);
}
@GetMapping("/task_manager")
@Override
public FlinkTaskManager taskManager(@RequestParam("url") String url, @RequestParam("task_manager_id") String taskManagerId) throws JsonProcessingException {
return flinkService.taskManager(url, taskManagerId);
}
}

View File

@@ -0,0 +1,35 @@
package com.lanyuanxiaoyao.service.flink.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
import org.eclipse.collections.api.map.ImmutableMap;
/**
* Flink 服务
*
* @author lanyuanxiaoyao
* @date 2023-05-04
*/
public interface FlinkService {
FlinkOverview overview(String url) throws JsonProcessingException;
FlinkConfig config(String url) throws JsonProcessingException;
ImmutableMap<String, String> jobManagerConfig(String url) throws JsonProcessingException;
FlinkVertexOverview vertexOverview(String url) throws JsonProcessingException;
FlinkVertex vertex(String url, String vertexId) throws JsonProcessingException;
FlinkVertexConfig vertexConfig(String url, String vertexId) throws JsonProcessingException;
FlinkCheckpointOverview checkpointOverview(String url, String vertexId) throws JsonProcessingException;
FlinkCheckpoint checkpoint(String url, String vertexId, String checkpointId) throws JsonProcessingException;
FlinkCheckpointConfig checkpointConfig(String url, String vertexId) throws JsonProcessingException;
FlinkTaskManagerOverview taskManagerOverview(String url) throws JsonProcessingException;
FlinkTaskManager taskManager(String url, String taskManagerId) throws JsonProcessingException;
}

View File

@@ -0,0 +1,119 @@
package com.lanyuanxiaoyao.service.flink.service.impl;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.URLUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
import com.lanyuanxiaoyao.service.flink.service.FlinkService;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
/**
* @author lanyuanxiaoyao
* @date 2023-05-04
*/
@Service
public class FlinkServiceImpl implements FlinkService {
private static final Logger logger = LoggerFactory.getLogger(FlinkServiceImpl.class);
private final ObjectMapper mapper;
public FlinkServiceImpl(Jackson2ObjectMapperBuilder builder) {
mapper = builder.build();
}
private String get(String url, String path) {
try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(url, path))
.setMaxRedirectCount(10)
.execute()) {
return response.body();
}
}
@Cacheable(value = "flink-overview", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkOverview overview(String url) throws JsonProcessingException {
return mapper.readValue(get(url, "/v1/overview"), FlinkOverview.class);
}
@Cacheable(value = "flink-config", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkConfig config(String url) throws JsonProcessingException {
return mapper.readValue(get(url, "/v1/config"), FlinkConfig.class);
}
@Cacheable(value = "flink-jobmanager-config", sync = true)
@Retryable(Throwable.class)
@Override
public ImmutableMap<String, String> jobManagerConfig(String url) throws JsonProcessingException {
return mapper.readValue(get(url, "/v1/jobmanager/config"), new TypeReference<ImmutableMap<String, String>>() {
});
}
@Cacheable(value = "flink-vertex-overview", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkVertexOverview vertexOverview(String url) throws JsonProcessingException {
return mapper.readValue(get(url, "/v1/jobs/overview"), FlinkVertexOverview.class);
}
@Cacheable(value = "flink-vertex", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkVertex vertex(String url, String vertexId) throws JsonProcessingException {
return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}", vertexId)), FlinkVertex.class);
}
@Cacheable(value = "flink-vertex-config", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkVertexConfig vertexConfig(String url, String vertexId) throws JsonProcessingException {
return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/config", vertexId)), FlinkVertexConfig.class);
}
@Cacheable(value = "flink-checkpoint-overview", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkCheckpointOverview checkpointOverview(String url, String vertexId) throws JsonProcessingException {
return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/checkpoints", vertexId)), FlinkCheckpointOverview.class);
}
@Cacheable(value = "flink-checkpoint", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkCheckpoint checkpoint(String url, String vertexId, String checkpointId) throws JsonProcessingException {
return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/checkpoints/details/{}", vertexId, checkpointId)), FlinkCheckpoint.class);
}
@Cacheable(value = "flink-checkpoint-config", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkCheckpointConfig checkpointConfig(String url, String vertexId) throws JsonProcessingException {
return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/checkpoints/config", vertexId)), FlinkCheckpointConfig.class);
}
@Cacheable(value = "flink-taskmanager-overview", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkTaskManagerOverview taskManagerOverview(String url) throws JsonProcessingException {
return mapper.readValue(get(url, "/v1/taskmanagers"), FlinkTaskManagerOverview.class);
}
@Cacheable(value = "flink-taskmanager", sync = true)
@Retryable(Throwable.class)
@Override
public FlinkTaskManager taskManager(String url, String taskManagerId) throws JsonProcessingException {
return mapper.readValue(get(url, StrUtil.format("/v1/taskmanagers/{}", taskManagerId)), FlinkTaskManager.class);
}
}