refactor(pulsar-query): 优化topic存在判断
使用consumer尝试连接pulsar的方式判断topic是否存在似乎更靠谱一点,同时可以避免获取admin url,支持更多的查询场景
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
package com.lanyuanxiaoyao.service.pulsar;
|
package com.lanyuanxiaoyao.service.pulsar;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.IdUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.eshore.odcp.hudi.connector.exception.PulsarInfoNotFoundException;
|
import com.eshore.odcp.hudi.connector.exception.PulsarInfoNotFoundException;
|
||||||
@@ -14,8 +15,9 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.apache.pulsar.client.admin.PulsarAdmin;
|
import org.apache.pulsar.client.admin.PulsarAdmin;
|
||||||
import org.apache.pulsar.client.admin.PulsarAdminException;
|
import org.apache.pulsar.client.admin.PulsarAdminException;
|
||||||
import org.apache.pulsar.client.api.MessageId;
|
import org.apache.pulsar.client.api.*;
|
||||||
import org.apache.pulsar.client.api.PulsarClientException;
|
import org.apache.pulsar.client.impl.schema.StringSchema;
|
||||||
|
import org.apache.pulsar.common.policies.data.ConsumerStats;
|
||||||
import org.apache.pulsar.common.policies.data.*;
|
import org.apache.pulsar.common.policies.data.*;
|
||||||
import org.eclipse.collections.api.factory.Lists;
|
import org.eclipse.collections.api.factory.Lists;
|
||||||
import org.eclipse.collections.api.list.ImmutableList;
|
import org.eclipse.collections.api.list.ImmutableList;
|
||||||
@@ -279,23 +281,20 @@ public class PulsarQueryApplication {
|
|||||||
|
|
||||||
@Cacheable(value = "exists-topic", sync = true)
|
@Cacheable(value = "exists-topic", sync = true)
|
||||||
@GetMapping("exists_topic")
|
@GetMapping("exists_topic")
|
||||||
public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException {
|
public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) {
|
||||||
String name = name(url);
|
try (PulsarClient client = PulsarClient.builder()
|
||||||
logger.info("Detected name: {}", name);
|
.serviceUrl(url)
|
||||||
PulsarInfo info = getInfo(name);
|
|
||||||
try (PulsarAdmin admin = PulsarAdmin.builder()
|
|
||||||
.serviceHttpUrl(adminUrl(info))
|
|
||||||
.build()) {
|
.build()) {
|
||||||
List<String> tenants = admin.tenants().getTenants();
|
try (Consumer<String> consumer = client.newConsumer(new StringSchema())
|
||||||
for (String tenant : tenants) {
|
.topic(topic)
|
||||||
List<String> namespaces = admin.namespaces().getNamespaces(tenant);
|
.subscriptionMode(SubscriptionMode.NonDurable)
|
||||||
for (String namespace : namespaces) {
|
.subscriptionType(SubscriptionType.Exclusive)
|
||||||
List<String> topics = admin.topics().getList(namespace);
|
.subscriptionName("Pulsar_Exists_Topic_Detect_" + IdUtil.nanoId(8))
|
||||||
if (topics.contains(topic)) {
|
.subscribe()) {
|
||||||
return true;
|
return consumer.isConnected();
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
} catch (PulsarClientException e) {
|
||||||
|
logger.warn("Pulsar {} not contain topic {}", url, topic);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user