From 53b5a2f695183ab94af45eb4aefcfdcffde86c58 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Mon, 22 Jan 2024 16:50:59 +0800 Subject: [PATCH] =?UTF-8?q?refactor(pulsar-query):=20=E4=BC=98=E5=8C=96top?= =?UTF-8?q?ic=E5=AD=98=E5=9C=A8=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 使用consumer尝试连接pulsar的方式判断topic是否存在似乎更靠谱一点,同时可以避免获取admin url,支持更多的查询场景 --- .../pulsar/PulsarQueryApplication.java | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java index e7730b8..43bd703 100644 --- a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java +++ b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.pulsar; +import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.eshore.odcp.hudi.connector.exception.PulsarInfoNotFoundException; @@ -14,8 +15,9 @@ import java.util.List; import java.util.Map; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.schema.StringSchema; +import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.*; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; @@ -279,23 +281,20 @@ public class PulsarQueryApplication { @Cacheable(value = "exists-topic", sync = true) @GetMapping("exists_topic") - public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException { - String name = name(url); - logger.info("Detected name: {}", name); - PulsarInfo info = getInfo(name); - try (PulsarAdmin admin = PulsarAdmin.builder() - .serviceHttpUrl(adminUrl(info)) + public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) { + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(url) .build()) { - List tenants = admin.tenants().getTenants(); - for (String tenant : tenants) { - List namespaces = admin.namespaces().getNamespaces(tenant); - for (String namespace : namespaces) { - List topics = admin.topics().getList(namespace); - if (topics.contains(topic)) { - return true; - } - } + try (Consumer consumer = client.newConsumer(new StringSchema()) + .topic(topic) + .subscriptionMode(SubscriptionMode.NonDurable) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("Pulsar_Exists_Topic_Detect_" + IdUtil.nanoId(8)) + .subscribe()) { + return consumer.isConnected(); } + } catch (PulsarClientException e) { + logger.warn("Pulsar {} not contain topic {}", url, topic); } return false; }