refactor(pulsar-query): 使backlog返回默认值-1 方便后续处理

This commit is contained in:
v-zhangjc9
2024-05-06 15:16:48 +08:00
parent f151374afd
commit 240cef07c4

View File

@@ -15,10 +15,18 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.StringSchema; import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.*; import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList; import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList; import org.eclipse.collections.api.list.MutableList;
@@ -266,7 +274,7 @@ public class PulsarQueryApplication {
@Cacheable(value = "backlog", sync = true) @Cacheable(value = "backlog", sync = true)
@GetMapping("backlog") @GetMapping("backlog")
public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException { public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) {
PulsarInfo info = getInfo(name); PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder() try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info)) .serviceHttpUrl(adminUrl(info))
@@ -275,8 +283,10 @@ public class PulsarQueryApplication {
if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) { if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) {
return subscriptions.get(subscription).getMsgBacklog(); return subscriptions.get(subscription).getMsgBacklog();
} }
return -1L; } catch (PulsarClientException | PulsarAdminException e) {
logger.warn("Pulsar connect error", e);
} }
return -1L;
} }
@Cacheable(value = "exists-topic", sync = true) @Cacheable(value = "exists-topic", sync = true)