diff --git a/service-command-pro/pom.xml b/service-command-pro/pom.xml index 628f31a..3b457b0 100644 --- a/service-command-pro/pom.xml +++ b/service-command-pro/pom.xml @@ -98,6 +98,10 @@ + + org.apache.pulsar + pulsar-client + diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/PulsarCommand.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/PulsarCommand.java new file mode 100644 index 0000000..a6ffc21 --- /dev/null +++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/PulsarCommand.java @@ -0,0 +1,65 @@ +package com.lanyuanxiaoyao.service.command.pro.commands; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.schema.StringSchema; +import org.apache.pulsar.client.internal.DefaultImplementation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.shell.standard.ShellComponent; +import org.springframework.shell.standard.ShellMethod; +import org.springframework.shell.standard.ShellOption; + +/** + * Pulsar + * + * @author lanyuanxiaoyao + */ +@ShellComponent("Pulsar相关操作") +public class PulsarCommand { + private static final Logger logger = LoggerFactory.getLogger(PulsarCommand.class); + + private static MessageId parseMessageId(String messageIdText) { + return DefaultImplementation.newMessageId(Long.parseLong(messageIdText.split(":")[0]), Long.parseLong(messageIdText.split(":")[1]), -1); + } + + @ShellMethod("read pulsar") + public void readPulsarStream(@ShellOption String url, @ShellOption String topic, @ShellOption(defaultValue = "-1:-1:-1") String messageId) throws IOException { + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(url) + .build()) { + AtomicBoolean running = new AtomicBoolean(true); + try (Reader reader = client.newReader(new StringSchema()) + .topic(topic) + .startMessageIdInclusive() + .startMessageId(parseMessageId(messageId)) + .receiverQueueSize(10000) + .readerName(StrUtil.format("Command-Pro-Reader-{}", IdUtil.simpleUUID())) + .create()) { + while (running.get()) { + Message message = null; + try { + message = reader.readNext(10, TimeUnit.SECONDS); + logger.info("{}", message.getMessageId()); + } catch (Throwable throwable) { + running.set(false); + logger.error("Any error", throwable); + if (ObjectUtil.isNotNull(message)) { + logger.error("Message: " + message); + } else { + logger.error("Message is null"); + } + } + } + } + } + } +}