From cae9fd5dc38501ccf89c823ea25c13518ceb1c97 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Thu, 21 Dec 2023 00:30:26 +0800 Subject: [PATCH] =?UTF-8?q?feat(zookeeper-query):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2run=20meta=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../forest/service/ZookeeperService.java | 13 +++++++ .../zookeeper/ZookeeperController.java | 35 ++++++++++++++++++- 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ZookeeperService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ZookeeperService.java index 2ba107f..6fad5ce 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ZookeeperService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/ZookeeperService.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.forest.service; import com.dtflys.forest.annotation.BaseRequest; import com.dtflys.forest.annotation.Get; import com.dtflys.forest.annotation.Query; +import com.eshore.odcp.hudi.connector.entity.RunMeta; import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode; import org.eclipse.collections.api.list.ImmutableList; @@ -25,4 +26,16 @@ public interface ZookeeperService { @Get("/get_children") ImmutableList getChildren(@Query("path") String path); + + @Get("/get_sync_meta_by_flink_job_id") + RunMeta getSyncRunMeta(@Query("flink_job_id") Long flinkJobId); + + @Get("/get_sync_meta_by_flink_job_id_and_alias") + RunMeta getSyncRunMeta(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/get_compaction_meta_by_flink_job_id_and_alias") + RunMeta getCompactionRunMeta(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/get_run_meta") + RunMeta getRunMeta(); } diff --git a/service-zookeeper-query/src/main/java/com/lanyuanxiaoyao/service/zookeeper/ZookeeperController.java b/service-zookeeper-query/src/main/java/com/lanyuanxiaoyao/service/zookeeper/ZookeeperController.java index 473b3df..5b7040b 100644 --- a/service-zookeeper-query/src/main/java/com/lanyuanxiaoyao/service/zookeeper/ZookeeperController.java +++ b/service-zookeeper-query/src/main/java/com/lanyuanxiaoyao/service/zookeeper/ZookeeperController.java @@ -1,7 +1,11 @@ package com.lanyuanxiaoyao.service.zookeeper; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.Constants; +import com.eshore.odcp.hudi.connector.entity.RunMeta; +import com.eshore.odcp.hudi.connector.utils.NameHelper; +import com.fasterxml.jackson.databind.ObjectMapper; import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode; import java.util.concurrent.TimeUnit; import javax.annotation.PreDestroy; @@ -16,6 +20,7 @@ import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.MutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -34,8 +39,9 @@ public class ZookeeperController { private static final Logger logger = LoggerFactory.getLogger(ZookeeperController.class); private final CuratorFramework client; + private final ObjectMapper mapper; - public ZookeeperController(ZookeeperConfiguration zookeeperConfiguration) { + public ZookeeperController(ZookeeperConfiguration zookeeperConfiguration, Jackson2ObjectMapperBuilder builder) { this.client = CuratorFrameworkFactory.builder() .connectString(zookeeperConfiguration.getConnectUrl()) .retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3)) @@ -43,6 +49,8 @@ public class ZookeeperController { .connectionTimeoutMs((int) (30 * Constants.SECOND)) .build(); client.start(); + + this.mapper = builder.build(); } @PreDestroy @@ -95,4 +103,29 @@ public class ZookeeperController { } } } + + @GetMapping("get_sync_meta_by_flink_job_id") + public RunMeta getSyncRunMeta(@RequestParam("flink_job_id") Long flinkJobId) throws Exception { + return getRunMeta(NameHelper.syncRunningLockPath(flinkJobId)); + } + + @GetMapping("get_sync_meta_by_flink_job_id_and_alias") + public RunMeta getSyncRunMeta(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws Exception { + return getRunMeta(NameHelper.syncRunningLockPath(flinkJobId, alias)); + } + + @GetMapping("get_compaction_meta_by_flink_job_id_and_alias") + public RunMeta getCompactionRunMeta(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws Exception { + return getRunMeta(NameHelper.compactionRunningLockPath(flinkJobId, alias)); + } + + @GetMapping("get_run_meta") + public RunMeta getRunMeta(@RequestParam("path") String path) throws Exception { + String data = getData(path); + if (StrUtil.isNotBlank(data)) { + return mapper.readValue(data, RunMeta.class); + } else { + throw new RuntimeException(StrUtil.format("{} is empty", path)); + } + } }