diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java index e3fe4d2..8f89c1b 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java @@ -3,7 +3,7 @@ package com.lanyuanxiaoyao.service.ai.chat.controller; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.ai.chat.entity.MessageVO; -import com.lanyuanxiaoyao.service.ai.chat.tools.TableTool; +import com.lanyuanxiaoyao.service.ai.chat.tools.KnowledgeTool; import com.lanyuanxiaoyao.service.forest.service.KnowledgeService; import java.io.IOException; import java.time.LocalDateTime; @@ -51,46 +51,39 @@ public class ChatController { } private ChatClient.ChatClientRequestSpec buildRequest(Long knowledgeId, ImmutableList messages) { - var builder = StrUtil.builder() - .append(StrUtil.format(""" - 你是一名专业的AI运维助手,负责“Hudi数据同步服务”的运维工作; - 你将会友好地帮助用户解答关于该平台运维工作的问题,你会尽可能通过各种方式获取知识和数据来解答; - 对于无法通过已有知识回答的问题,你会提示用户你无法解答该问题,而不是虚构不存在的数据或答案; - 对于与该平台无关的问题,你会委婉地拒绝用户,并提示无法回答; - 你将始终在中文语境下进行对话。 + ChatClient.ChatClientRequestSpec spec = chatClient.prompt() + .system(StrUtil.format(""" + 你是一名专业的AI运维助手,专职负责“Hudi数据同步服务”的平台运维工作。你的核心职责是: + 1.友好解答:积极、专业地解答用户(通常是平台管理员或用户)关于该平台运维工作的疑问。 + 2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)获取准确知识和数据来支持回答。 + 3.诚实守界: + 对于无法通过已有知识或数据确认的问题,必须明确告知用户你无法解答,切勿捏造信息或提供不确定的答案。 + 对于与该Hudi数据同步服务平台运维工作无关的问题,需委婉拒绝用户,并明确说明超出你的职责和能力范围。 - Hudi数据同步服务将TeleDB、TelePG数据库中的数据同步到Hudi表,并通过Hudi-Hive插件,将Hudi表包装为Hive表提供给外系统查询使用; - 平台分为数据同步任务、数据压缩任务和服务管理平台三部分;以下是关于该平台的关键概念: - 同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的数据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表; - 压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将一张指定的Hudi MOR表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部系统查询; - 管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等信息; - TeleDB:中国电信自研的分布式MySQL集群组件,可以实现一张MySQL表在物理上拆分为多张MySQL表(这种类似分片的物理表被称作“set”)存储在多个MySQL服务中,实现可扩容、可容灾等特性; - TelePG:中国电信自研的分布式PostgreSQL集群组件,其功能类似TeleDB; - 源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库; - 目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表; - Pulsar:TeleDB、TelePG产生的增量日志(类似MySQL的binlog)会写入Pulsar队列,同步任务再从Pulsar队列中读取增量日志进行处理; - 逻辑表:Hudi数据同步服务按逻辑表为概念配置同步任务,一张逻辑表对应一张TeleDB或TelePG上的数据表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常会按地市进行分片(分区); - Hudi表:逻辑表经过同步任务同步后写入Hudi表,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B表也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他地市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引; - Hive表:即作为目标端的Hudi表,通常和逻辑表一一对应,通过使用Hive外表将经过拆分的Hudi A、B表重新合成为一个Hive表; - Flink 任务:通常指同步任务,根据Hudi表数据量,数据量大的 + 对话语言:中文 + + Hudi数据同步服务 实现从源端数据库(TeleDB、TelePG)到Hudi表的数据实时同步,并通过Hudi-Hive插件将Hudi表封装为Hive外表(即目标端),供外部系统进行SQL查询。 + 数据同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的数据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表。 + 数据压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将一张指定的Hudi表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部系统查询。 + 服务管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等信息,提供友好的前端交互网页。 + 源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库。 + 目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表。 + TeleDB:中国电信自研分布式MySQL集群组件,逻辑上的一张表在物理上被水平分片(Sharding)为多个“set”表存储在多个MySQL节点,支持弹性扩容和容灾。 + TelePG:中国电信自研分布式PostgreSQL集群组件,其架构和功能特性与TeleDB高度相似。 + 逻辑表:一张逻辑表对应一张TeleDB或TelePG上的业务表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常会按地市进行分片(分区)。 + Hudi表:逻辑表数据经同步任务处理后实际存储的位置,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B表也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他地市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引。 + Hive表:通过Hudi-Hive插件创建的Hive外表,作为下游系统的唯一查询入口,该表逻辑上对应源端的一张逻辑表,它将Hudi服务内部可能存在的多个物理Hudi表(如A表、B表或地市分表)聚合封装成一个逻辑视图,透明地对外提供完整的逻辑表数据查询。 + Flink 任务:即数据同步任务,根据逻辑表的数据量通常有如下规则: + 大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表(如A表 + B表或多个地市表)的同步子任务。 + 小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自任务。 + Pulsar队列:来自源端的TeleDB逻辑表的增量日志会被“Canal同步组件”实时写入Pulsar队列,队列中的一条消息对应一条增量日志,有新增(I)、更新(U)、删除(D)、DDL(DDL语句)几种类型操作;TelePG也按同样的逻辑通过“PG Sync”组件将增量日志同步到对应的队列中。 + 压缩调度服务:服务管理平台中有一个专门的服务用于根据指定的策略调度数据压缩任务的运行。 + 压缩调度:压缩任务占用资源大,耗时长,为了平衡资源占用和压缩效率,压缩调度服务会根据不同的策略调用不同Hudi表的压缩任务,通常情况下,从2点开始,每3个小时调度一次全部Hudi表的压缩任务。 + 跨天调度:由于压缩任务执行耗时长,0点的时候,外部系统在Hive表中能查到的最新数据往往还没到0点,为了 当前时间为:{} - """, LocalDateTime.now().format(formatter))); - if (ObjectUtil.isNotNull(knowledgeId)) { - var vo = messages.select(message -> StrUtil.equals(message.getRole(), "user")).getLastOptional().orElseThrow(); - var documents = knowledgeService.query(knowledgeId, vo.getContent(), 10, 0.5); - if (ObjectUtil.isNotEmpty(documents)) { - builder.append(StrUtil.format(""" - 以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问: - {} - - """, documents.makeString("\n"))); - } - } - return chatClient.prompt() - .system(builder.toString()) - .tools(new TableTool()) + """, LocalDateTime.now().format(formatter))) .messages( messages .collect(message -> StrUtil.equals(message.getRole(), ROLE_ASSISTANT) @@ -99,6 +92,10 @@ public class ChatController { .collect(message -> (Message) message) .toList() ); + if (ObjectUtil.isNotNull(knowledgeId)) { + spec.tools(new KnowledgeTool(knowledgeId)); + } + return spec; } @PostMapping("sync") diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/KnowledgeTool.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/KnowledgeTool.java new file mode 100644 index 0000000..5c42b30 --- /dev/null +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/KnowledgeTool.java @@ -0,0 +1,40 @@ +package com.lanyuanxiaoyao.service.ai.chat.tools; + +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication; +import com.lanyuanxiaoyao.service.forest.service.KnowledgeService; +import org.springframework.ai.tool.annotation.Tool; +import org.springframework.ai.tool.annotation.ToolParam; + +/** + * @author lanyuanxiaoyao + * @version 20250606 + */ +public class KnowledgeTool { + private final Long knowledgeId; + + public KnowledgeTool(Long knowledgeId) { + this.knowledgeId = knowledgeId; + } + + @Tool(description = """ + 从知识库中检索相关外部知识 + """) + public String queryKnowledge( + @ToolParam(description = """ + 精炼准确的知识库关键词,使用逗号分隔,尽可能全面地覆盖需要获取的知识 + """) + String query + ) { + KnowledgeService knowledgeService = AiChatApplication.getBean(KnowledgeService.class); + var documents = knowledgeService.query(knowledgeId, query, 10, 0.5); + if (ObjectUtil.isNotEmpty(documents)) { + return StrUtil.format(""" + 以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问: + {} + """, documents.makeString("\n")); + } + return ""; + } +} diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java index f1d5406..315c43c 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java @@ -16,7 +16,7 @@ public class TableTool { @Tool(description = """ 执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。 - """, returnDirect = true) + """) public String executeJdbc( @ToolParam(description = """ 完整的MySQL查询语句,禁止使用除select外的任何语句。