perf(pulsar-query): 使用缓存优化pulsar查询时候创建admin的时间

This commit is contained in:
v-zhangjc9
2024-05-21 14:35:29 +08:00
parent ac7763c40f
commit 617456b7d9
3 changed files with 81 additions and 35 deletions

View File

@@ -123,6 +123,27 @@ public class PulsarInfo {
'}'; '}';
} }
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
PulsarInfo that = (PulsarInfo) o;
if (!name.equals(that.name))
return false;
return admin.equals(that.admin);
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + admin.hashCode();
return result;
}
private static final class Pair<L, R> { private static final class Pair<L, R> {
private final L left; private final L left;
private final R right; private final R right;

View File

@@ -47,7 +47,7 @@ public class PulsarMetrics extends Metrics {
backlogMap = Maps.mutable.empty(); backlogMap = Maps.mutable.empty();
} }
@Scheduled(fixedDelay = 10 * MINUTE, initialDelay = MINUTE) @Scheduled(fixedDelay = 30 * MINUTE, initialDelay = MINUTE)
@Override @Override
void update() { void update() {
infoService.tableMetaList() infoService.tableMetaList()

View File

@@ -3,6 +3,8 @@ package com.lanyuanxiaoyao.service.pulsar;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.lanyuanxiaoyao.service.common.exception.PulsarInfoNotFoundException; import com.lanyuanxiaoyao.service.common.exception.PulsarInfoNotFoundException;
import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo;
import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace;
@@ -13,6 +15,8 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
@@ -49,6 +53,7 @@ import org.springframework.web.bind.annotation.RestController;
* @author lanyuanxiaoyao * @author lanyuanxiaoyao
* @date 2023-04-27 * @date 2023-04-27
*/ */
@SuppressWarnings("resource")
@EnableDiscoveryClient @EnableDiscoveryClient
@SpringBootApplication( @SpringBootApplication(
scanBasePackages = {"com.lanyuanxiaoyao.service"}, scanBasePackages = {"com.lanyuanxiaoyao.service"},
@@ -64,6 +69,14 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("pulsar") @RequestMapping("pulsar")
public class PulsarQueryApplication { public class PulsarQueryApplication {
private static final Logger logger = LoggerFactory.getLogger(PulsarQueryApplication.class); private static final Logger logger = LoggerFactory.getLogger(PulsarQueryApplication.class);
private static final LoadingCache<PulsarInfo, PulsarAdmin> PULSAR_ADMIN_CACHE = Caffeine.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.evictionListener((key, value, cause) -> {
if (value instanceof PulsarAdmin) {
((PulsarAdmin) value).close();
}
})
.build(PulsarQueryApplication::adminBuild);
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(PulsarQueryApplication.class, args); SpringApplication.run(PulsarQueryApplication.class, args);
@@ -190,6 +203,28 @@ public class PulsarQueryApplication {
); );
} }
private static String adminUrl(PulsarInfo info) {
return StrUtil.format("http://{}/admin/v2", info.getAdmin());
}
private static PulsarAdmin adminBuild(PulsarInfo info) throws PulsarClientException {
return PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.connectionTimeout(5, TimeUnit.SECONDS)
.requestTimeout(5, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.build();
}
private static PulsarAdmin admin(PulsarInfo info) throws PulsarClientException {
PulsarAdmin admin = PULSAR_ADMIN_CACHE.get(info);
if (ObjectUtil.isNull(admin)) {
PULSAR_ADMIN_CACHE.invalidate(info);
throw new PulsarClientException("Pulsar admin build failure");
}
return admin;
}
private PulsarInfo getInfo(String name) { private PulsarInfo getInfo(String name) {
if (PulsarInfo.PULSAR_NAME_INFO_MAPPING.containsKey(name)) { if (PulsarInfo.PULSAR_NAME_INFO_MAPPING.containsKey(name)) {
return PulsarInfo.PULSAR_NAME_INFO_MAPPING.get(name); return PulsarInfo.PULSAR_NAME_INFO_MAPPING.get(name);
@@ -197,8 +232,9 @@ public class PulsarQueryApplication {
throw new PulsarInfoNotFoundException("Pulsar info not found for " + name); throw new PulsarInfoNotFoundException("Pulsar info not found for " + name);
} }
private String adminUrl(PulsarInfo info) { @PreDestroy
return StrUtil.format("http://{}/admin/v2", info.getAdmin()); public void destroy() {
PULSAR_ADMIN_CACHE.invalidateAll();
} }
@Cacheable(value = "name", sync = true) @Cacheable(value = "name", sync = true)
@@ -221,64 +257,51 @@ public class PulsarQueryApplication {
@GetMapping("tenants") @GetMapping("tenants")
public ImmutableList<PulsarTenant> tenants(@RequestParam("name") String name) throws PulsarClientException, PulsarAdminException { public ImmutableList<PulsarTenant> tenants(@RequestParam("name") String name) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name); PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder() PulsarAdmin admin = admin(info);
.serviceHttpUrl(adminUrl(info)) MutableList<PulsarTenant> result = Lists.mutable.empty();
.build()) { for (String tenant : admin.tenants().getTenants()) {
MutableList<PulsarTenant> result = Lists.mutable.empty(); result.add(new PulsarTenant(tenant));
for (String tenant : admin.tenants().getTenants()) {
result.add(new PulsarTenant(tenant));
}
return result.toImmutable();
} }
return result.toImmutable();
} }
@Cacheable(value = "namespaces", sync = true) @Cacheable(value = "namespaces", sync = true)
@GetMapping("namespaces") @GetMapping("namespaces")
public ImmutableList<PulsarNamespace> namespaces(@RequestParam("name") String name, @RequestParam("tenant") String tenant) throws PulsarClientException, PulsarAdminException { public ImmutableList<PulsarNamespace> namespaces(@RequestParam("name") String name, @RequestParam("tenant") String tenant) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name); PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder() PulsarAdmin admin = admin(info);
.serviceHttpUrl(adminUrl(info)) MutableList<PulsarNamespace> result = Lists.mutable.empty();
.build()) { for (String namespace : admin.namespaces().getNamespaces(tenant)) {
MutableList<PulsarNamespace> result = Lists.mutable.empty(); result.add(new PulsarNamespace(tenant, namespace));
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
result.add(new PulsarNamespace(tenant, namespace));
}
return result.toImmutable();
} }
return result.toImmutable();
} }
@GetMapping("topics") @GetMapping("topics")
public ImmutableList<PulsarTopic> topics(@RequestParam("name") String name, @RequestParam("namespace") String namespace) throws PulsarClientException, PulsarAdminException { public ImmutableList<PulsarTopic> topics(@RequestParam("name") String name, @RequestParam("namespace") String namespace) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name); PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder() PulsarAdmin admin = admin(info);
.serviceHttpUrl(adminUrl(info)) MutableList<PulsarTopic> result = Lists.mutable.empty();
.build()) { for (String topic : admin.topics().getList(namespace)) {
MutableList<PulsarTopic> result = Lists.mutable.empty(); result.add(convert(admin, topic));
for (String topic : admin.topics().getList(namespace)) {
result.add(convert(admin, topic));
}
return result.toImmutable();
} }
return result.toImmutable();
} }
@Cacheable(value = "topic", sync = true) @Cacheable(value = "topic", sync = true)
@GetMapping("topic") @GetMapping("topic")
public PulsarTopic topic(@RequestParam("name") String name, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException { public PulsarTopic topic(@RequestParam("name") String name, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name); PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder() PulsarAdmin admin = admin(info);
.serviceHttpUrl(adminUrl(info)) return convert(admin, topic);
.build()) {
return convert(admin, topic);
}
} }
@Cacheable(value = "backlog", sync = true) @Cacheable(value = "backlog", sync = true)
@GetMapping("backlog") @GetMapping("backlog")
public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) { public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) {
PulsarInfo info = getInfo(name); PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder() try {
.serviceHttpUrl(adminUrl(info)) PulsarAdmin admin = admin(info);
.build()) {
Map<String, ? extends SubscriptionStats> subscriptions = admin.topics().getStats(topic).getSubscriptions(); Map<String, ? extends SubscriptionStats> subscriptions = admin.topics().getStats(topic).getSubscriptions();
if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) { if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) {
return subscriptions.get(subscription).getMsgBacklog(); return subscriptions.get(subscription).getMsgBacklog();
@@ -294,6 +317,8 @@ public class PulsarQueryApplication {
public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) { public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) {
try (PulsarClient client = PulsarClient.builder() try (PulsarClient client = PulsarClient.builder()
.serviceUrl(url) .serviceUrl(url)
.connectionTimeout(5, TimeUnit.SECONDS)
.operationTimeout(5, TimeUnit.SECONDS)
.build()) { .build()) {
try (Consumer<String> consumer = client.newConsumer(new StringSchema()) try (Consumer<String> consumer = client.newConsumer(new StringSchema())
.topic(topic) .topic(topic)