Compare commits
3 Commits
90fea22de5
...
72c23d916a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
72c23d916a | ||
|
|
e01a883d37 | ||
|
|
951075fc9f |
@@ -3,7 +3,9 @@ package com.lanyuanxiaoyao.service.ai.chat.controller;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.entity.MessageVO;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.tools.KnowledgeTool;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.tools.TableTool;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.tools.YarnTool;
|
||||
import com.lanyuanxiaoyao.service.forest.service.KnowledgeService;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
@@ -51,46 +53,111 @@ public class ChatController {
|
||||
}
|
||||
|
||||
private ChatClient.ChatClientRequestSpec buildRequest(Long knowledgeId, ImmutableList<MessageVO> messages) {
|
||||
var builder = StrUtil.builder()
|
||||
.append(StrUtil.format("""
|
||||
你是一名专业的AI运维助手,负责“Hudi数据同步服务”的运维工作;
|
||||
你将会友好地帮助用户解答关于该平台运维工作的问题,你会尽可能通过各种方式获取知识和数据来解答;
|
||||
对于无法通过已有知识回答的问题,你会提示用户你无法解答该问题,而不是虚构不存在的数据或答案;
|
||||
对于与该平台无关的问题,你会委婉地拒绝用户,并提示无法回答;
|
||||
你将始终在中文语境下进行对话。
|
||||
ChatClient.ChatClientRequestSpec spec = chatClient.prompt()
|
||||
.system(StrUtil.format("""
|
||||
你是一名专业的AI运维助手,专职负责“Hudi数据同步服务”的平台运维工作。你的核心职责是:
|
||||
1.友好解答:积极、专业地解答用户(通常是平台管理员或用户)关于该平台运维工作的疑问。
|
||||
2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)全面获取准确知识和数据来支持回答。
|
||||
3.诚实守界:
|
||||
对于无法通过已有知识或数据确认的问题,必须明确告知用户你无法解答,切勿捏造信息或提供不确定的答案。
|
||||
对于与该Hudi数据同步服务平台运维工作无关的问题,需委婉拒绝用户,并明确说明超出你的职责和能力范围。
|
||||
|
||||
Hudi数据同步服务将TeleDB、TelePG数据库中的数据同步到Hudi表,并通过Hudi-Hive插件,将Hudi表包装为Hive表提供给外系统查询使用;
|
||||
平台分为数据同步任务、数据压缩任务和服务管理平台三部分;以下是关于该平台的关键概念:
|
||||
同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的数据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表;
|
||||
压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将一张指定的Hudi MOR表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部系统查询;
|
||||
管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等信息;
|
||||
TeleDB:中国电信自研的分布式MySQL集群组件,可以实现一张MySQL表在物理上拆分为多张MySQL表(这种类似分片的物理表被称作“set”)存储在多个MySQL服务中,实现可扩容、可容灾等特性;
|
||||
TelePG:中国电信自研的分布式PostgreSQL集群组件,其功能类似TeleDB;
|
||||
源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库;
|
||||
目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表;
|
||||
Pulsar:TeleDB、TelePG产生的增量日志(类似MySQL的binlog)会写入Pulsar队列,同步任务再从Pulsar队列中读取增量日志进行处理;
|
||||
逻辑表:Hudi数据同步服务按逻辑表为概念配置同步任务,一张逻辑表对应一张TeleDB或TelePG上的数据表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常会按地市进行分片(分区);
|
||||
Hudi表:逻辑表经过同步任务同步后写入Hudi表,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B表也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他地市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引;
|
||||
Hive表:即作为目标端的Hudi表,通常和逻辑表一一对应,通过使用Hive外表将经过拆分的Hudi A、B表重新合成为一个Hive表;
|
||||
Flink 任务:通常指同步任务,根据Hudi表数据量,数据量大的
|
||||
对话语言:中文
|
||||
|
||||
Hudi数据同步服务
|
||||
实现从源端数据库(TeleDB、TelePG)到Hudi表的数据实时同步,并通过Hudi-Hive插件将Hudi表封为Hive外表(即目标端),供外部系统进行SQL查询。
|
||||
数据同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表。
|
||||
数据压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将张指定的Hudi表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部统查询。
|
||||
服务管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等息,提供友好的前端交互网页。
|
||||
源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库。
|
||||
目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表。
|
||||
TeleDB:中国电信自研分布式MySQL集群组件,逻辑上的一张表在物理上被水平分片(Sharding)为多个“set”表存储在个MySQL节点,支持弹性扩容和容灾。
|
||||
TelePG:中国电信自研分布式PostgreSQL集群组件,其架构和功能特性与TeleDB高度相似。
|
||||
逻辑表:一张逻辑表对应一张TeleDB或TelePG上的业务表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常按地市进行分片(分区)。
|
||||
Hudi表:逻辑表数据经同步任务处理后实际存储的位置,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引。
|
||||
Hive表:通过Hudi-Hive插件创建的Hive外表,作为下游系统的唯一查询入口,该表逻辑上对应源端的一张逻辑表,它Hudi服务内部可能存在的多个物理Hudi表(如A表、B表或地市分表)聚合封装成一个逻辑视图,透明地对外提供完整的辑表数据查询。
|
||||
重点表:根据业务系统的要求,对于有的表及时性和数据完整性有更高的要求,这些表被称为重点表,在tb_app_collect_table_info表中的tags字段,包含“FOCUS”字符的是重点表。
|
||||
Flink 任务:即数据同步任务,根据逻辑表的数据量通常有如下规则:
|
||||
大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表(如A表 + B表或多地市表)的同步子任务。
|
||||
小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自务。
|
||||
Pulsar队列(消息队列):源端TeleDB逻辑表增量日志(包含新增-I、更新-U、删除-D、DDL操作类型)由Canal同步组件实时写入Pulsar队列。TelePG逻辑表增量日志由PGSync组件以相同逻辑同步到对应队列。
|
||||
压缩调度服务:service-scheduler(单实例):按策略将压缩任务放入预调度队列,定时将预调度任务转移至各集群对应的压缩任务队列;service-launcher(与集群一一对应):定时轮询对应集群的压缩任务队列,发现任务即调度执行;service-queue:提供队列机制;scheduler与launcher均基于集群资源状态动态调节任务产生与执行速率,利用队列缓冲,避免资源超限。
|
||||
压缩调度:压缩任务耗时长、资源大。为平衡效率与资源,调度服务通常从凌晨2点起,每3小时调度一次全部Hudi表的压缩任务。
|
||||
跨天调度:为确保关键表数据在0点后及时可用(此时Hive最新数据常未达0点),0点至2点间,对重点表进行更高频压缩调度。此高频率调度持续直至目标表被标记为“已跨天”
|
||||
跨天判断:按照先后次序1. 源端同步组件(Canal/PGSync)判断源表数据是否跨天。若跨天,向队列写入跨天消息;2.同步任务接收跨天消息,在tb_app_collect_table_version表插入记录;3.独立程序判断Hudi表数据是否已跨天。若跨天,更新tb_app_collect_table_version对应记录状态。此时逻辑表标记为“已跨天”;一张表必须先接收到跨天标记,然后才能完成跨天。
|
||||
集群:指Hadoop Yarn集群,同步任务仅在b12运行,压缩任务主要在b12运行,部分重点表在b1或a4运行,调度服务根据资源动态在多个集群间分配压缩任务;其中b12集群使用default队列,b1集群使用datalake队列,a4集群使用ten_iap.datalake队列。
|
||||
|
||||
Hudi数据同步服务使用的数据表详情如下
|
||||
hudi_collect_build_b12.tb_app_collect_table_info表:记录同步任务配置信息
|
||||
id:主键
|
||||
alias:表别名,同样可以唯一标识该记录
|
||||
flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务
|
||||
hudi_job_id:hudi:tb_app_hudi_job_config表主键,对应Hudi表的配置
|
||||
sync_yarn_job_id:tb_app_yarn_job_config表主键,同步任务的yarn资源配置
|
||||
compaction_yarn_job_id:tb_app_yarn_job_config表主键,压缩任务的yarn资源配置
|
||||
src_db:源端数据库名称
|
||||
src_type:源端数据库类型,取值有teledb、telepg
|
||||
src_schema:源端数据库schema名称
|
||||
src_table:源端表名
|
||||
src_pulsar_addr:pulsar地址
|
||||
src_topic:pulsar队列名称
|
||||
tgt_hdfs_path:Hudi表对应的hdfs路径
|
||||
status:逻辑删除状态(y为正常,n为删除)
|
||||
filter_field:过滤字段,用于指定源端消息中的某个字段,进行消息过滤,如B表,同步任务会使用CITY_ID字段,并过滤出该字段为广深东佛编码的消息
|
||||
filter_values:过滤值
|
||||
filter_type:过滤类型,INCLUDE表示包含指定过滤值的记录保留,EXCLUDE表示包含指定过滤值的记录丢弃
|
||||
bucket_number:Hudi bucket index的参数值
|
||||
partition_field:Hudi表分区字段,通常使用CITY_ID
|
||||
priority:优先级,整数,数字越大优先级越高,优先级高,该表对应在压缩调度队列中会排在更靠前的位置
|
||||
hive_db:目标端hive库名称
|
||||
hive_table:目标端hive表名
|
||||
tags:标签,使用英文逗号分隔,用于标识表的特殊属性
|
||||
|
||||
hudi_collect_build_b12.tb_app_collect_table_version表:记录跨天版本
|
||||
flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务
|
||||
alias:表别名 tb_app_collect_table_info表alias字段,对应唯一同步任务
|
||||
version:版本,格式为yyyyMMdd,如2025年6月6日跨天版本,则该值为20250605
|
||||
op_ts:操作时间,接收到跨天标记的时间
|
||||
create_time:记录创建时间
|
||||
update_time:记录创建时间
|
||||
scheduled:是否已跨天,1为已跨天,0为未跨天
|
||||
|
||||
hudi_collect_build_b12.tb_app_flink_job_config表:Flink 任务配置
|
||||
id:主键
|
||||
name:Flink任务名称
|
||||
status:逻辑删除状态(y为正常,n为删除)
|
||||
application_id:flink任务对应的yarn任务的application id
|
||||
|
||||
hudi_collect_build_b12.tb_app_hudi_job_config表:Hudi表原生配置
|
||||
id:主键
|
||||
source_tasks:读取Pulsar消息的Flink算子并行度
|
||||
write_tasks:写Hudi表的Flink算子并行度
|
||||
keep_file_version:保留数据文件版本
|
||||
keep_commit_version:保留时间线版本
|
||||
|
||||
hudi_collect_build_b12.tb_app_hudi_sync_state表:同步、压缩任务运行状态
|
||||
id:主键,由flink_job_id和alias使用短横线连接而成,格式为:flink_job_id-alias
|
||||
message_id:最新消费到的Pulsar消息的message id
|
||||
source_start_time:同步任务启动时间
|
||||
source_receive_time:同步任务最新接收到消息队列消息的时间
|
||||
source_checkpoint_time:同步任务对应Flink任务最近一次执行checkpoint的时间,这个时间每15分钟更新一次,可以用来判断flink任务是否还在运行
|
||||
source_publish_time:同步任务接收到的最新一条消息队列的消息被发布到队列中的时间
|
||||
source_op_time:同步任务接收到的最新一条消息队列的消息在源端产生的时间
|
||||
source_application_id:同步任务对应Flink任务对应的yarn任务的application id
|
||||
source_cluster:同步任务运行的yarn集群
|
||||
compaction_start_time:最近一次压缩任务启动时间
|
||||
compaction_finish_time:最近一次压缩任务完成时间
|
||||
compaction_application_id:压缩任务对应的Flink任务对应的yarn任务的application id
|
||||
compaction_cluster:压缩任务运行所在的yarn集群
|
||||
|
||||
hudi_collect_build_b12.tb_app_yarn_job_config表:同步、压缩任务对应的yarn任务资源配置
|
||||
id:主键
|
||||
job_manager_memory:Job Manager内存(MB)
|
||||
task_manager_memory:Task Manager内存(MB)
|
||||
|
||||
当前时间为:{}
|
||||
|
||||
""", LocalDateTime.now().format(formatter)));
|
||||
if (ObjectUtil.isNotNull(knowledgeId)) {
|
||||
var vo = messages.select(message -> StrUtil.equals(message.getRole(), "user")).getLastOptional().orElseThrow();
|
||||
var documents = knowledgeService.query(knowledgeId, vo.getContent(), 10, 0.5);
|
||||
if (ObjectUtil.isNotEmpty(documents)) {
|
||||
builder.append(StrUtil.format("""
|
||||
以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问:
|
||||
{}
|
||||
|
||||
""", documents.makeString("\n")));
|
||||
}
|
||||
}
|
||||
return chatClient.prompt()
|
||||
.system(builder.toString())
|
||||
.tools(new TableTool())
|
||||
""", LocalDateTime.now().format(formatter)))
|
||||
.messages(
|
||||
messages
|
||||
.collect(message -> StrUtil.equals(message.getRole(), ROLE_ASSISTANT)
|
||||
@@ -99,6 +166,14 @@ public class ChatController {
|
||||
.collect(message -> (Message) message)
|
||||
.toList()
|
||||
);
|
||||
if (ObjectUtil.isNotNull(knowledgeId)) {
|
||||
spec.tools(new KnowledgeTool(knowledgeId));
|
||||
}
|
||||
spec.tools(
|
||||
new TableTool(),
|
||||
new YarnTool()
|
||||
);
|
||||
return spec;
|
||||
}
|
||||
|
||||
@PostMapping("sync")
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.lanyuanxiaoyao.service.ai.chat.tools;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
|
||||
import com.lanyuanxiaoyao.service.forest.service.KnowledgeService;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
import org.springframework.ai.tool.annotation.ToolParam;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250606
|
||||
*/
|
||||
public class KnowledgeTool {
|
||||
private final Long knowledgeId;
|
||||
|
||||
public KnowledgeTool(Long knowledgeId) {
|
||||
this.knowledgeId = knowledgeId;
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
从知识库中检索相关外部知识
|
||||
""")
|
||||
public String queryKnowledge(
|
||||
@ToolParam(description = """
|
||||
精炼准确的知识库关键词,使用逗号分隔,尽可能全面地覆盖需要获取的知识
|
||||
""")
|
||||
String query
|
||||
) {
|
||||
KnowledgeService knowledgeService = AiChatApplication.getBean(KnowledgeService.class);
|
||||
var documents = knowledgeService.query(knowledgeId, query, 10, 0.5);
|
||||
if (ObjectUtil.isNotEmpty(documents)) {
|
||||
return StrUtil.format("""
|
||||
以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问:
|
||||
{}
|
||||
""", documents.makeString("\n"));
|
||||
}
|
||||
return "";
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,11 @@
|
||||
package com.lanyuanxiaoyao.service.ai.chat.tools;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
@@ -13,10 +17,11 @@ import org.springframework.ai.tool.annotation.ToolParam;
|
||||
*/
|
||||
public class TableTool {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TableTool.class);
|
||||
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
|
||||
@Tool(description = """
|
||||
执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。
|
||||
""", returnDirect = true)
|
||||
""")
|
||||
public String executeJdbc(
|
||||
@ToolParam(description = """
|
||||
完整的MySQL查询语句,禁止使用除select外的任何语句。
|
||||
@@ -30,6 +35,66 @@ public class TableTool {
|
||||
return result;
|
||||
}
|
||||
|
||||
private void queryTableMeta() {
|
||||
@Tool(description = """
|
||||
查询表(Hudi表、Hive表、逻辑表等)数量
|
||||
""")
|
||||
private String tableCount(
|
||||
@ToolParam(description = """
|
||||
查询类型,取值如下
|
||||
Hudi表:hudi
|
||||
Hive表:hive
|
||||
逻辑表:logic
|
||||
""") String type
|
||||
) {
|
||||
logger.info("Enter method: tableCount[type]. type:{}", type);
|
||||
var infoService = AiChatApplication.getBean(InfoService.class);
|
||||
return switch (type) {
|
||||
case "logic" -> StrUtil.format("""
|
||||
逻辑表共{}张,其中重点表{}张
|
||||
""", infoService.tableCount(), infoService.tableFocusCount());
|
||||
case "hive" -> StrUtil.format("""
|
||||
Hive表共{}张,其中重点表{}张
|
||||
""", infoService.hiveCount(), infoService.hiveFocusCount());
|
||||
case "hudi" -> StrUtil.format("""
|
||||
Hudi表共{}张,其中重点表{}张
|
||||
""", infoService.hudiCount(), infoService.hudiFocusCount());
|
||||
default -> throw new IllegalStateException("Unexpected value: " + type);
|
||||
};
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询Hudi表跨天情况
|
||||
返回结果包含未接收到跨天标记的表数量和接收到跨天标记但未完成跨天的表数量
|
||||
一张表必须先接收到跨天标记才能完成跨天,两者数量均为0才算所有表完成跨天
|
||||
""")
|
||||
public String version(
|
||||
@ToolParam(description = """
|
||||
查询指定日期的跨天情况,格式为yyyyMMdd(如2025年6月6日取值为20250606),如果参数为空则查询当天跨天情况
|
||||
""", required = false)
|
||||
String date,
|
||||
@ToolParam(description = """
|
||||
表类型,取值如下
|
||||
普通表:normal
|
||||
重点表:focus
|
||||
""")
|
||||
String type
|
||||
) {
|
||||
logger.info("Enter method: version[date, type]. date:{},type:{}", date, type);
|
||||
InfoService infoService = AiChatApplication.getBean(InfoService.class);
|
||||
String version = date;
|
||||
if (StrUtil.isBlank(version)) {
|
||||
version = LocalDateTime.now().minusDays(1).format(FORMATTER);
|
||||
} else {
|
||||
version = LocalDate.parse(version, FORMATTER).minusDays(1).format(FORMATTER);
|
||||
}
|
||||
return switch (type) {
|
||||
case "normal" -> StrUtil.format("""
|
||||
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
|
||||
""", infoService.unReceiveVersionNormalTableCount(version), infoService.unScheduledNormalTableCount(version));
|
||||
case "focus" -> StrUtil.format("""
|
||||
未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{}
|
||||
""", infoService.unReceiveVersionFocusTableCount(version), infoService.unScheduledFocusTableCount(version));
|
||||
default -> throw new IllegalStateException("Unexpected value: " + type);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
package com.lanyuanxiaoyao.service.ai.chat.tools;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue;
|
||||
import com.lanyuanxiaoyao.service.forest.service.YarnService;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
import org.springframework.ai.tool.annotation.ToolParam;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250606
|
||||
*/
|
||||
public class YarnTool {
|
||||
private static final Logger logger = LoggerFactory.getLogger(YarnTool.class);
|
||||
|
||||
@Tool(description = """
|
||||
查询yarn集群整体资源情况,返回值为资源占用率(%)
|
||||
""")
|
||||
public Double yarnStatus(
|
||||
@ToolParam(description = """
|
||||
yarn集群名称,取值有b12、b1和a4
|
||||
""") String cluster
|
||||
) {
|
||||
logger.info("Enter method: yarnStatus[cluster]. cluster:{}", cluster);
|
||||
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
|
||||
YarnRootQueue status = yarnService.cluster(cluster);
|
||||
return (status.getUsedCapacity() * 100.0) / status.getCapacity();
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询yarn集群中指定队列的资源情况,返回值为资源占用率(%)
|
||||
""")
|
||||
public Double yarnQueueStatus(
|
||||
@ToolParam(description = """
|
||||
yarn集群名称,取值有b12、b1和a4
|
||||
""") String cluster,
|
||||
@ToolParam(description = """
|
||||
yarn队列名称
|
||||
""") String queue
|
||||
) {
|
||||
logger.info("Enter method: yarnQueueStatus[cluster, queue]. cluster:{},queue:{}", cluster, queue);
|
||||
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
|
||||
YarnQueue status = yarnService.queueDetail(cluster, queue);
|
||||
return (status.getAbsoluteCapacity() * 100.0) / status.getAbsoluteMaxCapacity();
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询指定集群上同步任务或压缩任务的运行情况
|
||||
""")
|
||||
public String yarnTaskStatus(
|
||||
@ToolParam(description = """
|
||||
yarn集群名称,取值有b12、b1和a4
|
||||
""") String cluster,
|
||||
@ToolParam(description = """
|
||||
查询任务种类,取值如下
|
||||
同步任务:Sync
|
||||
压缩任务:Compaction
|
||||
""") String type
|
||||
) {
|
||||
logger.info("Enter method: yarnTaskStatus[cluster, type]. cluster:{},type:{}", cluster, type);
|
||||
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
|
||||
ImmutableList<YarnApplication> applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) && StrUtil.contains(app.getName(), type));
|
||||
return StrUtil.format(
|
||||
"""
|
||||
运行中:{},调度中:{}
|
||||
""",
|
||||
applications.count(app -> StrUtil.equals(app.getState(), "RUNNING")),
|
||||
applications.count(app -> StrUtil.equals(app.getState(), "ACCEPTED"))
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -12,4 +12,4 @@ spring:
|
||||
model: 'Qwen3-1.7-vllm'
|
||||
mvc:
|
||||
async:
|
||||
request-timeout: 300000
|
||||
request-timeout: 3600000
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.lanyuanxiaoyao.service.ai.chat;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250606
|
||||
*/
|
||||
public class TestDatetimeFormat {
|
||||
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.out.println(LocalDate.parse("20250606", FORMATTER).format(FORMATTER));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user