refactor(flink-query): 调试和部署 Flink 查询模块
This commit is contained in:
5
bin/build-flink-query.sh
Executable file
5
bin/build-flink-query.sh
Executable file
@@ -0,0 +1,5 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
mvn -pl service-configuration,service-forest clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml
|
||||||
|
mvn -pl service-flink-query clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml
|
||||||
|
sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) scp /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-flink-query/target/service-flink-query-1.0.0-SNAPSHOT.jar iap@132.122.1.162:/apps/iap/tmp/lanyuanxiaoyao
|
||||||
|
sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) ssh -o 'StrictHostKeyChecking no' iap@132.122.1.162 'curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222 -T /apps/iap/tmp/lanyuanxiaoyao/service-flink-query-1.0.0-SNAPSHOT.jar'
|
||||||
@@ -0,0 +1,28 @@
|
|||||||
|
package com.lanyuanxiaoyao.service.configuration.entity.flink;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Key Value 映射
|
||||||
|
*
|
||||||
|
* @author lanyuanxiaoyao
|
||||||
|
* @date 2023-05-05
|
||||||
|
*/
|
||||||
|
public class FlinkKeyValue {
|
||||||
|
private String key;
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
public String getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "FlinkKeyValue{" +
|
||||||
|
"key='" + key + '\'' +
|
||||||
|
", value='" + value + '\'' +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,7 +3,7 @@ package com.lanyuanxiaoyao.service.flink.controller;
|
|||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
||||||
import com.lanyuanxiaoyao.service.flink.service.FlinkService;
|
import com.lanyuanxiaoyao.service.flink.service.FlinkService;
|
||||||
import org.eclipse.collections.api.map.ImmutableMap;
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
@@ -17,7 +17,7 @@ import org.springframework.web.bind.annotation.RestController;
|
|||||||
*/
|
*/
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("flink")
|
@RequestMapping("flink")
|
||||||
public class FlinkController implements FlinkService {
|
public class FlinkController {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(FlinkController.class);
|
private static final Logger logger = LoggerFactory.getLogger(FlinkController.class);
|
||||||
|
|
||||||
private final FlinkService flinkService;
|
private final FlinkService flinkService;
|
||||||
@@ -28,67 +28,56 @@ public class FlinkController implements FlinkService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/overview")
|
@GetMapping("/overview")
|
||||||
@Override
|
|
||||||
public FlinkOverview overview(@RequestParam("url") String url) throws JsonProcessingException {
|
public FlinkOverview overview(@RequestParam("url") String url) throws JsonProcessingException {
|
||||||
return flinkService.overview(url);
|
return flinkService.overview(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/config")
|
@GetMapping("/config")
|
||||||
@Override
|
|
||||||
public FlinkConfig config(@RequestParam("url") String url) throws JsonProcessingException {
|
public FlinkConfig config(@RequestParam("url") String url) throws JsonProcessingException {
|
||||||
return flinkService.config(url);
|
return flinkService.config(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/job_manager_config")
|
@GetMapping("/job_manager_config")
|
||||||
@Override
|
public ImmutableList<FlinkKeyValue> jobManagerConfig(@RequestParam("url") String url) throws JsonProcessingException {
|
||||||
public ImmutableMap<String, String> jobManagerConfig(@RequestParam("url") String url) throws JsonProcessingException {
|
|
||||||
return flinkService.jobManagerConfig(url);
|
return flinkService.jobManagerConfig(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/vertex_overview")
|
@GetMapping("/vertex_overview")
|
||||||
@Override
|
|
||||||
public FlinkVertexOverview vertexOverview(@RequestParam("url") String url) throws JsonProcessingException {
|
public FlinkVertexOverview vertexOverview(@RequestParam("url") String url) throws JsonProcessingException {
|
||||||
return flinkService.vertexOverview(url);
|
return flinkService.vertexOverview(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/vertex")
|
@GetMapping("/vertex")
|
||||||
@Override
|
|
||||||
public FlinkVertex vertex(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
public FlinkVertex vertex(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
||||||
return flinkService.vertex(url, vertexId);
|
return flinkService.vertex(url, vertexId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/vertex_config")
|
@GetMapping("/vertex_config")
|
||||||
@Override
|
|
||||||
public FlinkVertexConfig vertexConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
public FlinkVertexConfig vertexConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
||||||
return flinkService.vertexConfig(url, vertexId);
|
return flinkService.vertexConfig(url, vertexId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/checkpoint_overview")
|
@GetMapping("/checkpoint_overview")
|
||||||
@Override
|
|
||||||
public FlinkCheckpointOverview checkpointOverview(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
public FlinkCheckpointOverview checkpointOverview(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
||||||
return flinkService.checkpointOverview(url, vertexId);
|
return flinkService.checkpointOverview(url, vertexId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/checkpoint")
|
@GetMapping("/checkpoint")
|
||||||
@Override
|
|
||||||
public FlinkCheckpoint checkpoint(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId, @RequestParam("checkpoint_id") String checkpointId) throws JsonProcessingException {
|
public FlinkCheckpoint checkpoint(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId, @RequestParam("checkpoint_id") String checkpointId) throws JsonProcessingException {
|
||||||
return flinkService.checkpoint(url, vertexId, checkpointId);
|
return flinkService.checkpoint(url, vertexId, checkpointId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/checkpoint_config")
|
@GetMapping("/checkpoint_config")
|
||||||
@Override
|
|
||||||
public FlinkCheckpointConfig checkpointConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
public FlinkCheckpointConfig checkpointConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException {
|
||||||
return flinkService.checkpointConfig(url, vertexId);
|
return flinkService.checkpointConfig(url, vertexId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/task_manager_overview")
|
@GetMapping("/task_manager_overview")
|
||||||
@Override
|
|
||||||
public FlinkTaskManagerOverview taskManagerOverview(@RequestParam("url") String url) throws JsonProcessingException {
|
public FlinkTaskManagerOverview taskManagerOverview(@RequestParam("url") String url) throws JsonProcessingException {
|
||||||
return flinkService.taskManagerOverview(url);
|
return flinkService.taskManagerOverview(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
@GetMapping("/task_manager")
|
@GetMapping("/task_manager")
|
||||||
@Override
|
|
||||||
public FlinkTaskManager taskManager(@RequestParam("url") String url, @RequestParam("task_manager_id") String taskManagerId) throws JsonProcessingException {
|
public FlinkTaskManager taskManager(@RequestParam("url") String url, @RequestParam("task_manager_id") String taskManagerId) throws JsonProcessingException {
|
||||||
return flinkService.taskManager(url, taskManagerId);
|
return flinkService.taskManager(url, taskManagerId);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package com.lanyuanxiaoyao.service.flink.service;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
||||||
import org.eclipse.collections.api.map.ImmutableMap;
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flink 服务
|
* Flink 服务
|
||||||
@@ -15,7 +15,7 @@ public interface FlinkService {
|
|||||||
|
|
||||||
FlinkConfig config(String url) throws JsonProcessingException;
|
FlinkConfig config(String url) throws JsonProcessingException;
|
||||||
|
|
||||||
ImmutableMap<String, String> jobManagerConfig(String url) throws JsonProcessingException;
|
ImmutableList<FlinkKeyValue> jobManagerConfig(String url) throws JsonProcessingException;
|
||||||
|
|
||||||
FlinkVertexOverview vertexOverview(String url) throws JsonProcessingException;
|
FlinkVertexOverview vertexOverview(String url) throws JsonProcessingException;
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
|||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
import com.lanyuanxiaoyao.service.configuration.entity.flink.*;
|
||||||
import com.lanyuanxiaoyao.service.flink.service.FlinkService;
|
import com.lanyuanxiaoyao.service.flink.service.FlinkService;
|
||||||
import org.eclipse.collections.api.map.ImmutableMap;
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.cache.annotation.Cacheable;
|
import org.springframework.cache.annotation.Cacheable;
|
||||||
@@ -32,10 +32,16 @@ public class FlinkServiceImpl implements FlinkService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private String get(String url, String path) {
|
private String get(String url, String path) {
|
||||||
try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(url, path))
|
String query = URLUtil.normalize(StrUtil.format("{}/{}", url, path));
|
||||||
|
logger.debug("Query: {}", query);
|
||||||
|
try (HttpResponse response = HttpUtil.createGet(query)
|
||||||
.setMaxRedirectCount(10)
|
.setMaxRedirectCount(10)
|
||||||
.execute()) {
|
.execute()) {
|
||||||
return response.body();
|
if (response.isOk()) {
|
||||||
|
return response.body();
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException(StrUtil.format("{} {}", response.getStatus(), response.body()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,8 +62,8 @@ public class FlinkServiceImpl implements FlinkService {
|
|||||||
@Cacheable(value = "flink-jobmanager-config", sync = true)
|
@Cacheable(value = "flink-jobmanager-config", sync = true)
|
||||||
@Retryable(Throwable.class)
|
@Retryable(Throwable.class)
|
||||||
@Override
|
@Override
|
||||||
public ImmutableMap<String, String> jobManagerConfig(String url) throws JsonProcessingException {
|
public ImmutableList<FlinkKeyValue> jobManagerConfig(String url) throws JsonProcessingException {
|
||||||
return mapper.readValue(get(url, "/v1/jobmanager/config"), new TypeReference<ImmutableMap<String, String>>() {
|
return mapper.readValue(get(url, "/v1/jobmanager/config"), new TypeReference<ImmutableList<FlinkKeyValue>>() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -97,3 +97,7 @@ GET http://{{url}}/v1/taskmanagers/container_1672368973318_1814_01_000002
|
|||||||
###
|
###
|
||||||
GET http://{{url}}/v1/taskmanagers/container_1672368973318_1814_01_000002/logs
|
GET http://{{url}}/v1/taskmanagers/container_1672368973318_1814_01_000002/logs
|
||||||
|
|
||||||
|
###
|
||||||
|
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.122.116.142:14986/flink/task_manager?
|
||||||
|
url=http%3A%2F%2Fb5s120.hdp.dc%3A8088%2Fproxy%2Fapplication_1672368973318_1814&
|
||||||
|
vertex_id=320c6e7438afebea43fa0f0160319717&checkpoint_id=2793&task_manager_id=container_1672368973318_1814_01_000002
|
||||||
|
|||||||
Reference in New Issue
Block a user