3 Commits

Author SHA1 Message Date
v-zhangjc9
72c23d916a feat(chat): 优化提示词,增加外部调用方法 2025-06-06 19:56:52 +08:00
v-zhangjc9
e01a883d37 feat(chat): 优化提示词,增加外部调用方法 2025-06-06 17:20:15 +08:00
v-zhangjc9
951075fc9f feat(chat): 优化提示词和知识库查询 2025-06-06 10:57:27 +08:00
6 changed files with 312 additions and 39 deletions

View File

@@ -3,7 +3,9 @@ package com.lanyuanxiaoyao.service.ai.chat.controller;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.ai.chat.entity.MessageVO;
import com.lanyuanxiaoyao.service.ai.chat.tools.KnowledgeTool;
import com.lanyuanxiaoyao.service.ai.chat.tools.TableTool;
import com.lanyuanxiaoyao.service.ai.chat.tools.YarnTool;
import com.lanyuanxiaoyao.service.forest.service.KnowledgeService;
import java.io.IOException;
import java.time.LocalDateTime;
@@ -51,46 +53,111 @@ public class ChatController {
}
private ChatClient.ChatClientRequestSpec buildRequest(Long knowledgeId, ImmutableList<MessageVO> messages) {
var builder = StrUtil.builder()
.append(StrUtil.format("""
你是一名专业的AI运维助手负责“Hudi数据同步服务”的运维工作
你将会友好地帮助用户解答关于该平台运维工作的问题,你会尽可能通过各种方式获取知识和数据来解答;
对于无法通过已有知识回答的问题,你会提示用户你无法解答该问题,而不是虚构不存在的数据或答案;
对于与该平台无关的问题,你会委婉地拒绝用户,并提示无法回答;
你将始终在中文语境下进行对话
ChatClient.ChatClientRequestSpec spec = chatClient.prompt()
.system(StrUtil.format("""
你是一名专业的AI运维助手专职负责“Hudi数据同步服务”的平台运维工作。你的核心职责是:
1.友好解答:积极、专业地解答用户(通常是平台管理员或用户)关于该平台运维工作的疑问。
2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)全面获取准确知识和数据来支持回答。
3.诚实守界:
对于无法通过已有知识或数据确认的问题,必须明确告知用户你无法解答,切勿捏造信息或提供不确定的答案
对于与该Hudi数据同步服务平台运维工作无关的问题需委婉拒绝用户并明确说明超出你的职责和能力范围。
Hudi数据同步服务将TeleDB、TelePG数据库中的数据同步到Hudi表并通过Hudi-Hive插件将Hudi表包装为Hive表提供给外系统查询使用
平台分为数据同步任务、数据压缩任务和服务管理平台三部分;以下是关于该平台的关键概念:
同步任务基于Flink开发运行在Hadoop Yarn集群上该任务常驻运行在集群上每个任务负责一张或多张表的数据同步同步任务从指定的Pulsar队列中实时读取数据经过数据转换和业务处理后写入Hudi表
压缩任务基于Flink开发运行在Hadoop Yarn集群上该任务根据指定的调度策略周期性定时启动每个任务将一张指定的Hudi MOR表中的log数据压缩为parquet数据当压缩完成时上一次压缩到当前压缩之间被写入的数据才能被外部系统查询
管理平台使用Spring Cloud框架开发的微服务集群用于管理同步任务和压缩任务的运行、停止和状态监控等信息
TeleDB中国电信自研的分布式MySQL集群组件可以实现一张MySQL表在物理上拆分为多张MySQL表这种类似分片的物理表被称作“set”存储在多个MySQL服务中实现可扩容、可容灾等特性
TelePG中国电信自研的分布式PostgreSQL集群组件其功能类似TeleDB
源端上游类似TeleDB、TelePG这些提供原始数据的数据库
目标端下游数据同步到Hudi表后提供给外部系统查询用的Hive表
PulsarTeleDB、TelePG产生的增量日志类似MySQL的binlog会写入Pulsar队列同步任务再从Pulsar队列中读取增量日志进行处理
逻辑表Hudi数据同步服务按逻辑表为概念配置同步任务一张逻辑表对应一张TeleDB或TelePG上的数据表逻辑表中包含广东所有地市的业务数据数据量大的表通常会按地市进行分片分区
Hudi表逻辑表经过同步任务同步后写入Hudi表根据逻辑表的数据,数据量大的会根据地市分为A、B表B表也被称为大表通常包含广深东佛广州、深圳、东莞、佛山四个大区的数据A表包含广东除了广深东佛外的其他地市特大表如acct_item会按一个地市对应一个Hudi表Hudi表统一配置为MOR表使用Bucket Index索引
Hive表即作为目标端的Hudi表通常和逻辑表一一对应通过使用Hive外表将经过拆分的Hudi A、B表重新合成为一个Hive表
Flink 任务通常指同步任务根据Hudi表数据量数据量大的
对话语言:中文
Hudi数据同步服务
实现从源端数据库TeleDB、TelePG到Hudi表的数据实时同步并通过Hudi-Hive插件将Hudi表封为Hive外表即目标端供外部系统进行SQL查询
数据同步任务基于Flink开发运行在Hadoop Yarn集群上该任务常驻运行在集群上每个任务负责一张或多张表的据同步同步任务从指定的Pulsar队列中实时读取数据经过数据转换和业务处理后写入Hudi表。
数据压缩任务基于Flink开发运行在Hadoop Yarn集群上该任务根据指定的调度策略周期性定时启动每个任务将张指定的Hudi表中的log数据压缩为parquet数据当压缩完成时上一次压缩到当前压缩之间被写入的数据才能被外部统查询。
服务管理平台使用Spring Cloud框架开发的微服务集群用于管理同步任务和压缩任务的运行、停止和状态监控等息提供友好的前端交互网页。
源端上游类似TeleDB、TelePG这些提供原始数据的数据库
目标端下游数据同步到Hudi表后提供给外部系统查询用的Hive表
TeleDB中国电信自研分布式MySQL集群组件逻辑上的一张表在物理上被水平分片Sharding为多个“set”表存储在个MySQL节点支持弹性扩容和容灾。
TelePG中国电信自研分布式PostgreSQL集群组件其架构和功能特性与TeleDB高度相似。
逻辑表一张逻辑表对应一张TeleDB或TelePG上的业务表逻辑表中包含广东所有地市的业务数据,数据量大的表通常按地市进行分片(分区)。
Hudi表逻辑表数据经同步任务处理后实际存储的位置根据逻辑表的数据量数据量大的会根据地市分为A、B表B也被称为大表通常包含广深东佛广州、深圳、东莞、佛山四个大区的数据A表包含广东除了广深东佛外的其他市特大表如acct_item会按一个地市对应一个Hudi表Hudi表统一配置为MOR表使用Bucket Index索引。
Hive表通过Hudi-Hive插件创建的Hive外表作为下游系统的唯一查询入口该表逻辑上对应源端的一张逻辑表它Hudi服务内部可能存在的多个物理Hudi表如A表、B表或地市分表聚合封装成一个逻辑视图透明地对外提供完整的辑表数据查询。
重点表根据业务系统的要求对于有的表及时性和数据完整性有更高的要求这些表被称为重点表在tb_app_collect_table_info表中的tags字段包含“FOCUS”字符的是重点表。
Flink 任务:即数据同步任务,根据逻辑表的数据量通常有如下规则:
大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表如A表 + B表或多地市表的同步子任务。
小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自务。
Pulsar队列消息队列源端TeleDB逻辑表增量日志包含新增-I、更新-U、删除-D、DDL操作类型由Canal同步组件实时写入Pulsar队列。TelePG逻辑表增量日志由PGSync组件以相同逻辑同步到对应队列。
压缩调度服务service-scheduler(单实例)按策略将压缩任务放入预调度队列定时将预调度任务转移至各集群对应的压缩任务队列service-launcher(与集群一一对应)定时轮询对应集群的压缩任务队列发现任务即调度执行service-queue提供队列机制scheduler与launcher均基于集群资源状态动态调节任务产生与执行速率利用队列缓冲避免资源超限。
压缩调度压缩任务耗时长、资源大。为平衡效率与资源调度服务通常从凌晨2点起每3小时调度一次全部Hudi表的压缩任务。
跨天调度为确保关键表数据在0点后及时可用此时Hive最新数据常未达0点0点至2点间对重点表进行更高频压缩调度。此高频率调度持续直至目标表被标记为“已跨天”
跨天判断按照先后次序1. 源端同步组件(Canal/PGSync)判断源表数据是否跨天。若跨天向队列写入跨天消息2.同步任务接收跨天消息在tb_app_collect_table_version表插入记录3.独立程序判断Hudi表数据是否已跨天。若跨天更新tb_app_collect_table_version对应记录状态。此时逻辑表标记为“已跨天”一张表必须先接收到跨天标记然后才能完成跨天。
集群指Hadoop Yarn集群同步任务仅在b12运行压缩任务主要在b12运行部分重点表在b1或a4运行调度服务根据资源动态在多个集群间分配压缩任务其中b12集群使用default队列b1集群使用datalake队列a4集群使用ten_iap.datalake队列。
Hudi数据同步服务使用的数据表详情如下
hudi_collect_build_b12.tb_app_collect_table_info表记录同步任务配置信息
id主键
alias表别名同样可以唯一标识该记录
flink_job_idtb_app_flink_job_config表主键对应的Flink任务
hudi_job_idhuditb_app_hudi_job_config表主键对应Hudi表的配置
sync_yarn_job_idtb_app_yarn_job_config表主键同步任务的yarn资源配置
compaction_yarn_job_idtb_app_yarn_job_config表主键压缩任务的yarn资源配置
src_db源端数据库名称
src_type源端数据库类型取值有teledb、telepg
src_schema源端数据库schema名称
src_table源端表名
src_pulsar_addrpulsar地址
src_topicpulsar队列名称
tgt_hdfs_pathHudi表对应的hdfs路径
status逻辑删除状态y为正常n为删除
filter_field过滤字段用于指定源端消息中的某个字段进行消息过滤如B表同步任务会使用CITY_ID字段并过滤出该字段为广深东佛编码的消息
filter_values过滤值
filter_type过滤类型INCLUDE表示包含指定过滤值的记录保留EXCLUDE表示包含指定过滤值的记录丢弃
bucket_numberHudi bucket index的参数值
partition_fieldHudi表分区字段通常使用CITY_ID
priority优先级整数数字越大优先级越高优先级高该表对应在压缩调度队列中会排在更靠前的位置
hive_db目标端hive库名称
hive_table目标端hive表名
tags标签使用英文逗号分隔用于标识表的特殊属性
hudi_collect_build_b12.tb_app_collect_table_version表记录跨天版本
flink_job_idtb_app_flink_job_config表主键对应的Flink任务
alias表别名 tb_app_collect_table_info表alias字段对应唯一同步任务
version版本格式为yyyyMMdd如2025年6月6日跨天版本则该值为20250605
op_ts操作时间接收到跨天标记的时间
create_time记录创建时间
update_time记录创建时间
scheduled是否已跨天1为已跨天0为未跨天
hudi_collect_build_b12.tb_app_flink_job_config表Flink 任务配置
id主键
nameFlink任务名称
status逻辑删除状态y为正常n为删除
application_idflink任务对应的yarn任务的application id
hudi_collect_build_b12.tb_app_hudi_job_config表Hudi表原生配置
id主键
source_tasks读取Pulsar消息的Flink算子并行度
write_tasks写Hudi表的Flink算子并行度
keep_file_version保留数据文件版本
keep_commit_version保留时间线版本
hudi_collect_build_b12.tb_app_hudi_sync_state表同步、压缩任务运行状态
id主键由flink_job_id和alias使用短横线连接而成格式为flink_job_id-alias
message_id最新消费到的Pulsar消息的message id
source_start_time同步任务启动时间
source_receive_time同步任务最新接收到消息队列消息的时间
source_checkpoint_time同步任务对应Flink任务最近一次执行checkpoint的时间这个时间每15分钟更新一次可以用来判断flink任务是否还在运行
source_publish_time同步任务接收到的最新一条消息队列的消息被发布到队列中的时间
source_op_time同步任务接收到的最新一条消息队列的消息在源端产生的时间
source_application_id同步任务对应Flink任务对应的yarn任务的application id
source_cluster同步任务运行的yarn集群
compaction_start_time最近一次压缩任务启动时间
compaction_finish_time最近一次压缩任务完成时间
compaction_application_id压缩任务对应的Flink任务对应的yarn任务的application id
compaction_cluster压缩任务运行所在的yarn集群
hudi_collect_build_b12.tb_app_yarn_job_config表同步、压缩任务对应的yarn任务资源配置
id主键
job_manager_memoryJob Manager内存MB
task_manager_memoryTask Manager内存MB
当前时间为:{}
""", LocalDateTime.now().format(formatter)));
if (ObjectUtil.isNotNull(knowledgeId)) {
var vo = messages.select(message -> StrUtil.equals(message.getRole(), "user")).getLastOptional().orElseThrow();
var documents = knowledgeService.query(knowledgeId, vo.getContent(), 10, 0.5);
if (ObjectUtil.isNotEmpty(documents)) {
builder.append(StrUtil.format("""
以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问:
{}
""", documents.makeString("\n")));
}
}
return chatClient.prompt()
.system(builder.toString())
.tools(new TableTool())
""", LocalDateTime.now().format(formatter)))
.messages(
messages
.collect(message -> StrUtil.equals(message.getRole(), ROLE_ASSISTANT)
@@ -99,6 +166,14 @@ public class ChatController {
.collect(message -> (Message) message)
.toList()
);
if (ObjectUtil.isNotNull(knowledgeId)) {
spec.tools(new KnowledgeTool(knowledgeId));
}
spec.tools(
new TableTool(),
new YarnTool()
);
return spec;
}
@PostMapping("sync")

View File

@@ -0,0 +1,40 @@
package com.lanyuanxiaoyao.service.ai.chat.tools;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
import com.lanyuanxiaoyao.service.forest.service.KnowledgeService;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
/**
* @author lanyuanxiaoyao
* @version 20250606
*/
public class KnowledgeTool {
private final Long knowledgeId;
public KnowledgeTool(Long knowledgeId) {
this.knowledgeId = knowledgeId;
}
@Tool(description = """
从知识库中检索相关外部知识
""")
public String queryKnowledge(
@ToolParam(description = """
精炼准确的知识库关键词,使用逗号分隔,尽可能全面地覆盖需要获取的知识
""")
String query
) {
KnowledgeService knowledgeService = AiChatApplication.getBean(KnowledgeService.class);
var documents = knowledgeService.query(knowledgeId, query, 10, 0.5);
if (ObjectUtil.isNotEmpty(documents)) {
return StrUtil.format("""
以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问:
{}
""", documents.makeString("\n"));
}
return "";
}
}

View File

@@ -1,7 +1,11 @@
package com.lanyuanxiaoyao.service.ai.chat.tools;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.tool.annotation.Tool;
@@ -13,10 +17,11 @@ import org.springframework.ai.tool.annotation.ToolParam;
*/
public class TableTool {
private static final Logger logger = LoggerFactory.getLogger(TableTool.class);
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
@Tool(description = """
执行SQL语句获取查询结果结果可能是一行或多行行内以逗号分隔字段。
""", returnDirect = true)
""")
public String executeJdbc(
@ToolParam(description = """
完整的MySQL查询语句禁止使用除select外的任何语句。
@@ -30,6 +35,66 @@ public class TableTool {
return result;
}
private void queryTableMeta() {
@Tool(description = """
查询表Hudi表、Hive表、逻辑表等数量
""")
private String tableCount(
@ToolParam(description = """
查询类型,取值如下
Hudi表hudi
Hive表hive
逻辑表logic
""") String type
) {
logger.info("Enter method: tableCount[type]. type:{}", type);
var infoService = AiChatApplication.getBean(InfoService.class);
return switch (type) {
case "logic" -> StrUtil.format("""
逻辑表共{}张,其中重点表{}张
""", infoService.tableCount(), infoService.tableFocusCount());
case "hive" -> StrUtil.format("""
Hive表共{}张,其中重点表{}张
""", infoService.hiveCount(), infoService.hiveFocusCount());
case "hudi" -> StrUtil.format("""
Hudi表共{}张,其中重点表{}张
""", infoService.hudiCount(), infoService.hudiFocusCount());
default -> throw new IllegalStateException("Unexpected value: " + type);
};
}
@Tool(description = """
查询Hudi表跨天情况
返回结果包含未接收到跨天标记的表数量和接收到跨天标记但未完成跨天的表数量
一张表必须先接收到跨天标记才能完成跨天两者数量均为0才算所有表完成跨天
""")
public String version(
@ToolParam(description = """
查询指定日期的跨天情况格式为yyyyMMdd如2025年6月6日取值为20250606如果参数为空则查询当天跨天情况
""", required = false)
String date,
@ToolParam(description = """
表类型,取值如下
普通表normal
重点表focus
""")
String type
) {
logger.info("Enter method: version[date, type]. date:{},type:{}", date, type);
InfoService infoService = AiChatApplication.getBean(InfoService.class);
String version = date;
if (StrUtil.isBlank(version)) {
version = LocalDateTime.now().minusDays(1).format(FORMATTER);
} else {
version = LocalDate.parse(version, FORMATTER).minusDays(1).format(FORMATTER);
}
return switch (type) {
case "normal" -> StrUtil.format("""
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
""", infoService.unReceiveVersionNormalTableCount(version), infoService.unScheduledNormalTableCount(version));
case "focus" -> StrUtil.format("""
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
""", infoService.unReceiveVersionFocusTableCount(version), infoService.unScheduledFocusTableCount(version));
default -> throw new IllegalStateException("Unexpected value: " + type);
};
}
}

View File

@@ -0,0 +1,77 @@
package com.lanyuanxiaoyao.service.ai.chat.tools;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
import com.lanyuanxiaoyao.service.forest.service.YarnService;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
/**
* @author lanyuanxiaoyao
* @version 20250606
*/
public class YarnTool {
private static final Logger logger = LoggerFactory.getLogger(YarnTool.class);
@Tool(description = """
查询yarn集群整体资源情况返回值为资源占用率%
""")
public Double yarnStatus(
@ToolParam(description = """
yarn集群名称取值有b12、b1和a4
""") String cluster
) {
logger.info("Enter method: yarnStatus[cluster]. cluster:{}", cluster);
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
YarnRootQueue status = yarnService.cluster(cluster);
return (status.getUsedCapacity() * 100.0) / status.getCapacity();
}
@Tool(description = """
查询yarn集群中指定队列的资源情况返回值为资源占用率%
""")
public Double yarnQueueStatus(
@ToolParam(description = """
yarn集群名称取值有b12、b1和a4
""") String cluster,
@ToolParam(description = """
yarn队列名称
""") String queue
) {
logger.info("Enter method: yarnQueueStatus[cluster, queue]. cluster:{},queue:{}", cluster, queue);
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
YarnQueue status = yarnService.queueDetail(cluster, queue);
return (status.getAbsoluteCapacity() * 100.0) / status.getAbsoluteMaxCapacity();
}
@Tool(description = """
查询指定集群上同步任务或压缩任务的运行情况
""")
public String yarnTaskStatus(
@ToolParam(description = """
yarn集群名称取值有b12、b1和a4
""") String cluster,
@ToolParam(description = """
查询任务种类,取值如下
同步任务Sync
压缩任务Compaction
""") String type
) {
logger.info("Enter method: yarnTaskStatus[cluster, type]. cluster:{},type:{}", cluster, type);
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
ImmutableList<YarnApplication> applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) && StrUtil.contains(app.getName(), type));
return StrUtil.format(
"""
运行中:{},调度中:{}
""",
applications.count(app -> StrUtil.equals(app.getState(), "RUNNING")),
applications.count(app -> StrUtil.equals(app.getState(), "ACCEPTED"))
);
}
}

View File

@@ -12,4 +12,4 @@ spring:
model: 'Qwen3-1.7-vllm'
mvc:
async:
request-timeout: 300000
request-timeout: 3600000

View File

@@ -0,0 +1,16 @@
package com.lanyuanxiaoyao.service.ai.chat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
/**
* @author lanyuanxiaoyao
* @version 20250606
*/
public class TestDatetimeFormat {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
public static void main(String[] args) {
System.out.println(LocalDate.parse("20250606", FORMATTER).format(FORMATTER));
}
}