feat(service-command-pro): 添加Pulsar命令行工具

This commit is contained in:
v-zhangjc9
2024-06-13 16:27:17 +08:00
parent 5bad6b6e1e
commit ef983f9eb7
2 changed files with 69 additions and 0 deletions

View File

@@ -98,6 +98,10 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -0,0 +1,65 @@
package com.lanyuanxiaoyao.service.command.pro.commands;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
/**
* Pulsar
*
* @author lanyuanxiaoyao
*/
@ShellComponent("Pulsar相关操作")
public class PulsarCommand {
private static final Logger logger = LoggerFactory.getLogger(PulsarCommand.class);
private static MessageId parseMessageId(String messageIdText) {
return DefaultImplementation.newMessageId(Long.parseLong(messageIdText.split(":")[0]), Long.parseLong(messageIdText.split(":")[1]), -1);
}
@ShellMethod("read pulsar")
public void readPulsarStream(@ShellOption String url, @ShellOption String topic, @ShellOption(defaultValue = "-1:-1:-1") String messageId) throws IOException {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(url)
.build()) {
AtomicBoolean running = new AtomicBoolean(true);
try (Reader<String> reader = client.newReader(new StringSchema())
.topic(topic)
.startMessageIdInclusive()
.startMessageId(parseMessageId(messageId))
.receiverQueueSize(10000)
.readerName(StrUtil.format("Command-Pro-Reader-{}", IdUtil.simpleUUID()))
.create()) {
while (running.get()) {
Message<String> message = null;
try {
message = reader.readNext(10, TimeUnit.SECONDS);
logger.info("{}", message.getMessageId());
} catch (Throwable throwable) {
running.set(false);
logger.error("Any error", throwable);
if (ObjectUtil.isNotNull(message)) {
logger.error("Message: " + message);
} else {
logger.error("Message is null");
}
}
}
}
}
}
}