diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java index 5ff5932..013f5ef 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/PulsarService.java @@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTenant; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTopic; import org.eclipse.collections.api.list.ImmutableList; +import org.springframework.web.bind.annotation.RequestParam; /** * Pulsar @@ -33,4 +34,7 @@ public interface PulsarService { @Get("/topics") ImmutableList topics(@Query("name") String name, @Query("namespace") String namespace); + + @Get("/backlog") + Long backlog(@Query("name") String name, @Query("topic") String topic, @RequestParam("subscription") String subscription); } diff --git a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java index 21d7efb..89f88c1 100644 --- a/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java +++ b/service-pulsar-query/src/main/java/com/lanyuanxiaoyao/service/pulsar/PulsarQueryApplication.java @@ -7,9 +7,6 @@ import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarNamespace; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTenant; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarTopic; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; @@ -29,6 +26,11 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * @author lanyuanxiaoyao * @date 2023-04-27 @@ -127,6 +129,20 @@ public class PulsarQueryApplication { } } + @RequestMapping("backlog") + public Long backlog(@RequestParam("name") String name, @RequestParam("topic") String topic, @RequestParam("subscription") String subscription) throws PulsarClientException, PulsarAdminException { + PulsarInfo info = getInfo(name); + try (PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(adminUrl(info)) + .build()) { + Map subscriptions = admin.topics().getStats(topic).getSubscriptions(); + if (ObjectUtil.isNotNull(subscriptions) && subscriptions.containsKey(subscription)) { + return subscriptions.get(subscription).getBacklogSize(); + } + return -1L; + } + } + private static PulsarTopic convert(PulsarAdmin admin, String topic) throws PulsarAdminException { TopicStats stats = admin.topics().getStats(topic); MessageId lastMessageId = admin.topics().getLastMessageId(topic);