From 90fea22de5c1b10c326f68748dc331e2a76eb3b5 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Thu, 5 Jun 2025 19:53:25 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat):=20=E5=A2=9E=E5=8A=A0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93SQL=E8=AE=BF=E9=97=AE=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/ai/chat/AiChatApplication.java | 16 ++++++- .../ai/chat/controller/ChatController.java | 22 +++++++++- .../service/ai/chat/tools/TableTool.java | 35 +++++++++++++++ .../service/forest/service/InfoService.java | 4 ++ .../info/controller/JdbcController.java | 28 ++++++++++++ .../service/info/service/JdbcService.java | 43 +++++++++++++++++++ 6 files changed, 145 insertions(+), 3 deletions(-) create mode 100644 service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/JdbcController.java create mode 100644 service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/JdbcService.java diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/AiChatApplication.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/AiChatApplication.java index 4243a37..1d9b9f5 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/AiChatApplication.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/AiChatApplication.java @@ -1,10 +1,13 @@ package com.lanyuanxiaoyao.service.ai.chat; import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties; +import org.springframework.beans.BeansException; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.retry.annotation.EnableRetry; /** @@ -16,8 +19,19 @@ import org.springframework.retry.annotation.EnableRetry; @EnableConfigurationProperties @EnableEncryptableProperties @EnableRetry -public class AiChatApplication { +public class AiChatApplication implements ApplicationContextAware { + private static ApplicationContext context; + public static void main(String[] args) { SpringApplication.run(AiChatApplication.class, args); } + + public static T getBean(Class clazz) { + return context.getBean(clazz); + } + + @Override + public void setApplicationContext(ApplicationContext context) throws BeansException { + AiChatApplication.context = context; + } } diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java index 3b0690a..e3fe4d2 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java @@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.ai.chat.controller; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.ai.chat.entity.MessageVO; +import com.lanyuanxiaoyao.service.ai.chat.tools.TableTool; import com.lanyuanxiaoyao.service.forest.service.KnowledgeService; import java.io.IOException; import java.time.LocalDateTime; @@ -52,11 +53,27 @@ public class ChatController { private ChatClient.ChatClientRequestSpec buildRequest(Long knowledgeId, ImmutableList messages) { var builder = StrUtil.builder() .append(StrUtil.format(""" - 你是一名专业的AI运维助手,负责“Hudi数据同步服务平台”的运维工作; + 你是一名专业的AI运维助手,负责“Hudi数据同步服务”的运维工作; 你将会友好地帮助用户解答关于该平台运维工作的问题,你会尽可能通过各种方式获取知识和数据来解答; 对于无法通过已有知识回答的问题,你会提示用户你无法解答该问题,而不是虚构不存在的数据或答案; 对于与该平台无关的问题,你会委婉地拒绝用户,并提示无法回答; 你将始终在中文语境下进行对话。 + + Hudi数据同步服务将TeleDB、TelePG数据库中的数据同步到Hudi表,并通过Hudi-Hive插件,将Hudi表包装为Hive表提供给外系统查询使用; + 平台分为数据同步任务、数据压缩任务和服务管理平台三部分;以下是关于该平台的关键概念: + 同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的数据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表; + 压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将一张指定的Hudi MOR表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部系统查询; + 管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等信息; + TeleDB:中国电信自研的分布式MySQL集群组件,可以实现一张MySQL表在物理上拆分为多张MySQL表(这种类似分片的物理表被称作“set”)存储在多个MySQL服务中,实现可扩容、可容灾等特性; + TelePG:中国电信自研的分布式PostgreSQL集群组件,其功能类似TeleDB; + 源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库; + 目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表; + Pulsar:TeleDB、TelePG产生的增量日志(类似MySQL的binlog)会写入Pulsar队列,同步任务再从Pulsar队列中读取增量日志进行处理; + 逻辑表:Hudi数据同步服务按逻辑表为概念配置同步任务,一张逻辑表对应一张TeleDB或TelePG上的数据表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常会按地市进行分片(分区); + Hudi表:逻辑表经过同步任务同步后写入Hudi表,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B表也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他地市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引; + Hive表:即作为目标端的Hudi表,通常和逻辑表一一对应,通过使用Hive外表将经过拆分的Hudi A、B表重新合成为一个Hive表; + Flink 任务:通常指同步任务,根据Hudi表数据量,数据量大的 + 当前时间为:{} """, LocalDateTime.now().format(formatter))); @@ -65,7 +82,7 @@ public class ChatController { var documents = knowledgeService.query(knowledgeId, vo.getContent(), 10, 0.5); if (ObjectUtil.isNotEmpty(documents)) { builder.append(StrUtil.format(""" - 以下是与用户问题有关的外部知识,优先利用该知识回答用户的提问: + 以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问: {} """, documents.makeString("\n"))); @@ -73,6 +90,7 @@ public class ChatController { } return chatClient.prompt() .system(builder.toString()) + .tools(new TableTool()) .messages( messages .collect(message -> StrUtil.equals(message.getRole(), ROLE_ASSISTANT) diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java new file mode 100644 index 0000000..f1d5406 --- /dev/null +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java @@ -0,0 +1,35 @@ +package com.lanyuanxiaoyao.service.ai.chat.tools; + +import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication; +import com.lanyuanxiaoyao.service.forest.service.InfoService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.ai.tool.annotation.Tool; +import org.springframework.ai.tool.annotation.ToolParam; + +/** + * @author lanyuanxiaoyao + * @version 20250605 + */ +public class TableTool { + private static final Logger logger = LoggerFactory.getLogger(TableTool.class); + + @Tool(description = """ + 执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。 + """, returnDirect = true) + public String executeJdbc( + @ToolParam(description = """ + 完整的MySQL查询语句,禁止使用除select外的任何语句。 + """) String sql + ) { + InfoService infoService = AiChatApplication.getBean(InfoService.class); + String result = infoService.jdbc(sql) + .collect(map -> map.valuesView().makeString(",")) + .makeString("\n"); + logger.info("SQL result: \n{}", result); + return result; + } + + private void queryTableMeta() { + } +} diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java index 7944f0f..862fd31 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/InfoService.java @@ -8,6 +8,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.PageResponse; import com.lanyuanxiaoyao.service.configuration.entity.info.*; import java.util.Map; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; /** * Info 接口 @@ -193,4 +194,7 @@ public interface InfoService { @Get("/info/clean_table_version") Integer cleanTableVersion(); + + @Post(value = "/jdbc", contentType = "plain/text") + ImmutableList> jdbc(@Body String sql); } diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/JdbcController.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/JdbcController.java new file mode 100644 index 0000000..59c65a2 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/controller/JdbcController.java @@ -0,0 +1,28 @@ +package com.lanyuanxiaoyao.service.info.controller; + +import com.lanyuanxiaoyao.service.info.service.JdbcService; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author lanyuanxiaoyao + * @version 20250605 + */ +@RestController +@RequestMapping("/jdbc") +public class JdbcController { + private final JdbcService jdbcService; + + public JdbcController(JdbcService jdbcService) { + this.jdbcService = jdbcService; + } + + @PostMapping("") + public ImmutableList> jdbc(@RequestBody String sql) { + return jdbcService.jdbc(sql); + } +} diff --git a/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/JdbcService.java b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/JdbcService.java new file mode 100644 index 0000000..8d3daa5 --- /dev/null +++ b/service-info-query/src/main/java/com/lanyuanxiaoyao/service/info/service/JdbcService.java @@ -0,0 +1,43 @@ +package com.lanyuanxiaoyao.service.info.service; + +import java.util.List; +import java.util.Map; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.map.ImmutableMap; +import org.eclipse.collections.api.map.MutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +/** + * 执行JDBC + * + * @author lanyuanxiaoyao + * @version 20250605 + */ +@Service +public class JdbcService { + private static final Logger logger = LoggerFactory.getLogger(JdbcService.class); + + private final JdbcTemplate jdbcTemplate; + + public JdbcService(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + public ImmutableList> jdbc(String sql) { + logger.info("Executing SQL: {}", sql); + List> lines = jdbcTemplate.queryForList(sql); + return Lists.immutable.ofAll(lines) + .collect(map -> { + MutableMap item = Maps.mutable.empty(); + for (Map.Entry entry : map.entrySet()) { + item.put(entry.getKey(), String.valueOf(entry.getValue())); + } + return item.toImmutable(); + }); + } +}