From c8cb1a1c40c1f823abdcaba720fb979dc555cda7 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Fri, 5 May 2023 17:01:30 +0800 Subject: [PATCH] =?UTF-8?q?refactor(flink-query):=20=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E5=92=8C=E9=83=A8=E7=BD=B2=20Flink=20=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/build-flink-query.sh | 5 ++++ .../entity/flink/FlinkKeyValue.java | 28 +++++++++++++++++++ .../flink/controller/FlinkController.java | 17 ++--------- .../service/flink/service/FlinkService.java | 4 +-- .../flink/service/impl/FlinkServiceImpl.java | 16 +++++++---- .../src/test/resources/flink.http | 4 +++ 6 files changed, 53 insertions(+), 21 deletions(-) create mode 100755 bin/build-flink-query.sh create mode 100644 service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkKeyValue.java diff --git a/bin/build-flink-query.sh b/bin/build-flink-query.sh new file mode 100755 index 0000000..92d6a70 --- /dev/null +++ b/bin/build-flink-query.sh @@ -0,0 +1,5 @@ +#!/bin/bash +mvn -pl service-configuration,service-forest clean deploy -D skipTests -P local -s ~/.m2/settings-development.xml +mvn -pl service-flink-query clean package spring-boot:repackage -D skipTests -s ~/.m2/settings-development.xml +sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) scp /Users/lanyuanxiaoyao/Project/IdeaProjects/hudi-service/service-flink-query/target/service-flink-query-1.0.0-SNAPSHOT.jar iap@132.122.1.162:/apps/iap/tmp/lanyuanxiaoyao +sshpass -p $(/Users/lanyuanxiaoyao/Project/Work/Host/keepassxc-password.sh SSH/iap/132.122.1.162) ssh -o 'StrictHostKeyChecking no' iap@132.122.1.162 'curl ftp://yyy:QeY\!68\)4nH1@132.121.122.15:2222 -T /apps/iap/tmp/lanyuanxiaoyao/service-flink-query-1.0.0-SNAPSHOT.jar' \ No newline at end of file diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkKeyValue.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkKeyValue.java new file mode 100644 index 0000000..e4f2e31 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkKeyValue.java @@ -0,0 +1,28 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +/** + * Key Value 映射 + * + * @author lanyuanxiaoyao + * @date 2023-05-05 + */ +public class FlinkKeyValue { + private String key; + private String value; + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return "FlinkKeyValue{" + + "key='" + key + '\'' + + ", value='" + value + '\'' + + '}'; + } +} diff --git a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/controller/FlinkController.java b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/controller/FlinkController.java index 0bb9e42..143feec 100644 --- a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/controller/FlinkController.java +++ b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/controller/FlinkController.java @@ -3,7 +3,7 @@ package com.lanyuanxiaoyao.service.flink.controller; import com.fasterxml.jackson.core.JsonProcessingException; import com.lanyuanxiaoyao.service.configuration.entity.flink.*; import com.lanyuanxiaoyao.service.flink.service.FlinkService; -import org.eclipse.collections.api.map.ImmutableMap; +import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; @@ -17,7 +17,7 @@ import org.springframework.web.bind.annotation.RestController; */ @RestController @RequestMapping("flink") -public class FlinkController implements FlinkService { +public class FlinkController { private static final Logger logger = LoggerFactory.getLogger(FlinkController.class); private final FlinkService flinkService; @@ -28,67 +28,56 @@ public class FlinkController implements FlinkService { } @GetMapping("/overview") - @Override public FlinkOverview overview(@RequestParam("url") String url) throws JsonProcessingException { return flinkService.overview(url); } @GetMapping("/config") - @Override public FlinkConfig config(@RequestParam("url") String url) throws JsonProcessingException { return flinkService.config(url); } @GetMapping("/job_manager_config") - @Override - public ImmutableMap jobManagerConfig(@RequestParam("url") String url) throws JsonProcessingException { + public ImmutableList jobManagerConfig(@RequestParam("url") String url) throws JsonProcessingException { return flinkService.jobManagerConfig(url); } @GetMapping("/vertex_overview") - @Override public FlinkVertexOverview vertexOverview(@RequestParam("url") String url) throws JsonProcessingException { return flinkService.vertexOverview(url); } @GetMapping("/vertex") - @Override public FlinkVertex vertex(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { return flinkService.vertex(url, vertexId); } @GetMapping("/vertex_config") - @Override public FlinkVertexConfig vertexConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { return flinkService.vertexConfig(url, vertexId); } @GetMapping("/checkpoint_overview") - @Override public FlinkCheckpointOverview checkpointOverview(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { return flinkService.checkpointOverview(url, vertexId); } @GetMapping("/checkpoint") - @Override public FlinkCheckpoint checkpoint(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId, @RequestParam("checkpoint_id") String checkpointId) throws JsonProcessingException { return flinkService.checkpoint(url, vertexId, checkpointId); } @GetMapping("/checkpoint_config") - @Override public FlinkCheckpointConfig checkpointConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { return flinkService.checkpointConfig(url, vertexId); } @GetMapping("/task_manager_overview") - @Override public FlinkTaskManagerOverview taskManagerOverview(@RequestParam("url") String url) throws JsonProcessingException { return flinkService.taskManagerOverview(url); } @GetMapping("/task_manager") - @Override public FlinkTaskManager taskManager(@RequestParam("url") String url, @RequestParam("task_manager_id") String taskManagerId) throws JsonProcessingException { return flinkService.taskManager(url, taskManagerId); } diff --git a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/FlinkService.java b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/FlinkService.java index df05952..88dfedf 100644 --- a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/FlinkService.java +++ b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/FlinkService.java @@ -2,7 +2,7 @@ package com.lanyuanxiaoyao.service.flink.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.lanyuanxiaoyao.service.configuration.entity.flink.*; -import org.eclipse.collections.api.map.ImmutableMap; +import org.eclipse.collections.api.list.ImmutableList; /** * Flink 服务 @@ -15,7 +15,7 @@ public interface FlinkService { FlinkConfig config(String url) throws JsonProcessingException; - ImmutableMap jobManagerConfig(String url) throws JsonProcessingException; + ImmutableList jobManagerConfig(String url) throws JsonProcessingException; FlinkVertexOverview vertexOverview(String url) throws JsonProcessingException; diff --git a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/impl/FlinkServiceImpl.java b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/impl/FlinkServiceImpl.java index d8b04f5..9273e1c 100644 --- a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/impl/FlinkServiceImpl.java +++ b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/impl/FlinkServiceImpl.java @@ -9,7 +9,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.configuration.entity.flink.*; import com.lanyuanxiaoyao.service.flink.service.FlinkService; -import org.eclipse.collections.api.map.ImmutableMap; +import org.eclipse.collections.api.list.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.Cacheable; @@ -32,10 +32,16 @@ public class FlinkServiceImpl implements FlinkService { } private String get(String url, String path) { - try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(url, path)) + String query = URLUtil.normalize(StrUtil.format("{}/{}", url, path)); + logger.debug("Query: {}", query); + try (HttpResponse response = HttpUtil.createGet(query) .setMaxRedirectCount(10) .execute()) { - return response.body(); + if (response.isOk()) { + return response.body(); + } else { + throw new RuntimeException(StrUtil.format("{} {}", response.getStatus(), response.body())); + } } } @@ -56,8 +62,8 @@ public class FlinkServiceImpl implements FlinkService { @Cacheable(value = "flink-jobmanager-config", sync = true) @Retryable(Throwable.class) @Override - public ImmutableMap jobManagerConfig(String url) throws JsonProcessingException { - return mapper.readValue(get(url, "/v1/jobmanager/config"), new TypeReference>() { + public ImmutableList jobManagerConfig(String url) throws JsonProcessingException { + return mapper.readValue(get(url, "/v1/jobmanager/config"), new TypeReference>() { }); } diff --git a/service-flink-query/src/test/resources/flink.http b/service-flink-query/src/test/resources/flink.http index 7f54dfe..59fba57 100644 --- a/service-flink-query/src/test/resources/flink.http +++ b/service-flink-query/src/test/resources/flink.http @@ -97,3 +97,7 @@ GET http://{{url}}/v1/taskmanagers/container_1672368973318_1814_01_000002 ### GET http://{{url}}/v1/taskmanagers/container_1672368973318_1814_01_000002/logs +### +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.122.116.142:14986/flink/task_manager? + url=http%3A%2F%2Fb5s120.hdp.dc%3A8088%2Fproxy%2Fapplication_1672368973318_1814& + vertex_id=320c6e7438afebea43fa0f0160319717&checkpoint_id=2793&task_manager_id=container_1672368973318_1814_01_000002