feat(pulsar-query): 增加按指定url和topic判断是否匹配的接口

This commit is contained in:
2024-01-19 14:35:04 +08:00
parent 2686bf7686
commit 521e82104f
2 changed files with 123 additions and 92 deletions

View File

@@ -36,5 +36,8 @@ public interface PulsarService {
ImmutableList<PulsarTopic> topics(@Query("name") String name, @Query("namespace") String namespace);
@Get("/backlog")
Long backlog(@Query("name") String name, @Query("topic") String topic, @RequestParam("subscription") String subscription);
Long backlog(@Query("name") String name, @Query("topic") String topic, @Query("subscription") String subscription);
@Get("/exists_topic")
Boolean existsTopic(@Query("url") String url, @Query("topic") String topic);
}

View File

@@ -25,8 +25,10 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@@ -53,97 +55,6 @@ public class PulsarQueryApplication {
SpringApplication.run(PulsarQueryApplication.class, args);
}
private PulsarInfo getInfo(String name) {
if (PulsarInfo.PULSAR_NAME_INFO_MAPPING.containsKey(name)) {
return PulsarInfo.PULSAR_NAME_INFO_MAPPING.get(name);
}
throw new PulsarInfoNotFoundException("Pulsar info not found for " + name);
}
private String adminUrl(PulsarInfo info) {
return StrUtil.format("http://{}/admin/v2", info.getAdmin());
}
@RequestMapping("names")
public ImmutableList<String> names() {
return Lists.immutable.ofAll(PulsarInfo.DEFAULT_INFOS).collect(PulsarInfo::getName);
}
@RequestMapping("name")
public String name(@RequestParam("pulsar_url") String pulsarUrl) {
String[] urls = pulsarUrl.replaceAll("pulsar://", "").split(",");
if (ObjectUtil.isNotEmpty(urls)) {
return PulsarInfo.PULSAR_CLIENT_NAME_MAPPING.get(urls[0]);
}
throw new PulsarInfoNotFoundException("Pulsar name not found for " + pulsarUrl);
}
@RequestMapping("tenants")
public ImmutableList<PulsarTenant> tenants(@RequestParam("name") String name) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
MutableList<PulsarTenant> result = Lists.mutable.empty();
for (String tenant : admin.tenants().getTenants()) {
result.add(new PulsarTenant(tenant));
}
return result.toImmutable();
}
}
@RequestMapping("namespaces")
public ImmutableList<PulsarNamespace> namespaces(@RequestParam("name") String name, @RequestParam("tenant") String tenant) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
MutableList<PulsarNamespace> result = Lists.mutable.empty();
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
result.add(new PulsarNamespace(tenant, namespace));
}
return result.toImmutable();
}
}
@RequestMapping("topics")
public ImmutableList<PulsarTopic> topics(@RequestParam("name") String name, @RequestParam("namespace") String namespace) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
MutableList<PulsarTopic> result = Lists.mutable.empty();
for (String topic : admin.topics().getList(namespace)) {
result.add(convert(admin, topic));
}
return result.toImmutable();
}
}
@RequestMapping("topic")
public PulsarTopic topic(@RequestParam("name") String name, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
return convert(admin, topic);
}
}
@RequestMapping("backlog")
public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
Map<String, ? extends SubscriptionStats> subscriptions = admin.topics().getStats(topic).getSubscriptions();
if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) {
return subscriptions.get(subscription).getBacklogSize();
}
return -1L;
}
}
private static PulsarTopic convert(PulsarAdmin admin, String topic) throws PulsarAdminException {
TopicStats stats = admin.topics().getStats(topic);
MessageId lastMessageId = admin.topics().getLastMessageId(topic);
@@ -264,4 +175,121 @@ public class PulsarQueryApplication {
subscriptions
);
}
private PulsarInfo getInfo(String name) {
if (PulsarInfo.PULSAR_NAME_INFO_MAPPING.containsKey(name)) {
return PulsarInfo.PULSAR_NAME_INFO_MAPPING.get(name);
}
throw new PulsarInfoNotFoundException("Pulsar info not found for " + name);
}
private String adminUrl(PulsarInfo info) {
return StrUtil.format("http://{}/admin/v2", info.getAdmin());
}
@Cacheable(value = "name", sync = true)
@GetMapping("names")
public ImmutableList<String> names() {
return Lists.immutable.ofAll(PulsarInfo.DEFAULT_INFOS).collect(PulsarInfo::getName);
}
@Cacheable(value = "name", sync = true)
@GetMapping("name")
public String name(@RequestParam("pulsar_url") String pulsarUrl) {
String[] urls = pulsarUrl.replaceAll("pulsar://", "").split(",");
if (ObjectUtil.isNotEmpty(urls)) {
return PulsarInfo.PULSAR_CLIENT_NAME_MAPPING.get(urls[0]);
}
throw new PulsarInfoNotFoundException("Pulsar name not found for " + pulsarUrl);
}
@Cacheable(value = "tenants", sync = true)
@GetMapping("tenants")
public ImmutableList<PulsarTenant> tenants(@RequestParam("name") String name) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
MutableList<PulsarTenant> result = Lists.mutable.empty();
for (String tenant : admin.tenants().getTenants()) {
result.add(new PulsarTenant(tenant));
}
return result.toImmutable();
}
}
@Cacheable(value = "namespaces", sync = true)
@GetMapping("namespaces")
public ImmutableList<PulsarNamespace> namespaces(@RequestParam("name") String name, @RequestParam("tenant") String tenant) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
MutableList<PulsarNamespace> result = Lists.mutable.empty();
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
result.add(new PulsarNamespace(tenant, namespace));
}
return result.toImmutable();
}
}
@GetMapping("topics")
public ImmutableList<PulsarTopic> topics(@RequestParam("name") String name, @RequestParam("namespace") String namespace) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
MutableList<PulsarTopic> result = Lists.mutable.empty();
for (String topic : admin.topics().getList(namespace)) {
result.add(convert(admin, topic));
}
return result.toImmutable();
}
}
@Cacheable(value = "topic", sync = true)
@GetMapping("topic")
public PulsarTopic topic(@RequestParam("name") String name, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
return convert(admin, topic);
}
}
@Cacheable(value = "backlog", sync = true)
@GetMapping("backlog")
public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
Map<String, ? extends SubscriptionStats> subscriptions = admin.topics().getStats(topic).getSubscriptions();
if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) {
return subscriptions.get(subscription).getBacklogSize();
}
return -1L;
}
}
@Cacheable(value = "exists-topic", sync = true)
@GetMapping("exists_topic")
public Boolean existsTopic(@RequestParam("url") String url, @RequestParam("topic") String topic) throws PulsarClientException, PulsarAdminException {
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(url)
.build()) {
List<String> tenants = admin.tenants().getTenants();
for (String tenant : tenants) {
List<String> namespaces = admin.namespaces().getNamespaces(tenant);
for (String namespace : namespaces) {
List<String> topics = admin.topics().getList(namespace);
if (topics.contains(topic)) {
return true;
}
}
}
}
return false;
}
}