feat(chat): 优化提示词,增加外部调用方法

This commit is contained in:
v-zhangjc9
2025-06-06 17:20:15 +08:00
parent 951075fc9f
commit e01a883d37
5 changed files with 185 additions and 17 deletions

View File

@@ -4,6 +4,8 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.ai.chat.entity.MessageVO;
import com.lanyuanxiaoyao.service.ai.chat.tools.KnowledgeTool;
import com.lanyuanxiaoyao.service.ai.chat.tools.TableTool;
import com.lanyuanxiaoyao.service.ai.chat.tools.YarnTool;
import com.lanyuanxiaoyao.service.forest.service.KnowledgeService;
import java.io.IOException;
import java.time.LocalDateTime;
@@ -62,24 +64,28 @@ public class ChatController {
对话语言:中文
Hudi数据同步服务 实现从源端数据库TeleDB、TelePG到Hudi表的数据实时同步并通过Hudi-Hive插件将Hudi表封装为Hive外表即目标端供外部系统进行SQL查询。
数据同步任务基于Flink开发运行在Hadoop Yarn集群上该任务常驻运行在集群上每个任务负责一张或多张表的数据同步,同步任务从指定的Pulsar队列中实时读取数据经过数据转换和业务处理后写入Hudi表
数据压缩任务基于Flink开发运行在Hadoop Yarn集群上该任务根据指定的调度策略周期性定时启动每个任务将一张指定的Hudi表中的log数据压缩为parquet数据当压缩完成时上一次压缩到当前压缩之间被写入的数据才能被外部系统查询
服务管理平台使用Spring Cloud框架开发的微服务集群用于管理同步任务和压缩任务的运行、停止和状态监控等信息提供友好的前端交互网页
Hudi数据同步服务
实现从源端数据库TeleDB、TelePG到Hudi表的数据实时同步,并通过Hudi-Hive插件将Hudi表封为Hive外表即目标端供外部系统进行SQL查询
数据同步任务基于Flink开发运行在Hadoop Yarn集群上该任务常驻运行在集群上每个任务负责一张或多张表的据同步同步任务从指定的Pulsar队列中实时读取数据经过数据转换和业务处理后写入Hudi表
数据压缩任务基于Flink开发运行在Hadoop Yarn集群上该任务根据指定的调度策略周期性定时启动每个任务将张指定的Hudi表中的log数据压缩为parquet数据当压缩完成时上一次压缩到当前压缩之间被写入的数据才能被外部统查询
服务管理平台使用Spring Cloud框架开发的微服务集群用于管理同步任务和压缩任务的运行、停止和状态监控等息提供友好的前端交互网页。
源端上游类似TeleDB、TelePG这些提供原始数据的数据库。
目标端下游数据同步到Hudi表后提供给外部系统查询用的Hive表。
TeleDB中国电信自研分布式MySQL集群组件逻辑上的一张表在物理上被水平分片Sharding为多个“set”表存储在个MySQL节点支持弹性扩容和容灾。
TeleDB中国电信自研分布式MySQL集群组件逻辑上的一张表在物理上被水平分片Sharding为多个“set”表存储在个MySQL节点支持弹性扩容和容灾。
TelePG中国电信自研分布式PostgreSQL集群组件其架构和功能特性与TeleDB高度相似。
逻辑表一张逻辑表对应一张TeleDB或TelePG上的业务表逻辑表中包含广东所有地市的业务数据数据量大的表通常按地市进行分片(分区)。
Hudi表逻辑表数据经同步任务处理后实际存储的位置根据逻辑表的数据量数据量大的会根据地市分为A、B表B也被称为大表通常包含广深东佛广州、深圳、东莞、佛山四个大区的数据A表包含广东除了广深东佛外的其他特大表如acct_item会按一个地市对应一个Hudi表Hudi表统一配置为MOR表使用Bucket Index索引。
Hive表通过Hudi-Hive插件创建的Hive外表作为下游系统的唯一查询入口该表逻辑上对应源端的一张逻辑表Hudi服务内部可能存在的多个物理Hudi表如A表、B表或地市分表聚合封装成一个逻辑视图透明地对外提供完整的辑表数据查询。
逻辑表一张逻辑表对应一张TeleDB或TelePG上的业务表逻辑表中包含广东所有地市的业务数据数据量大的表通常按地市进行分片分区
Hudi表逻辑表数据经同步任务处理后实际存储的位置根据逻辑表的数据量数据量大的会根据地市分为A、B表B也被称为大表通常包含广深东佛广州、深圳、东莞、佛山四个大区的数据A表包含广东除了广深东佛外的其他市特大表如acct_item会按一个地市对应一个Hudi表Hudi表统一配置为MOR表使用Bucket Index索引。
Hive表通过Hudi-Hive插件创建的Hive外表作为下游系统的唯一查询入口该表逻辑上对应源端的一张逻辑表它Hudi服务内部可能存在的多个物理Hudi表如A表、B表或地市分表聚合封装成一个逻辑视图透明地对外提供完整的辑表数据查询。
重点表根据业务系统的要求对于有的表及时性和数据完整性有更高的要求这些表被称为重点表在tb_app_collect_table_info表中的tags字段包含“FOCUS”字符的是重点表。
Flink 任务:即数据同步任务,根据逻辑表的数据量通常有如下规则:
大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表如A表 + B表或多地市表)的同步子任务。
小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自务。
Pulsar队列来自源端TeleDB逻辑表增量日志会被“Canal同步组件实时写入Pulsar队列队列中的一条消息对应一条增量日志有新增I、更新U、删除D、DDLDDL语句几种类型操作TelePG也按同样的逻辑通过“PG Sync组件将增量日志同步到对应队列
压缩调度服务:服务管理平台中有一个专门的服务用于根据指定的策略调度数据压缩任务的运行
压缩调度:压缩任务占用资源大耗时长为了平衡资源占用和压缩效率压缩调度服务会根据不同的策略调用不同Hudi表的压缩任务通常情况下从2点开始每3小时调度一次全部Hudi表的压缩任务。
跨天调度:由于压缩任务执行耗时长0点的时候外部系统在Hive表中能查到的最新数据往往还没到0点为了
大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表如A表 + B表或多地市表的同步子任务。
小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自务。
Pulsar队列(消息队列)源端TeleDB逻辑表增量日志(包含新增-I、更新-U、删除-D、DDL操作类型Canal同步组件实时写入Pulsar队列。TelePG逻辑表增量日志由PGSync组件以相同逻辑同步到对应队列。
压缩调度服务:service-scheduler(单实例)按策略将压缩任务放入预调度队列定时将预调度任务转移至各集群对应的压缩任务队列service-launcher(与集群一一对应)定时轮询对应集群的压缩任务队列发现任务即调度执行service-queue提供队列机制scheduler与launcher均基于集群资源状态动态调节任务产生与执行速率利用队列缓冲避免资源超限
压缩调度:压缩任务耗时长、资源大。为平衡效率与资源调度服务通常从凌晨2点起每3小时调度一次全部Hudi表的压缩任务。
跨天调度:为确保关键表数据在0点后及时可用此时Hive最新数据常未达0点0点至2点间对重点表进行更高频压缩调度。此高频率调度持续直至目标表被标记为“已跨天”
跨天判断按照先后次序1. 源端同步组件(Canal/PGSync)判断源表数据是否跨天。若跨天向队列写入跨天消息2.同步任务接收跨天消息在tb_app_collect_table_version表插入记录3.独立程序判断Hudi表数据是否已跨天。若跨天更新tb_app_collect_table_version对应记录状态。此时逻辑表标记为“已跨天”一张表必须先接收到跨天标记然后才能完成跨天。
集群指Hadoop Yarn集群同步任务仅在b12运行压缩任务主要在b12运行部分重点表在b1或a4运行调度服务根据资源动态在多个集群间分配压缩任务其中b12集群使用default队列b1集群使用datalake队列a4集群使用ten_iap.datalake队列。
当前时间为:{}
@@ -95,6 +101,10 @@ public class ChatController {
if (ObjectUtil.isNotNull(knowledgeId)) {
spec.tools(new KnowledgeTool(knowledgeId));
}
spec.tools(
new TableTool(),
new YarnTool()
);
return spec;
}

View File

@@ -1,7 +1,11 @@
package com.lanyuanxiaoyao.service.ai.chat.tools;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.tool.annotation.Tool;
@@ -13,8 +17,9 @@ import org.springframework.ai.tool.annotation.ToolParam;
*/
public class TableTool {
private static final Logger logger = LoggerFactory.getLogger(TableTool.class);
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
@Tool(description = """
/* @Tool(description = """
执行SQL语句获取查询结果结果可能是一行或多行行内以逗号分隔字段。
""")
public String executeJdbc(
@@ -28,8 +33,68 @@ public class TableTool {
.makeString("\n");
logger.info("SQL result: \n{}", result);
return result;
} */
@Tool(description = """
查询表Hudi表、Hive表、逻辑表等数量
""")
private String tableCount(
@ToolParam(description = """
查询类型,取值如下
Hudi表hudi
Hive表hive
逻辑表logic
""") String type
) {
logger.info("Enter method: tableCount[type]. type:{}", type);
var infoService = AiChatApplication.getBean(InfoService.class);
return switch (type) {
case "logic" -> StrUtil.format("""
逻辑表共{}张,其中重点表{}张
""", infoService.tableCount(), infoService.tableFocusCount());
case "hive" -> StrUtil.format("""
Hive表共{}张,其中重点表{}张
""", infoService.hiveCount(), infoService.hiveFocusCount());
case "hudi" -> StrUtil.format("""
Hudi表共{}张,其中重点表{}张
""", infoService.hudiCount(), infoService.hudiFocusCount());
default -> throw new IllegalStateException("Unexpected value: " + type);
};
}
private void queryTableMeta() {
@Tool(description = """
查询Hudi表跨天情况
返回结果包含未接收到跨天标记的表数量和接收到跨天标记但未完成跨天的表数量
一张表必须先接收到跨天标记才能完成跨天两者数量均为0才算所有表完成跨天
""")
public String version(
@ToolParam(description = """
查询指定日期的跨天情况格式为yyyyMMdd如2025年6月6日取值为20250606如果参数为空则查询当天跨天情况
""", required = false)
String date,
@ToolParam(description = """
表类型,取值如下
普通表normal
重点表focus
""")
String type
) {
logger.info("Enter method: version[date, type]. date:{},type:{}", date, type);
InfoService infoService = AiChatApplication.getBean(InfoService.class);
String version = date;
if (StrUtil.isBlank(version)) {
version = LocalDateTime.now().minusDays(1).format(FORMATTER);
} else {
version = LocalDate.parse(version, FORMATTER).minusDays(1).format(FORMATTER);
}
return switch (type) {
case "normal" -> StrUtil.format("""
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
""", infoService.unReceiveVersionNormalTableCount(version), infoService.unScheduledNormalTableCount(version));
case "focus" -> StrUtil.format("""
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
""", infoService.unReceiveVersionFocusTableCount(version), infoService.unScheduledFocusTableCount(version));
default -> throw new IllegalStateException("Unexpected value: " + type);
};
}
}

View File

@@ -0,0 +1,77 @@
package com.lanyuanxiaoyao.service.ai.chat.tools;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
/**
* @author lanyuanxiaoyao
* @version 20250606
*/
public class YarnTool {
private static final Logger logger = LoggerFactory.getLogger(YarnTool.class);
@Tool(description = """
查询yarn集群整体资源情况返回值为资源占用率%
""")
public Double yarnStatus(
@ToolParam(description = """
yarn集群名称取值有b12、b1和a4
""") String cluster
) {
logger.info("Enter method: yarnStatus[cluster]. cluster:{}", cluster);
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
YarnRootQueue status = yarnService.cluster(cluster);
return (status.getUsedCapacity() * 100.0) / status.getCapacity();
}
@Tool(description = """
查询yarn集群中指定队列的资源情况返回值为资源占用率%
""")
public Double yarnQueueStatus(
@ToolParam(description = """
yarn集群名称取值有b12、b1和a4
""") String cluster,
@ToolParam(description = """
yarn队列名称
""") String queue
) {
logger.info("Enter method: yarnQueueStatus[cluster, queue]. cluster:{},queue:{}", cluster, queue);
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
YarnQueue status = yarnService.queueDetail(cluster, queue);
return (status.getAbsoluteCapacity() * 100.0) / status.getAbsoluteMaxCapacity();
}
@Tool(description = """
查询指定集群上同步任务或压缩任务的运行情况
""")
public String yarnTaskStatus(
@ToolParam(description = """
yarn集群名称取值有b12、b1和a4
""") String cluster,
@ToolParam(description = """
查询任务种类,取值如下
同步任务sync
压缩任务compaction
""") String type
) {
logger.info("Enter method: yarnTaskStatus[cluster, type]. cluster:{},type:{}", cluster, type);
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
ImmutableList<YarnApplication> applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) || StrUtil.contains(app.getName(), type));
return StrUtil.format(
"""
运行中:{},调度中:{}
""",
applications.count(app -> StrUtil.equals(app.getState(), "RUNNING")),
applications.count(app -> StrUtil.equals(app.getState(), "ACCEPTED"))
);
}
}

View File

@@ -12,4 +12,4 @@ spring:
model: 'Qwen3-1.7-vllm'
mvc:
async:
request-timeout: 300000
request-timeout: 3600000

View File

@@ -0,0 +1,16 @@
package com.lanyuanxiaoyao.service.ai.chat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
/**
* @author lanyuanxiaoyao
* @version 20250606
*/
public class TestDatetimeFormat {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
public static void main(String[] args) {
System.out.println(LocalDate.parse("20250606", FORMATTER).format(FORMATTER));
}
}