From ef983f9eb7c256c24d2842b24546dd094db1df31 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Thu, 13 Jun 2024 16:27:17 +0800 Subject: [PATCH] =?UTF-8?q?feat(service-command-pro):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?Pulsar=E5=91=BD=E4=BB=A4=E8=A1=8C=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- service-command-pro/pom.xml | 4 ++ .../command/pro/commands/PulsarCommand.java | 65 +++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/PulsarCommand.java diff --git a/service-command-pro/pom.xml b/service-command-pro/pom.xml index 628f31a..3b457b0 100644 --- a/service-command-pro/pom.xml +++ b/service-command-pro/pom.xml @@ -98,6 +98,10 @@ + + org.apache.pulsar + pulsar-client + diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/PulsarCommand.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/PulsarCommand.java new file mode 100644 index 0000000..a6ffc21 --- /dev/null +++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/PulsarCommand.java @@ -0,0 +1,65 @@ +package com.lanyuanxiaoyao.service.command.pro.commands; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.impl.schema.StringSchema; +import org.apache.pulsar.client.internal.DefaultImplementation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.shell.standard.ShellComponent; +import org.springframework.shell.standard.ShellMethod; +import org.springframework.shell.standard.ShellOption; + +/** + * Pulsar + * + * @author lanyuanxiaoyao + */ +@ShellComponent("Pulsar相关操作") +public class PulsarCommand { + private static final Logger logger = LoggerFactory.getLogger(PulsarCommand.class); + + private static MessageId parseMessageId(String messageIdText) { + return DefaultImplementation.newMessageId(Long.parseLong(messageIdText.split(":")[0]), Long.parseLong(messageIdText.split(":")[1]), -1); + } + + @ShellMethod("read pulsar") + public void readPulsarStream(@ShellOption String url, @ShellOption String topic, @ShellOption(defaultValue = "-1:-1:-1") String messageId) throws IOException { + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(url) + .build()) { + AtomicBoolean running = new AtomicBoolean(true); + try (Reader reader = client.newReader(new StringSchema()) + .topic(topic) + .startMessageIdInclusive() + .startMessageId(parseMessageId(messageId)) + .receiverQueueSize(10000) + .readerName(StrUtil.format("Command-Pro-Reader-{}", IdUtil.simpleUUID())) + .create()) { + while (running.get()) { + Message message = null; + try { + message = reader.readNext(10, TimeUnit.SECONDS); + logger.info("{}", message.getMessageId()); + } catch (Throwable throwable) { + running.set(false); + logger.error("Any error", throwable); + if (ObjectUtil.isNotNull(message)) { + logger.error("Message: " + message); + } else { + logger.error("Message is null"); + } + } + } + } + } + } +}