refactor(ai): 迁移chat到知识库中
This commit is contained in:
@@ -1,12 +1,15 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge;
|
||||
|
||||
import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.retry.annotation.EnableRetry;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@@ -20,12 +23,23 @@ import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
@EnableEncryptableProperties
|
||||
@EnableRetry
|
||||
@EnableScheduling
|
||||
public class KnowledgeApplication implements ApplicationRunner {
|
||||
public class KnowledgeApplication implements ApplicationRunner, ApplicationContextAware {
|
||||
private static ApplicationContext context;
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(KnowledgeApplication.class, args);
|
||||
}
|
||||
|
||||
public static <T> T getBean(Class<T> clazz) {
|
||||
return context.getBean(clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext context) throws BeansException {
|
||||
KnowledgeApplication.context = context;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge.configuration;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
|
||||
public interface Prompts {
|
||||
String hudiBase = """
|
||||
Hudi数据同步服务
|
||||
实现从源端数据库(TeleDB、TelePG)到Hudi表的数据实时同步,并通过Hudi-Hive插件将Hudi表封为Hive外表(即目标端),供外部系统进行SQL查询。
|
||||
数据同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表。
|
||||
数据压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将张指定的Hudi表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部统查询。
|
||||
服务管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等息,提供友好的前端交互网页。
|
||||
源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库。
|
||||
目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表。
|
||||
TeleDB:中国电信自研分布式MySQL集群组件,逻辑上的一张表在物理上被水平分片(Sharding)为多个“set”表存储在个MySQL节点,支持弹性扩容和容灾。
|
||||
TelePG:中国电信自研分布式PostgreSQL集群组件,其架构和功能特性与TeleDB高度相似。
|
||||
逻辑表:一张逻辑表对应一张TeleDB或TelePG上的业务表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常按地市进行分片(分区)。
|
||||
Hudi表:逻辑表数据经同步任务处理后实际存储的位置,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引。
|
||||
Hive表:通过Hudi-Hive插件创建的Hive外表,作为下游系统的唯一查询入口,该表逻辑上对应源端的一张逻辑表,它Hudi服务内部可能存在的多个物理Hudi表(如A表、B表或地市分表)聚合封装成一个逻辑视图,透明地对外提供完整的辑表数据查询。
|
||||
重点表:根据业务系统的要求,对于有的表及时性和数据完整性有更高的要求,这些表被称为重点表,在tb_app_collect_table_info表中的tags字段,包含“FOCUS”字符的是重点表。
|
||||
Flink 任务:即数据同步任务,根据逻辑表的数据量通常有如下规则:
|
||||
大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表(如A表 + B表或多地市表)的同步子任务。
|
||||
小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自务。
|
||||
Pulsar队列(消息队列):源端TeleDB逻辑表增量日志(包含新增-I、更新-U、删除-D、DDL操作类型)由Canal同步组件实时写入Pulsar队列。TelePG逻辑表增量日志由PGSync组件以相同逻辑同步到对应队列。
|
||||
压缩调度服务:service-scheduler(单实例):按策略将压缩任务放入预调度队列,定时将预调度任务转移至各集群对应的压缩任务队列;service-launcher(与集群一一对应):定时轮询对应集群的压缩任务队列,发现任务即调度执行;service-queue:提供队列机制;scheduler与launcher均基于集群资源状态动态调节任务产生与执行速率,利用队列缓冲,避免资源超限。
|
||||
压缩调度:压缩任务耗时长、资源大。为平衡效率与资源,调度服务通常从凌晨2点起,每3小时调度一次全部Hudi表的压缩任务。
|
||||
跨天调度:为确保关键表数据在0点后及时可用(此时Hive最新数据常未达0点),0点至2点间,对重点表进行更高频压缩调度。此高频率调度持续直至目标表被标记为“已跨天”
|
||||
跨天判断:按照先后次序1. 源端同步组件(Canal/PGSync)判断源表数据是否跨天。若跨天,向队列写入跨天消息;2.同步任务接收跨天消息,在tb_app_collect_table_version表插入记录;3.独立程序判断Hudi表数据是否已跨天。若跨天,更新tb_app_collect_table_version对应记录状态。此时逻辑表标记为“已跨天”;一张表必须先接收到跨天标记,然后才能完成跨天。
|
||||
集群:指Hadoop Yarn集群,同步任务仅在b12运行,压缩任务主要在b12运行,部分重点表在b1或a4运行,调度服务根据资源动态在多个集群间分配压缩任务;其中b12集群使用default队列,b1集群使用datalake队列,a4集群使用ten_iap.datalake队列。
|
||||
""";
|
||||
|
||||
String hudiDatabase = """
|
||||
Hudi数据同步服务使用的数据表详情如下
|
||||
hudi_collect_build_b12.tb_app_collect_table_info表:记录同步任务配置信息
|
||||
id:主键
|
||||
alias:表别名,同样可以唯一标识该记录
|
||||
flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务
|
||||
hudi_job_id:hudi:tb_app_hudi_job_config表主键,对应Hudi表的配置
|
||||
sync_yarn_job_id:tb_app_yarn_job_config表主键,同步任务的yarn资源配置
|
||||
compaction_yarn_job_id:tb_app_yarn_job_config表主键,压缩任务的yarn资源配置
|
||||
src_db:源端数据库名称
|
||||
src_type:源端数据库类型,取值有teledb、telepg
|
||||
src_schema:源端数据库schema名称
|
||||
src_table:源端表名
|
||||
src_pulsar_addr:pulsar地址
|
||||
src_topic:pulsar队列名称
|
||||
tgt_hdfs_path:Hudi表对应的hdfs路径
|
||||
status:逻辑删除状态(y为正常,n为删除)
|
||||
filter_field:过滤字段,用于指定源端消息中的某个字段,进行消息过滤,如B表,同步任务会使用CITY_ID字段,并过滤出该字段为广深东佛编码的消息
|
||||
filter_values:过滤值
|
||||
filter_type:过滤类型,INCLUDE表示包含指定过滤值的记录保留,EXCLUDE表示包含指定过滤值的记录丢弃
|
||||
bucket_number:Hudi bucket index的参数值
|
||||
partition_field:Hudi表分区字段,通常使用CITY_ID
|
||||
priority:优先级,整数,数字越大优先级越高,优先级高,该表对应在压缩调度队列中会排在更靠前的位置
|
||||
hive_db:目标端hive库名称
|
||||
hive_table:目标端hive表名
|
||||
tags:标签,使用英文逗号分隔,用于标识表的特殊属性
|
||||
|
||||
hudi_collect_build_b12.tb_app_collect_table_version表:记录跨天版本
|
||||
flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务
|
||||
alias:表别名 tb_app_collect_table_info表alias字段,对应唯一同步任务
|
||||
version:版本,格式为yyyyMMdd,如2025年6月6日跨天版本,则该值为20250605
|
||||
op_ts:操作时间,接收到跨天标记的时间
|
||||
create_time:记录创建时间
|
||||
update_time:记录创建时间
|
||||
scheduled:是否已跨天,1为已跨天,0为未跨天
|
||||
|
||||
hudi_collect_build_b12.tb_app_flink_job_config表:Flink 任务配置
|
||||
id:主键
|
||||
name:Flink任务名称
|
||||
status:逻辑删除状态(y为正常,n为删除)
|
||||
application_id:flink任务对应的yarn任务的application id
|
||||
|
||||
hudi_collect_build_b12.tb_app_hudi_job_config表:Hudi表原生配置
|
||||
id:主键
|
||||
source_tasks:读取Pulsar消息的Flink算子并行度
|
||||
write_tasks:写Hudi表的Flink算子并行度
|
||||
keep_file_version:保留数据文件版本
|
||||
keep_commit_version:保留时间线版本
|
||||
|
||||
hudi_collect_build_b12.tb_app_hudi_sync_state表:同步、压缩任务运行状态
|
||||
id:主键,由flink_job_id和alias使用短横线连接而成,格式为:flink_job_id-alias
|
||||
message_id:最新消费到的Pulsar消息的message id
|
||||
source_start_time:同步任务启动时间
|
||||
source_receive_time:同步任务最新接收到消息队列消息的时间
|
||||
source_checkpoint_time:同步任务对应Flink任务最近一次执行checkpoint的时间,这个时间每15分钟更新一次,可以用来判断flink任务是否还在运行
|
||||
source_publish_time:同步任务接收到的最新一条消息队列的消息被发布到队列中的时间
|
||||
source_op_time:同步任务接收到的最新一条消息队列的消息在源端产生的时间
|
||||
source_application_id:同步任务对应Flink任务对应的yarn任务的application id
|
||||
source_cluster:同步任务运行的yarn集群
|
||||
compaction_start_time:最近一次压缩任务启动时间
|
||||
compaction_finish_time:最近一次压缩任务完成时间
|
||||
compaction_application_id:压缩任务对应的Flink任务对应的yarn任务的application id
|
||||
compaction_cluster:压缩任务运行所在的yarn集群
|
||||
|
||||
hudi_collect_build_b12.tb_app_yarn_job_config表:同步、压缩任务对应的yarn任务资源配置
|
||||
id:主键
|
||||
job_manager_memory:Job Manager内存(MB)
|
||||
task_manager_memory:Task Manager内存(MB)
|
||||
""";
|
||||
|
||||
String hudi = StrUtil.format(
|
||||
"""
|
||||
{}
|
||||
|
||||
{}
|
||||
""",
|
||||
hudiBase,
|
||||
hudiDatabase
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,138 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge.controller.caht;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.configuration.Prompts;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.entity.vo.MessageVO;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.tools.ChartTool;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.tools.KnowledgeTool;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.tools.TableTool;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.tools.YarnTool;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Optional;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.ai.chat.client.ChatClient;
|
||||
import org.springframework.ai.chat.messages.AssistantMessage;
|
||||
import org.springframework.ai.chat.messages.Message;
|
||||
import org.springframework.ai.chat.messages.UserMessage;
|
||||
import org.springframework.ai.chat.model.ChatResponse;
|
||||
import org.springframework.ai.chat.model.Generation;
|
||||
import org.springframework.ai.deepseek.DeepSeekAssistantMessage;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
/**
|
||||
* 聊天
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250514
|
||||
*/
|
||||
@Controller
|
||||
@RequestMapping("chat")
|
||||
public class ChatController {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ChatController.class);
|
||||
private final static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
private static final String ROLE_ASSISTANT = "assistant";
|
||||
private static final String ROLE_USER = "user";
|
||||
|
||||
private final ChatClient chatClient;
|
||||
|
||||
public ChatController(@Qualifier("chat") ChatClient.Builder builder) {
|
||||
this.chatClient = builder.build();
|
||||
}
|
||||
|
||||
private ChatClient.ChatClientRequestSpec buildRequest(ImmutableList<MessageVO> messages) {
|
||||
ChatClient.ChatClientRequestSpec spec = chatClient.prompt()
|
||||
.system(
|
||||
StrUtil.format("""
|
||||
你是一名专业的AI运维助手,专职负责“Hudi数据同步服务”的平台运维工作。你的核心职责是:
|
||||
1.友好解答:积极、专业地解答用户(通常是平台管理员或用户)关于该平台运维工作的疑问。
|
||||
2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)全面获取准确知识和数据来支持回答。
|
||||
3.诚实守界:
|
||||
对于无法通过已有知识或数据确认的问题,必须明确告知用户你无法解答,切勿捏造信息或提供不确定的答案。
|
||||
对于与该Hudi数据同步服务平台运维工作无关的问题,需委婉拒绝用户,并明确说明超出你的职责和能力范围。
|
||||
|
||||
对话语言:中文
|
||||
|
||||
{}
|
||||
|
||||
当前时间为:{}
|
||||
|
||||
""",
|
||||
Prompts.hudi,
|
||||
LocalDateTime.now().format(formatter)
|
||||
))
|
||||
.messages(
|
||||
messages
|
||||
.collect(message -> StrUtil.equals(message.getRole(), ROLE_ASSISTANT)
|
||||
? new AssistantMessage(message.getContent())
|
||||
: new UserMessage(message.getContent()))
|
||||
.collect(message -> (Message) message)
|
||||
.toList()
|
||||
);
|
||||
spec.tools(
|
||||
new TableTool(),
|
||||
new YarnTool(),
|
||||
new ChartTool()
|
||||
);
|
||||
return spec;
|
||||
}
|
||||
|
||||
@PostMapping("sync")
|
||||
@ResponseBody
|
||||
public MessageVO chatSync(
|
||||
@RequestBody ImmutableList<MessageVO> messages
|
||||
) {
|
||||
ChatResponse response = buildRequest(messages)
|
||||
.call()
|
||||
.chatResponse();
|
||||
return toMessage(response);
|
||||
}
|
||||
|
||||
@PostMapping("async")
|
||||
public SseEmitter chatAsync(
|
||||
@RequestBody ImmutableList<MessageVO> messages
|
||||
) {
|
||||
SseEmitter emitter = new SseEmitter();
|
||||
buildRequest(messages)
|
||||
.stream()
|
||||
.chatResponse()
|
||||
.subscribe(
|
||||
response -> {
|
||||
try {
|
||||
emitter.send(toMessage(response));
|
||||
} catch (IOException e) {
|
||||
emitter.completeWithError(e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
},
|
||||
emitter::completeWithError,
|
||||
emitter::complete
|
||||
);
|
||||
return emitter;
|
||||
}
|
||||
|
||||
private MessageVO toMessage(ChatResponse response) {
|
||||
AssistantMessage message = Optional.ofNullable(response)
|
||||
.map(ChatResponse::getResult)
|
||||
.map(Generation::getOutput)
|
||||
.orElseThrow(() -> new RuntimeException("ChatResponse is null"));
|
||||
MessageVO vo = new MessageVO();
|
||||
vo.setRole(ROLE_ASSISTANT);
|
||||
vo.setContent(message.getText());
|
||||
if (message instanceof DeepSeekAssistantMessage deepseekMessage) {
|
||||
vo.setReason(deepseekMessage.getReasoningContent());
|
||||
}
|
||||
return vo;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge.entity.vo;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250516
|
||||
*/
|
||||
@Data
|
||||
public class MessageVO {
|
||||
private String role;
|
||||
private String content;
|
||||
private String reason;
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge.tools;
|
||||
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.KnowledgeApplication;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.ai.chat.client.ChatClient;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
import org.springframework.ai.tool.annotation.ToolParam;
|
||||
|
||||
/**
|
||||
* 图表工具
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250611
|
||||
*/
|
||||
@Slf4j
|
||||
public class ChartTool {
|
||||
@Tool(description = """
|
||||
根据需求生成mermaid图表代码
|
||||
""")
|
||||
public String mermaid(
|
||||
// language=TEXT
|
||||
@ToolParam(description = """
|
||||
请按以下结构描述图表需求:
|
||||
1.图表类型:[必填] 明确指定图表类型,例如:流程图(flowchart TD)、序列图(sequenceDiagram)、类图(classDiagram)、甘特图(gantt)、状态图(stateDiagram-v2)、用户旅程图(journey)、饼图(pie)、思维导图(mindmap)、实体关系图(erDiagram)等。
|
||||
2.图表标题:[可选] 简述图表的核心主题,例如:“用户登录流程”,“订单处理系统时序交互”,“项目开发计划甘特图”。
|
||||
3.核心内容:[必填] 描述图表逻辑:
|
||||
实体清单:列出所有节点/角色/对象(如“用户、支付系统、数据库”)
|
||||
示例 (流程图):`用户, 登录界面, 认证服务, 数据库, 主界面, 错误提示`
|
||||
示例 (序列图):`用户 (User), 客户端 (ClientApp), 认证服务器 (AuthServer), 数据库 (DB)`
|
||||
示例 (甘特图):`需求分析, 设计, 开发, 测试, 部署`
|
||||
关系描述:说明实体间交互/流程顺序(如“用户提交订单 → 支付系统验证 → 数据库更新记录”)
|
||||
示例 (流程图):`用户 输入 -> 登录界面 -> 提交 -> 认证服务 -> 验证通过? --> 是 --> 主界面; 否 --> 错误提示`
|
||||
示例 (序列图):`用户 -> 客户端:输入凭据; 客户端 -> 认证服务器:发送认证请求; 认证服务器 -> 数据库:查询用户信息; 数据库 --> 认证服务器:返回结果; 认证服务器 --> 客户端:返回认证结果; 客户端 --> 用户:显示结果`
|
||||
示例 (甘特图):`需求分析 开始于 2023-10-01, 持续 5天; 设计 开始于 需求分析结束, 持续 7天; ...`
|
||||
示例(饼图):`成功:15.25; 失败:20.22`
|
||||
关键分支:如有条件判断,说明条件及路径(如“验证失败 → 显示错误信息”)
|
||||
示例 (流程图):`认证服务 --> |验证通过| 主界面; 认证服务 --> |验证失败| 错误提示`
|
||||
4.样式规范:[选填] 指定细节要求:
|
||||
主题:指定预定义主题,例如:default, forest, dark, neutral]` (默认是 default)
|
||||
颜色:实体颜色映射(如“用户节点用#3498db”)
|
||||
形状:特殊节点形状(如“决策节点用菱形”)
|
||||
布局:指定图表方向,例如:从上到下 (TB/TD), 从左到右 (LR), 从右到左 (RL), 从下到上 (BT)]` (默认通常是 TB/TD)
|
||||
其他:箭头类型(实线/虚线)、注释文本等
|
||||
""") String request
|
||||
) {
|
||||
log.info("Enter method: mermaid[request]. request:{}", request);
|
||||
ChatClient.Builder builder = KnowledgeApplication.getBean(ChatClient.Builder.class);
|
||||
ChatClient client = builder.build();
|
||||
return client.prompt()
|
||||
// language=TEXT
|
||||
.system("""
|
||||
你是一位专业的Mermaid图表专家,精通各类图表语法(流程图、序列图、类图、甘特图、状态图、饼图等),并能根据需求生成清晰、准确、符合最佳实践的Mermaid代码。
|
||||
请根据我提供的详细描述,为我生成可直接用于渲染的Mermaid图表代码。确保代码语法正确、布局合理、易于理解。
|
||||
输出要求:
|
||||
仅输出纯净的、可立即渲染的 Mermaid 代码。
|
||||
不要包含任何解释性文字、Markdown 标记(除了代码块标识)或额外的说明。
|
||||
代码格式清晰,使用适当的缩进(如果适用)。
|
||||
确保代码语法完全符合 Mermaid 官方文档规范。
|
||||
""")
|
||||
.user(request)
|
||||
.call()
|
||||
.content();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge.tools;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.KnowledgeApplication;
|
||||
import com.lanyuanxiaoyao.service.forest.service.KnowledgeService;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
import org.springframework.ai.tool.annotation.ToolParam;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250606
|
||||
*/
|
||||
public class KnowledgeTool {
|
||||
private final Long knowledgeId;
|
||||
|
||||
public KnowledgeTool(Long knowledgeId) {
|
||||
this.knowledgeId = knowledgeId;
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
从知识库中检索相关外部知识
|
||||
""")
|
||||
public String queryKnowledge(
|
||||
@ToolParam(description = """
|
||||
精炼准确的知识库关键词,使用逗号分隔,尽可能全面地覆盖需要获取的知识
|
||||
""")
|
||||
String query
|
||||
) {
|
||||
KnowledgeService knowledgeService = KnowledgeApplication.getBean(KnowledgeService.class);
|
||||
var documents = knowledgeService.query(knowledgeId, query, 10, 0.5);
|
||||
if (ObjectUtil.isNotEmpty(documents)) {
|
||||
return StrUtil.format("""
|
||||
以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问:
|
||||
{}
|
||||
""", documents.makeString("\n"));
|
||||
}
|
||||
return "";
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge.tools;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.KnowledgeApplication;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
import org.springframework.ai.tool.annotation.ToolParam;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250605
|
||||
*/
|
||||
@Slf4j
|
||||
public class TableTool {
|
||||
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
|
||||
@Tool(description = """
|
||||
执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。
|
||||
""")
|
||||
public String executeJdbc(
|
||||
@ToolParam(description = """
|
||||
完整的MySQL查询语句,禁止使用除select外的任何语句。
|
||||
""") String sql
|
||||
) {
|
||||
log.info("Enter method: executeJdbc[sql]. sql:{}", sql);
|
||||
InfoService infoService = KnowledgeApplication.getBean(InfoService.class);
|
||||
String result = infoService.jdbc(sql)
|
||||
.collect(map -> map.valuesView().makeString(","))
|
||||
.makeString("\n");
|
||||
log.info("SQL result: \n{}", result);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询表(Hudi表、Hive表、逻辑表等)数量
|
||||
""")
|
||||
private String tableCount(
|
||||
@ToolParam(description = """
|
||||
查询类型,取值如下
|
||||
Hudi表:hudi
|
||||
Hive表:hive
|
||||
逻辑表:logic
|
||||
一次调用只能传一个类型,不支持多个类型同时查询
|
||||
""") String type
|
||||
) {
|
||||
log.info("Enter method: tableCount[type]. type:{}", type);
|
||||
var infoService = KnowledgeApplication.getBean(InfoService.class);
|
||||
return switch (type) {
|
||||
case "logic" -> StrUtil.format("""
|
||||
逻辑表共{}张,其中重点表{}张
|
||||
""", infoService.tableCount(), infoService.tableFocusCount());
|
||||
case "hive" -> StrUtil.format("""
|
||||
Hive表共{}张,其中重点表{}张
|
||||
""", infoService.hiveCount(), infoService.hiveFocusCount());
|
||||
case "hudi" -> StrUtil.format("""
|
||||
Hudi表共{}张,其中重点表{}张
|
||||
""", infoService.hudiCount(), infoService.hudiFocusCount());
|
||||
default -> throw new IllegalStateException("Unexpected value: " + type);
|
||||
};
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询Hudi表跨天情况
|
||||
返回结果包含未接收到跨天标记的表数量和接收到跨天标记但未完成跨天的表数量
|
||||
一张表必须先接收到跨天标记才能完成跨天,两者数量均为0才算所有表完成跨天
|
||||
""")
|
||||
public String version(
|
||||
@ToolParam(description = """
|
||||
查询指定日期的跨天情况,格式为yyyyMMdd(如2025年6月6日取值为20250606)。
|
||||
一次调用只能传一个日期,不支持多个日期同时查询
|
||||
""")
|
||||
String date,
|
||||
@ToolParam(description = """
|
||||
表类型,取值如下
|
||||
普通表:normal
|
||||
重点表:focus
|
||||
一次调用只能传一个类型,不支持多个类型同时查询
|
||||
""")
|
||||
String type
|
||||
) {
|
||||
log.info("Enter method: version[date, type]. date:{},type:{}", date, type);
|
||||
InfoService infoService = KnowledgeApplication.getBean(InfoService.class);
|
||||
String version = date;
|
||||
if (StrUtil.isBlank(version)) {
|
||||
version = LocalDateTime.now().minusDays(1).format(FORMATTER);
|
||||
} else {
|
||||
version = LocalDate.parse(version, FORMATTER).minusDays(1).format(FORMATTER);
|
||||
}
|
||||
return switch (type) {
|
||||
case "normal" -> StrUtil.format("""
|
||||
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
|
||||
""", infoService.unReceiveVersionNormalTableCount(version), infoService.unScheduledNormalTableCount(version));
|
||||
case "focus" -> StrUtil.format("""
|
||||
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
|
||||
""", infoService.unReceiveVersionFocusTableCount(version), infoService.unScheduledFocusTableCount(version));
|
||||
default -> throw new IllegalStateException("Unexpected value: " + type);
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge.tools;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.KnowledgeApplication;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
||||
import com.lanyuanxiaoyao.service.forest.service.YarnService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
import org.springframework.ai.tool.annotation.ToolParam;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250606
|
||||
*/
|
||||
@Slf4j
|
||||
public class YarnTool {
|
||||
@Tool(description = """
|
||||
查询yarn集群整体资源情况,返回值为资源占用率(%)
|
||||
""")
|
||||
public Double yarnStatus(
|
||||
@ToolParam(description = """
|
||||
yarn集群名称,取值有b12、b1和a4;
|
||||
一次调用只能查询一个集群,不支持多个集群同时查询
|
||||
""") String cluster
|
||||
) {
|
||||
log.info("Enter method: yarnStatus[cluster]. cluster:{}", cluster);
|
||||
YarnService yarnService = KnowledgeApplication.getBean(YarnService.class);
|
||||
YarnRootQueue status = yarnService.cluster(cluster);
|
||||
return (status.getUsedCapacity() * 100.0) / status.getCapacity();
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询yarn集群中指定队列的资源情况,返回值为资源占用率(%)
|
||||
""")
|
||||
public Double yarnQueueStatus(
|
||||
@ToolParam(description = """
|
||||
yarn集群名称,取值有b12、b1和a4;
|
||||
一次调用只能查询一个集群,不支持多个集群同时查询
|
||||
""") String cluster,
|
||||
@ToolParam(description = """
|
||||
yarn队列名称
|
||||
""") String queue
|
||||
) {
|
||||
log.info("Enter method: yarnQueueStatus[cluster, queue]. cluster:{},queue:{}", cluster, queue);
|
||||
YarnService yarnService = KnowledgeApplication.getBean(YarnService.class);
|
||||
YarnQueue status = yarnService.queueDetail(cluster, queue);
|
||||
return (status.getAbsoluteCapacity() * 100.0) / status.getAbsoluteMaxCapacity();
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询指定集群上同步任务或压缩任务的运行情况
|
||||
""")
|
||||
public String yarnTaskStatus(
|
||||
@ToolParam(description = """
|
||||
yarn集群名称,取值有b12、b1和a4
|
||||
一次调用只能查询一个集群,不支持多个集群同时查询
|
||||
""") String cluster,
|
||||
@ToolParam(description = """
|
||||
查询任务种类,取值如下
|
||||
同步任务:Sync
|
||||
压缩任务:Compaction
|
||||
一次调用只能传一个类型,不支持多个类型同时查询
|
||||
""") String type
|
||||
) {
|
||||
log.info("Enter method: yarnTaskStatus[cluster, type]. cluster:{},type:{}", cluster, type);
|
||||
YarnService yarnService = KnowledgeApplication.getBean(YarnService.class);
|
||||
ImmutableList<YarnApplication> applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) && StrUtil.contains(app.getName(), type));
|
||||
return StrUtil.format(
|
||||
"""
|
||||
运行中:{},调度中:{}
|
||||
""",
|
||||
applications.count(app -> StrUtil.equals(app.getState(), "RUNNING")),
|
||||
applications.count(app -> StrUtil.equals(app.getState(), "ACCEPTED"))
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user