From 617456b7d9efdf20b1160f6a4237e047f0141017 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Tue, 21 May 2024 14:35:29 +0800 Subject: [PATCH] =?UTF-8?q?perf(pulsar-query):=20=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E7=BC=93=E5=AD=98=E4=BC=98=E5=8C=96pulsar=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=97=B6=E5=80=99=E5=88=9B=E5=BB=BAadmin=E7=9A=84=E6=97=B6?= =?UTF-8?q?=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../entity/pulsar/PulsarInfo.java | 21 +++++ .../service/monitor/metric/PulsarMetrics.java | 2 +- .../pulsar/PulsarQueryApplication.java | 93 ++++++++++++------- 3 files changed, 81 insertions(+), 35 deletions(-) diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/pulsar/PulsarInfo.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/pulsar/PulsarInfo.java index 4823549..5ddf347 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/pulsar/PulsarInfo.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/pulsar/PulsarInfo.java @@ -123,6 +123,27 @@ public class PulsarInfo { '}'; } + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + PulsarInfo that = (PulsarInfo) o; + + if (!name.equals(that.name)) + return false; + return admin.equals(that.admin); + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + admin.hashCode(); + return result; + } + private static final class Pair { private final L left; private final R right; diff --git a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java index 66335a7..873776c 100644 --- a/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java +++ b/service-monitor/src/main/java/com/lanyuanxiaoyao/service/monitor/metric/PulsarMetrics.java @@ -47,7 +47,7 @@ public class PulsarMetrics extends Metrics { backlogMap = Maps.mutable.empty(); } - @Scheduled(fixedDelay = 10 * MINUTE, initialDelay = MINUTE) + @Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE) @Override void update() { infoService.tableMetaList() diff --git a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java index 2f373e5..7c553b8 100644 --- a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java +++ b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java @@ -3,6 +3,8 @@ package com.lanyuanxiaoyao.service.pulsar; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.lanyuanxiaoyao.service.common.exception.PulsarInfoNotFoundException; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace; @@ -13,6 +15,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -49,6 +53,7 @@ import org.springframework.web.bind.annotation.RestController; * @author lanyuanxiaoyao * @date 2023-04-27 */ +@SuppressWarnings("resource") @EnableDiscoveryClient @SpringBootApplication( scanBasePackages = {"com.lanyuanxiaoyao.service"}, @@ -64,6 +69,14 @@ import org.springframework.web.bind.annotation.RestController; @RequestMapping("pulsar") public class PulsarQueryApplication { private static final Logger logger = LoggerFactory.getLogger(PulsarQueryApplication.class); + private static final LoadingCache PULSAR_ADMIN_CACHE = Caffeine.newBuilder() + .expireAfterAccess(5, TimeUnit.MINUTES) + .evictionListener((key, value, cause) -> { + if (value instanceof PulsarAdmin) { + ((PulsarAdmin) value).close(); + } + }) + .build(PulsarQueryApplication::adminBuild); public static void main(String[] args) { SpringApplication.run(PulsarQueryApplication.class, args); @@ -190,6 +203,28 @@ public class PulsarQueryApplication { ); } + private static String adminUrl(PulsarInfo info) { + return StrUtil.format("http://{}/admin/v2", info.getAdmin()); + } + + private static PulsarAdmin adminBuild(PulsarInfo info) throws PulsarClientException { + return PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .connectionTimeout(5, TimeUnit.SECONDS) + .requestTimeout(5, TimeUnit.SECONDS) + .readTimeout(5, TimeUnit.SECONDS) + .build(); + } + + private static PulsarAdmin admin(PulsarInfo info) throws PulsarClientException { + PulsarAdmin admin = PULSAR_ADMIN_CACHE.get(info); + if (ObjectUtil.isNull(admin)) { + PULSAR_ADMIN_CACHE.invalidate(info); + throw new PulsarClientException("Pulsar admin build failure"); + } + return admin; + } + private PulsarInfo getInfo(String name) { if (PulsarInfo.PULSAR_NAME_INFO_MAPPING.containsKey(name)) { return PulsarInfo.PULSAR_NAME_INFO_MAPPING.get(name); @@ -197,8 +232,9 @@ public class PulsarQueryApplication { throw new PulsarInfoNotFoundException("Pulsar info not found for " + name); } - private String adminUrl(PulsarInfo info) { - return StrUtil.format("http://{}/admin/v2", info.getAdmin()); + @PreDestroy + public void destroy() { + PULSAR_ADMIN_CACHE.invalidateAll(); } @Cacheable(value = "name", sync = true) @@ -221,64 +257,51 @@ public class PulsarQueryApplication { @GetMapping("tenants") public ImmutableList tenants(@RequestParam("name") String name) throws PulsarClientException, PulsarAdminException { PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - MutableList result = Lists.mutable.empty(); - for (String tenant : admin.tenants().getTenants()) { - result.add(new PulsarTenant(tenant)); - } - return result.toImmutable(); + PulsarAdmin admin = admin(info); + MutableList result = Lists.mutable.empty(); + for (String tenant : admin.tenants().getTenants()) { + result.add(new PulsarTenant(tenant)); } + return result.toImmutable(); } @Cacheable(value = "namespaces", sync = true) @GetMapping("namespaces") public ImmutableList namespaces(@RequestParam("name") String name, @RequestParam("tenant") String tenant) throws PulsarClientException, PulsarAdminException { PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - MutableList result = Lists.mutable.empty(); - for (String namespace : admin.namespaces().getNamespaces(tenant)) { - result.add(new PulsarNamespace(tenant, namespace)); - } - return result.toImmutable(); + PulsarAdmin admin = admin(info); + MutableList result = Lists.mutable.empty(); + for (String namespace : admin.namespaces().getNamespaces(tenant)) { + result.add(new PulsarNamespace(tenant, namespace)); } + return result.toImmutable(); } @GetMapping("topics") public ImmutableList topics(@RequestParam("name") String name, @RequestParam("namespace") String namespace) throws PulsarClientException, PulsarAdminException { PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - MutableList result = Lists.mutable.empty(); - for (String topic : admin.topics().getList(namespace)) { - result.add(convert(admin, topic)); - } - return result.toImmutable(); + PulsarAdmin admin = admin(info); + MutableList result = Lists.mutable.empty(); + for (String topic : admin.topics().getList(namespace)) { + result.add(convert(admin, topic)); } + return result.toImmutable(); } @Cacheable(value = "topic", sync = true) @GetMapping("topic") public PulsarTopic topic(@RequestParam("name") String name, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException { PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { - return convert(admin, topic); - } + PulsarAdmin admin = admin(info); + return convert(admin, topic); } @Cacheable(value = "backlog", sync = true) @GetMapping("backlog") public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) { PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) - .build()) { + try { + PulsarAdmin admin = admin(info); Map subscriptions = admin.topics().getStats(topic).getSubscriptions(); if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) { return subscriptions.get(subscription).getMsgBacklog(); @@ -294,6 +317,8 @@ public class PulsarQueryApplication { public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) { try (PulsarClient client = PulsarClient.builder() .serviceUrl(url) + .connectionTimeout(5, TimeUnit.SECONDS) + .operationTimeout(5, TimeUnit.SECONDS) .build()) { try (Consumer consumer = client.newConsumer(new StringSchema()) .topic(topic)