diff --git a/service-check/src/main/java/com/lanyuanxiaoyao/service/check/actions/PulsarChecker.java b/service-check/src/main/java/com/lanyuanxiaoyao/service/check/actions/PulsarChecker.java index 8c28953..9e5086b 100644 --- a/service-check/src/main/java/com/lanyuanxiaoyao/service/check/actions/PulsarChecker.java +++ b/service-check/src/main/java/com/lanyuanxiaoyao/service/check/actions/PulsarChecker.java @@ -1,10 +1,17 @@ package com.lanyuanxiaoyao.service.check.actions; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.check.configuration.PulsarConfigurationProperties; import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo; +import java.util.List; +import java.util.Map; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.schema.StringSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; @@ -21,6 +28,12 @@ import org.springframework.stereotype.Component; public class PulsarChecker extends Checker { private static final Logger logger = LoggerFactory.getLogger(PulsarChecker.class); + private final PulsarConfigurationProperties pulsarConfigurationProperties; + + public PulsarChecker(PulsarConfigurationProperties pulsarConfigurationProperties) { + this.pulsarConfigurationProperties = pulsarConfigurationProperties; + } + private String adminUrl(PulsarInfo info) { return StrUtil.format("http://{}/admin/v2", info.getAdmin()); } @@ -58,6 +71,26 @@ public class PulsarChecker extends Checker { } } } + for (Map.Entry> entry : pulsarConfigurationProperties.getTestTopics().entrySet()) { + String pulsarUrl = entry.getKey(); + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarUrl) + .build()) { + for (String topic : entry.getValue()) { + logger.info("Test topic {} in {}", topic, pulsarUrl); + try (Reader reader = client.newReader(new StringSchema()) + .startMessageId(MessageId.earliest) + .subscriptionName(StrUtil.format("Checker_{}", topic)) + .topic(topic) + .create()) { + Message message = reader.readNext(); + if (ObjectUtil.isNull(message) || ObjectUtil.isNull(message.getData())) { + logger.warn("Topic {} receive message is null", topic); + } + } + } + } + } } @Override diff --git a/service-check/src/main/java/com/lanyuanxiaoyao/service/check/configuration/PulsarConfigurationProperties.java b/service-check/src/main/java/com/lanyuanxiaoyao/service/check/configuration/PulsarConfigurationProperties.java index d89901c..7214f6c 100644 --- a/service-check/src/main/java/com/lanyuanxiaoyao/service/check/configuration/PulsarConfigurationProperties.java +++ b/service-check/src/main/java/com/lanyuanxiaoyao/service/check/configuration/PulsarConfigurationProperties.java @@ -1,5 +1,7 @@ package com.lanyuanxiaoyao.service.check.configuration; +import java.util.List; +import java.util.Map; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -13,6 +15,7 @@ import org.springframework.context.annotation.Configuration; @ConditionalOnProperty(value = "checker.pulsar.enabled", matchIfMissing = true) public class PulsarConfigurationProperties { private Boolean enabled = true; + private Map> testTopics; public Boolean getEnabled() { return enabled; @@ -22,10 +25,19 @@ public class PulsarConfigurationProperties { this.enabled = enabled; } + public Map> getTestTopics() { + return testTopics; + } + + public void setTestTopics(Map> testTopics) { + this.testTopics = testTopics; + } + @Override public String toString() { return "PulsarConfigurationProperties{" + "enabled=" + enabled + + ", testTopics=" + testTopics + '}'; } }