From af24551a6b482bb571275508d0e5296233192ac6 Mon Sep 17 00:00:00 2001 From: lanyuanxiaoyao Date: Wed, 24 May 2023 20:16:46 +0800 Subject: [PATCH] =?UTF-8?q?feature(pulsar-query):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=20backlog=20=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/forest/service/PulsarService.java | 4 ++++ .../pulsar/PulsarQueryApplication.java | 22 ++++++++++++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java index 5ff5932..013f5ef 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java @@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTenant; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTopic; import org.eclipse.collections.api.list.ImmutableList; +import org.springframework.web.bind.annotation.RequestParam; /** * Pulsar @@ -33,4 +34,7 @@ public interface PulsarService { @Get("/topics") ImmutableList topics(@Query("name") String name, @Query("namespace") String namespace); + + @Get("/backlog") + Long backlog(@Query("name") String name, @Query("topic") String topic, @RequestParam("subscription") String subscription); } diff --git a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java index 21d7efb..89f88c1 100644 --- a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java +++ b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java @@ -7,9 +7,6 @@ import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTenant; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTopic; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; @@ -29,6 +26,11 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * @author lanyuanxiaoyao * @date 2023-04-27 @@ -127,6 +129,20 @@ public class PulsarQueryApplication { } } + @RequestMapping("backlog") + public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException { + PulsarInfo info = getInfo(name); + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .build()) { + Map subscriptions = admin.topics().getStats(topic).getSubscriptions(); + if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) { + return subscriptions.get(subscription).getBacklogSize(); + } + return -1L; + } + } + private static PulsarTopic convert(PulsarAdmin admin, String topic) throws PulsarAdminException { TopicStats stats = admin.topics().getStats(topic); MessageId lastMessageId = admin.topics().getLastMessageId(topic);