feat(checker): 增加pulsar topic检查

This commit is contained in:
2024-03-04 12:02:51 +08:00
parent 6a0bbf3344
commit 6ff19c1ebe
2 changed files with 45 additions and 0 deletions

View File

@@ -1,10 +1,17 @@
package com.lanyuanxiaoyao.service.check.actions;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.check.configuration.PulsarConfigurationProperties;
import com.lanyuanxiaoyao.service.configuration.entity.pulsar.PulsarInfo;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -21,6 +28,12 @@ import org.springframework.stereotype.Component;
public class PulsarChecker extends Checker {
private static final Logger logger = LoggerFactory.getLogger(PulsarChecker.class);
private final PulsarConfigurationProperties pulsarConfigurationProperties;
public PulsarChecker(PulsarConfigurationProperties pulsarConfigurationProperties) {
this.pulsarConfigurationProperties = pulsarConfigurationProperties;
}
private String adminUrl(PulsarInfo info) {
return StrUtil.format("http://{}/admin/v2", info.getAdmin());
}
@@ -58,6 +71,26 @@ public class PulsarChecker extends Checker {
}
}
}
for (Map.Entry<String, List<String>> entry : pulsarConfigurationProperties.getTestTopics().entrySet()) {
String pulsarUrl = entry.getKey();
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarUrl)
.build()) {
for (String topic : entry.getValue()) {
logger.info("Test topic {} in {}", topic, pulsarUrl);
try (Reader<String> reader = client.newReader(new StringSchema())
.startMessageId(MessageId.earliest)
.subscriptionName(StrUtil.format("Checker_{}", topic))
.topic(topic)
.create()) {
Message<String> message = reader.readNext();
if (ObjectUtil.isNull(message) || ObjectUtil.isNull(message.getData())) {
logger.warn("Topic {} receive message is null", topic);
}
}
}
}
}
}
@Override

View File

@@ -1,5 +1,7 @@
package com.lanyuanxiaoyao.service.check.configuration;
import java.util.List;
import java.util.Map;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@@ -13,6 +15,7 @@ import org.springframework.context.annotation.Configuration;
@ConditionalOnProperty(value = "checker.pulsar.enabled", matchIfMissing = true)
public class PulsarConfigurationProperties {
private Boolean enabled = true;
private Map<String, List<String>> testTopics;
public Boolean getEnabled() {
return enabled;
@@ -22,10 +25,19 @@ public class PulsarConfigurationProperties {
this.enabled = enabled;
}
public Map<String, List<String>> getTestTopics() {
return testTopics;
}
public void setTestTopics(Map<String, List<String>> testTopics) {
this.testTopics = testTopics;
}
@Override
public String toString() {
return "PulsarConfigurationProperties{" +
"enabled=" + enabled +
", testTopics=" + testTopics +
'}';
}
}