feat(zookeeper-query): 增加查询run meta接口

This commit is contained in:
2023-12-21 00:30:26 +08:00
parent 5fc451f98f
commit cae9fd5dc3
2 changed files with 47 additions and 1 deletions

View File

@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.forest.service;
import com.dtflys.forest.annotation.BaseRequest;
import com.dtflys.forest.annotation.Get;
import com.dtflys.forest.annotation.Query;
import com.eshore.odcp.hudi.connector.entity.RunMeta;
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
import org.eclipse.collections.api.list.ImmutableList;
@@ -25,4 +26,16 @@ public interface ZookeeperService {
@Get("/get_children")
ImmutableList<ZookeeperNode> getChildren(@Query("path") String path);
@Get("/get_sync_meta_by_flink_job_id")
RunMeta getSyncRunMeta(@Query("flink_job_id") Long flinkJobId);
@Get("/get_sync_meta_by_flink_job_id_and_alias")
RunMeta getSyncRunMeta(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
@Get("/get_compaction_meta_by_flink_job_id_and_alias")
RunMeta getCompactionRunMeta(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias);
@Get("/get_run_meta")
RunMeta getRunMeta();
}

View File

@@ -1,7 +1,11 @@
package com.lanyuanxiaoyao.service.zookeeper;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.eshore.odcp.hudi.connector.Constants;
import com.eshore.odcp.hudi.connector.entity.RunMeta;
import com.eshore.odcp.hudi.connector.utils.NameHelper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.configuration.entity.zookeeper.ZookeeperNode;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
@@ -16,6 +20,7 @@ import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -34,8 +39,9 @@ public class ZookeeperController {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperController.class);
private final CuratorFramework client;
private final ObjectMapper mapper;
public ZookeeperController(ZookeeperConfiguration zookeeperConfiguration) {
public ZookeeperController(ZookeeperConfiguration zookeeperConfiguration, Jackson2ObjectMapperBuilder builder) {
this.client = CuratorFrameworkFactory.builder()
.connectString(zookeeperConfiguration.getConnectUrl())
.retryPolicy(new ExponentialBackoffRetry((int) Constants.SECOND, 3))
@@ -43,6 +49,8 @@ public class ZookeeperController {
.connectionTimeoutMs((int) (30 * Constants.SECOND))
.build();
client.start();
this.mapper = builder.build();
}
@PreDestroy
@@ -95,4 +103,29 @@ public class ZookeeperController {
}
}
}
@GetMapping("get_sync_meta_by_flink_job_id")
public RunMeta getSyncRunMeta(@RequestParam("flink_job_id") Long flinkJobId) throws Exception {
return getRunMeta(NameHelper.syncRunningLockPath(flinkJobId));
}
@GetMapping("get_sync_meta_by_flink_job_id_and_alias")
public RunMeta getSyncRunMeta(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws Exception {
return getRunMeta(NameHelper.syncRunningLockPath(flinkJobId, alias));
}
@GetMapping("get_compaction_meta_by_flink_job_id_and_alias")
public RunMeta getCompactionRunMeta(@RequestParam("flink_job_id") Long flinkJobId, @RequestParam("alias") String alias) throws Exception {
return getRunMeta(NameHelper.compactionRunningLockPath(flinkJobId, alias));
}
@GetMapping("get_run_meta")
public RunMeta getRunMeta(@RequestParam("path") String path) throws Exception {
String data = getData(path);
if (StrUtil.isNotBlank(data)) {
return mapper.readValue(data, RunMeta.class);
} else {
throw new RuntimeException(StrUtil.format("{} is empty", path));
}
}
}