feature(pulsar-query): 增加查询 backlog 的方法

This commit is contained in:
2023-05-24 20:16:46 +08:00
parent e3c4d51b15
commit af24551a6b
2 changed files with 23 additions and 3 deletions

View File

@@ -7,9 +7,6 @@ import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo;
import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace;
import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTenant;
import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTopic;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
@@ -29,6 +26,11 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author lanyuanxiaoyao
* @date 2023-04-27
@@ -127,6 +129,20 @@ public class PulsarQueryApplication {
}
}
@RequestMapping("backlog")
public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException {
PulsarInfo info = getInfo(name);
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(adminUrl(info))
.build()) {
Map<String, ? extends SubscriptionStats> subscriptions = admin.topics().getStats(topic).getSubscriptions();
if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) {
return subscriptions.get(subscription).getBacklogSize();
}
return -1L;
}
}
private static PulsarTopic convert(PulsarAdmin admin, String topic) throws PulsarAdminException {
TopicStats stats = admin.topics().getStats(topic);
MessageId lastMessageId = admin.topics().getLastMessageId(topic);