From 521e82104f0155b63ad0f516683ce80e5877a19d Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Fri, 19 Jan 2024 14:35:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(pulsar-query):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=8C=89=E6=8C=87=E5=AE=9Aurl=E5=92=8Ctopic=E5=88=A4=E6=96=AD?= =?UTF-8?q?=E6=98=AF=E5=90=A6=E5=8C=B9=E9=85=8D=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/forest/service/PulsarService.java | 5 +- .../pulsar/PulsarQueryApplication.java | 210 ++++++++++-------- 2 files changed, 123 insertions(+), 92 deletions(-) diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java index 013f5ef..31733ef 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java @@ -36,5 +36,8 @@ public interface PulsarService { ImmutableList topics(@Query("name") String name, @Query("namespace") String namespace); @Get("/backlog") - Long backlog(@Query("name") String name, @Query("topic") String topic, @RequestParam("subscription") String subscription); + Long backlog(@Query("name") String name, @Query("topic") String topic, @Query("subscription") String subscription); + + @Get("/exists_topic") + Boolean existsTopic(@Query("url") String url, @Query("topic") String topic); } diff --git a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java index cd597af..0b88e2c 100644 --- a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java +++ b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java @@ -25,8 +25,10 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cache.annotation.Cacheable; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.retry.annotation.EnableRetry; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @@ -53,97 +55,6 @@ public class PulsarQueryApplication { SpringApplication.run(PulsarQueryApplication.class, args); } - private PulsarInfo getInfo(String name) { - if (PulsarInfo.PULSAR_NAME_INFO_MAPPING.containsKey(name)) { - return PulsarInfo.PULSAR_NAME_INFO_MAPPING.get(name); - } - throw new PulsarInfoNotFoundException("Pulsar info not found for " + name); - } - - private String adminUrl(PulsarInfo info) { - return StrUtil.format("http://{}/admin/v2", info.getAdmin()); - } - - @RequestMapping("names") - public ImmutableList names() { - return Lists.immutable.ofAll(PulsarInfo.DEFAULT_INFOS).collect(PulsarInfo::getName); - } - - @RequestMapping("name") - public String name(@RequestParam("pulsar_url") String pulsarUrl) { - String[] urls = pulsarUrl.replaceAll("pulsar://", "").split(","); - if (ObjectUtil.isNotEmpty(urls)) { - return PulsarInfo.PULSAR_CLIENT_NAME_MAPPING.get(urls[0]); - } - throw new PulsarInfoNotFoundException("Pulsar name not found for " + pulsarUrl); - } - - @RequestMapping("tenants") - public ImmutableList tenants(@RequestParam("name") String name) throws PulsarClientException, PulsarAdminException { - PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - MutableList result = Lists.mutable.empty(); - for (String tenant : admin.tenants().getTenants()) { - result.add(new PulsarTenant(tenant)); - } - return result.toImmutable(); - } - } - - @RequestMapping("namespaces") - public ImmutableList namespaces(@RequestParam("name") String name, @RequestParam("tenant") String tenant) throws PulsarClientException, PulsarAdminException { - PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - MutableList result = Lists.mutable.empty(); - for (String namespace : admin.namespaces().getNamespaces(tenant)) { - result.add(new PulsarNamespace(tenant, namespace)); - } - return result.toImmutable(); - } - } - - @RequestMapping("topics") - public ImmutableList topics(@RequestParam("name") String name, @RequestParam("namespace") String namespace) throws PulsarClientException, PulsarAdminException { - PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - MutableList result = Lists.mutable.empty(); - for (String topic : admin.topics().getList(namespace)) { - result.add(convert(admin, topic)); - } - return result.toImmutable(); - } - } - - @RequestMapping("topic") - public PulsarTopic topic(@RequestParam("name") String name, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException { - PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - return convert(admin, topic); - } - } - - @RequestMapping("backlog") - public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException { - PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - Map subscriptions = admin.topics().getStats(topic).getSubscriptions(); - if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) { - return subscriptions.get(subscription).getBacklogSize(); - } - return -1L; - } - } - private static PulsarTopic convert(PulsarAdmin admin, String topic) throws PulsarAdminException { TopicStats stats = admin.topics().getStats(topic); MessageId lastMessageId = admin.topics().getLastMessageId(topic); @@ -264,4 +175,121 @@ public class PulsarQueryApplication { subscriptions ); } + + private PulsarInfo getInfo(String name) { + if (PulsarInfo.PULSAR_NAME_INFO_MAPPING.containsKey(name)) { + return PulsarInfo.PULSAR_NAME_INFO_MAPPING.get(name); + } + throw new PulsarInfoNotFoundException("Pulsar info not found for " + name); + } + + private String adminUrl(PulsarInfo info) { + return StrUtil.format("http://{}/admin/v2", info.getAdmin()); + } + + @Cacheable(value = "name", sync = true) + @GetMapping("names") + public ImmutableList names() { + return Lists.immutable.ofAll(PulsarInfo.DEFAULT_INFOS).collect(PulsarInfo::getName); + } + + @Cacheable(value = "name", sync = true) + @GetMapping("name") + public String name(@RequestParam("pulsar_url") String pulsarUrl) { + String[] urls = pulsarUrl.replaceAll("pulsar://", "").split(","); + if (ObjectUtil.isNotEmpty(urls)) { + return PulsarInfo.PULSAR_CLIENT_NAME_MAPPING.get(urls[0]); + } + throw new PulsarInfoNotFoundException("Pulsar name not found for " + pulsarUrl); + } + + @Cacheable(value = "tenants", sync = true) + @GetMapping("tenants") + public ImmutableList tenants(@RequestParam("name") String name) throws PulsarClientException, PulsarAdminException { + PulsarInfo info = getInfo(name); + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .build()) { + MutableList result = Lists.mutable.empty(); + for (String tenant : admin.tenants().getTenants()) { + result.add(new PulsarTenant(tenant)); + } + return result.toImmutable(); + } + } + + @Cacheable(value = "namespaces", sync = true) + @GetMapping("namespaces") + public ImmutableList namespaces(@RequestParam("name") String name, @RequestParam("tenant") String tenant) throws PulsarClientException, PulsarAdminException { + PulsarInfo info = getInfo(name); + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .build()) { + MutableList result = Lists.mutable.empty(); + for (String namespace : admin.namespaces().getNamespaces(tenant)) { + result.add(new PulsarNamespace(tenant, namespace)); + } + return result.toImmutable(); + } + } + + @GetMapping("topics") + public ImmutableList topics(@RequestParam("name") String name, @RequestParam("namespace") String namespace) throws PulsarClientException, PulsarAdminException { + PulsarInfo info = getInfo(name); + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .build()) { + MutableList result = Lists.mutable.empty(); + for (String topic : admin.topics().getList(namespace)) { + result.add(convert(admin, topic)); + } + return result.toImmutable(); + } + } + + @Cacheable(value = "topic", sync = true) + @GetMapping("topic") + public PulsarTopic topic(@RequestParam("name") String name, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException { + PulsarInfo info = getInfo(name); + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .build()) { + return convert(admin, topic); + } + } + + @Cacheable(value = "backlog", sync = true) + @GetMapping("backlog") + public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException { + PulsarInfo info = getInfo(name); + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .build()) { + Map subscriptions = admin.topics().getStats(topic).getSubscriptions(); + if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) { + return subscriptions.get(subscription).getBacklogSize(); + } + return -1L; + } + } + + @Cacheable(value = "exists-topic", sync = true) + @GetMapping("exists_topic") + public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException { + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(url) + .build()) { + List tenants = admin.tenants().getTenants(); + for (String tenant : tenants) { + List namespaces = admin.namespaces().getNamespaces(tenant); + for (String namespace : namespaces) { + List topics = admin.topics().getList(namespace); + if (topics.contains(topic)) { + return true; + } + } + } + } + return false; + } }