From e01a883d37a87af4d492c5e2af960fae386951e0 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Fri, 6 Jun 2025 17:20:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat):=20=E4=BC=98=E5=8C=96=E6=8F=90?= =?UTF-8?q?=E7=A4=BA=E8=AF=8D=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=A4=96=E9=83=A8?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ai/chat/controller/ChatController.java | 38 +++++---- .../service/ai/chat/tools/TableTool.java | 69 ++++++++++++++++- .../service/ai/chat/tools/YarnTool.java | 77 +++++++++++++++++++ .../src/main/resources/application.yml | 2 +- .../service/ai/chat/TestDatetimeFormat.java | 16 ++++ 5 files changed, 185 insertions(+), 17 deletions(-) create mode 100644 service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java create mode 100644 service-ai/service-ai-chat/src/test/java/com/lanyuanxiaoyao/service/ai/chat/TestDatetimeFormat.java diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java index 8f89c1b..9d26256 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java @@ -4,6 +4,8 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.ai.chat.entity.MessageVO; import com.lanyuanxiaoyao.service.ai.chat.tools.KnowledgeTool; +import com.lanyuanxiaoyao.service.ai.chat.tools.TableTool; +import com.lanyuanxiaoyao.service.ai.chat.tools.YarnTool; import com.lanyuanxiaoyao.service.forest.service.KnowledgeService; import java.io.IOException; import java.time.LocalDateTime; @@ -62,24 +64,28 @@ public class ChatController { 对话语言:中文 - Hudi数据同步服务 实现从源端数据库(TeleDB、TelePG)到Hudi表的数据实时同步,并通过Hudi-Hive插件将Hudi表封装为Hive外表(即目标端),供外部系统进行SQL查询。 - 数据同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的数据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表。 - 数据压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将一张指定的Hudi表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部系统查询。 - 服务管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等信息,提供友好的前端交互网页。 + Hudi数据同步服务 + 实现从源端数据库(TeleDB、TelePG)到Hudi表的数据实时同步,并通过Hudi-Hive插件将Hudi表封为Hive外表(即目标端),供外部系统进行SQL查询。 + 数据同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表。 + 数据压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将张指定的Hudi表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部统查询。 + 服务管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等息,提供友好的前端交互网页。 源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库。 目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表。 - TeleDB:中国电信自研分布式MySQL集群组件,逻辑上的一张表在物理上被水平分片(Sharding)为多个“set”表存储在多个MySQL节点,支持弹性扩容和容灾。 + TeleDB:中国电信自研分布式MySQL集群组件,逻辑上的一张表在物理上被水平分片(Sharding)为多个“set”表存储在个MySQL节点,支持弹性扩容和容灾。 TelePG:中国电信自研分布式PostgreSQL集群组件,其架构和功能特性与TeleDB高度相似。 - 逻辑表:一张逻辑表对应一张TeleDB或TelePG上的业务表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常会按地市进行分片(分区)。 - Hudi表:逻辑表数据经同步任务处理后实际存储的位置,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B表也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他地市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引。 - Hive表:通过Hudi-Hive插件创建的Hive外表,作为下游系统的唯一查询入口,该表逻辑上对应源端的一张逻辑表,它将Hudi服务内部可能存在的多个物理Hudi表(如A表、B表或地市分表)聚合封装成一个逻辑视图,透明地对外提供完整的逻辑表数据查询。 + 逻辑表:一张逻辑表对应一张TeleDB或TelePG上的业务表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常按地市进行分片(分区)。 + Hudi表:逻辑表数据经同步任务处理后实际存储的位置,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引。 + Hive表:通过Hudi-Hive插件创建的Hive外表,作为下游系统的唯一查询入口,该表逻辑上对应源端的一张逻辑表,它Hudi服务内部可能存在的多个物理Hudi表(如A表、B表或地市分表)聚合封装成一个逻辑视图,透明地对外提供完整的辑表数据查询。 + 重点表:根据业务系统的要求,对于有的表及时性和数据完整性有更高的要求,这些表被称为重点表,在tb_app_collect_table_info表中的tags字段,包含“FOCUS”字符的是重点表。 Flink 任务:即数据同步任务,根据逻辑表的数据量通常有如下规则: - 大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表(如A表 + B表或多个地市表)的同步子任务。 - 小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自任务。 - Pulsar队列:来自源端的TeleDB逻辑表的增量日志会被“Canal同步组件”实时写入Pulsar队列,队列中的一条消息对应一条增量日志,有新增(I)、更新(U)、删除(D)、DDL(DDL语句)几种类型操作;TelePG也按同样的逻辑通过“PG Sync”组件将增量日志同步到对应的队列中。 - 压缩调度服务:服务管理平台中有一个专门的服务用于根据指定的策略调度数据压缩任务的运行。 - 压缩调度:压缩任务占用资源大,耗时长,为了平衡资源占用和压缩效率,压缩调度服务会根据不同的策略调用不同Hudi表的压缩任务,通常情况下,从2点开始,每3个小时调度一次全部Hudi表的压缩任务。 - 跨天调度:由于压缩任务执行耗时长,0点的时候,外部系统在Hive表中能查到的最新数据往往还没到0点,为了 + 大数据量:采用 1逻辑表:1 Flink任务 模式。该Flink任务内聚合处理该逻辑表对应的所有Hudi表(如A表 + B表或多地市表)的同步子任务。 + 小数据量:采用 N逻辑表:1 Flink任务 模式。一个Flink任务内聚合处理多张低数据量逻辑表对应的所有Hudi表同步自务。 + Pulsar队列(消息队列):源端TeleDB逻辑表增量日志(包含新增-I、更新-U、删除-D、DDL操作类型)由Canal同步组件实时写入Pulsar队列。TelePG逻辑表增量日志由PGSync组件以相同逻辑同步到对应队列。 + 压缩调度服务:service-scheduler(单实例):按策略将压缩任务放入预调度队列,定时将预调度任务转移至各集群对应的压缩任务队列;service-launcher(与集群一一对应):定时轮询对应集群的压缩任务队列,发现任务即调度执行;service-queue:提供队列机制;scheduler与launcher均基于集群资源状态动态调节任务产生与执行速率,利用队列缓冲,避免资源超限。 + 压缩调度:压缩任务耗时长、资源大。为平衡效率与资源,调度服务通常从凌晨2点起,每3小时调度一次全部Hudi表的压缩任务。 + 跨天调度:为确保关键表数据在0点后及时可用(此时Hive最新数据常未达0点),0点至2点间,对重点表进行更高频压缩调度。此高频率调度持续直至目标表被标记为“已跨天” + 跨天判断:按照先后次序1. 源端同步组件(Canal/PGSync)判断源表数据是否跨天。若跨天,向队列写入跨天消息;2.同步任务接收跨天消息,在tb_app_collect_table_version表插入记录;3.独立程序判断Hudi表数据是否已跨天。若跨天,更新tb_app_collect_table_version对应记录状态。此时逻辑表标记为“已跨天”;一张表必须先接收到跨天标记,然后才能完成跨天。 + 集群:指Hadoop Yarn集群,同步任务仅在b12运行,压缩任务主要在b12运行,部分重点表在b1或a4运行,调度服务根据资源动态在多个集群间分配压缩任务;其中b12集群使用default队列,b1集群使用datalake队列,a4集群使用ten_iap.datalake队列。 当前时间为:{} @@ -95,6 +101,10 @@ public class ChatController { if (ObjectUtil.isNotNull(knowledgeId)) { spec.tools(new KnowledgeTool(knowledgeId)); } + spec.tools( + new TableTool(), + new YarnTool() + ); return spec; } diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java index 315c43c..b683b40 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java @@ -1,7 +1,11 @@ package com.lanyuanxiaoyao.service.ai.chat.tools; +import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication; import com.lanyuanxiaoyao.service.forest.service.InfoService; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.ai.tool.annotation.Tool; @@ -13,8 +17,9 @@ import org.springframework.ai.tool.annotation.ToolParam; */ public class TableTool { private static final Logger logger = LoggerFactory.getLogger(TableTool.class); + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); - @Tool(description = """ + /* @Tool(description = """ 执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。 """) public String executeJdbc( @@ -28,8 +33,68 @@ public class TableTool { .makeString("\n"); logger.info("SQL result: \n{}", result); return result; + } */ + + @Tool(description = """ + 查询表(Hudi表、Hive表、逻辑表等)数量 + """) + private String tableCount( + @ToolParam(description = """ + 查询类型,取值如下 + Hudi表:hudi + Hive表:hive + 逻辑表:logic + """) String type + ) { + logger.info("Enter method: tableCount[type]. type:{}", type); + var infoService = AiChatApplication.getBean(InfoService.class); + return switch (type) { + case "logic" -> StrUtil.format(""" + 逻辑表共{}张,其中重点表{}张 + """, infoService.tableCount(), infoService.tableFocusCount()); + case "hive" -> StrUtil.format(""" + Hive表共{}张,其中重点表{}张 + """, infoService.hiveCount(), infoService.hiveFocusCount()); + case "hudi" -> StrUtil.format(""" + Hudi表共{}张,其中重点表{}张 + """, infoService.hudiCount(), infoService.hudiFocusCount()); + default -> throw new IllegalStateException("Unexpected value: " + type); + }; } - private void queryTableMeta() { + @Tool(description = """ + 查询Hudi表跨天情况 + 返回结果包含未接收到跨天标记的表数量和接收到跨天标记但未完成跨天的表数量 + 一张表必须先接收到跨天标记才能完成跨天,两者数量均为0才算所有表完成跨天 + """) + public String version( + @ToolParam(description = """ + 查询指定日期的跨天情况,格式为yyyyMMdd(如2025年6月6日取值为20250606),如果参数为空则查询当天跨天情况 + """, required = false) + String date, + @ToolParam(description = """ + 表类型,取值如下 + 普通表:normal + 重点表:focus + """) + String type + ) { + logger.info("Enter method: version[date, type]. date:{},type:{}", date, type); + InfoService infoService = AiChatApplication.getBean(InfoService.class); + String version = date; + if (StrUtil.isBlank(version)) { + version = LocalDateTime.now().minusDays(1).format(FORMATTER); + } else { + version = LocalDate.parse(version, FORMATTER).minusDays(1).format(FORMATTER); + } + return switch (type) { + case "normal" -> StrUtil.format(""" + 未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{} + """, infoService.unReceiveVersionNormalTableCount(version), infoService.unScheduledNormalTableCount(version)); + case "focus" -> StrUtil.format(""" + 未接收到跨天标记的表数量:{},接收到跨天标记但未完成跨天的表数量:{} + """, infoService.unReceiveVersionFocusTableCount(version), infoService.unScheduledFocusTableCount(version)); + default -> throw new IllegalStateException("Unexpected value: " + type); + }; } } diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java new file mode 100644 index 0000000..bf6c560 --- /dev/null +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java @@ -0,0 +1,77 @@ +package com.lanyuanxiaoyao.service.ai.chat.tools; + +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnApplication; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnQueue; +import com.lanyuanxiaoyao.service.configuration.entity.yarn.YarnRootQueue; +import com.lanyuanxiaoyao.service.forest.service.YarnService; +import org.eclipse.collections.api.list.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.ai.tool.annotation.Tool; +import org.springframework.ai.tool.annotation.ToolParam; + +/** + * @author lanyuanxiaoyao + * @version 20250606 + */ +public class YarnTool { + private static final Logger logger = LoggerFactory.getLogger(YarnTool.class); + + @Tool(description = """ + 查询yarn集群整体资源情况,返回值为资源占用率(%) + """) + public Double yarnStatus( + @ToolParam(description = """ + yarn集群名称,取值有b12、b1和a4 + """) String cluster + ) { + logger.info("Enter method: yarnStatus[cluster]. cluster:{}", cluster); + YarnService yarnService = AiChatApplication.getBean(YarnService.class); + YarnRootQueue status = yarnService.cluster(cluster); + return (status.getUsedCapacity() * 100.0) / status.getCapacity(); + } + + @Tool(description = """ + 查询yarn集群中指定队列的资源情况,返回值为资源占用率(%) + """) + public Double yarnQueueStatus( + @ToolParam(description = """ + yarn集群名称,取值有b12、b1和a4 + """) String cluster, + @ToolParam(description = """ + yarn队列名称 + """) String queue + ) { + logger.info("Enter method: yarnQueueStatus[cluster, queue]. cluster:{},queue:{}", cluster, queue); + YarnService yarnService = AiChatApplication.getBean(YarnService.class); + YarnQueue status = yarnService.queueDetail(cluster, queue); + return (status.getAbsoluteCapacity() * 100.0) / status.getAbsoluteMaxCapacity(); + } + + @Tool(description = """ + 查询指定集群上同步任务或压缩任务的运行情况 + """) + public String yarnTaskStatus( + @ToolParam(description = """ + yarn集群名称,取值有b12、b1和a4 + """) String cluster, + @ToolParam(description = """ + 查询任务种类,取值如下 + 同步任务:sync + 压缩任务:compaction + """) String type + ) { + logger.info("Enter method: yarnTaskStatus[cluster, type]. cluster:{},type:{}", cluster, type); + YarnService yarnService = AiChatApplication.getBean(YarnService.class); + ImmutableList applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) || StrUtil.contains(app.getName(), type)); + return StrUtil.format( + """ + 运行中:{},调度中:{} + """, + applications.count(app -> StrUtil.equals(app.getState(), "RUNNING")), + applications.count(app -> StrUtil.equals(app.getState(), "ACCEPTED")) + ); + } +} diff --git a/service-ai/service-ai-chat/src/main/resources/application.yml b/service-ai/service-ai-chat/src/main/resources/application.yml index 2cc9504..9f2a28c 100644 --- a/service-ai/service-ai-chat/src/main/resources/application.yml +++ b/service-ai/service-ai-chat/src/main/resources/application.yml @@ -12,4 +12,4 @@ spring: model: 'Qwen3-1.7-vllm' mvc: async: - request-timeout: 300000 + request-timeout: 3600000 diff --git a/service-ai/service-ai-chat/src/test/java/com/lanyuanxiaoyao/service/ai/chat/TestDatetimeFormat.java b/service-ai/service-ai-chat/src/test/java/com/lanyuanxiaoyao/service/ai/chat/TestDatetimeFormat.java new file mode 100644 index 0000000..c8b5dbd --- /dev/null +++ b/service-ai/service-ai-chat/src/test/java/com/lanyuanxiaoyao/service/ai/chat/TestDatetimeFormat.java @@ -0,0 +1,16 @@ +package com.lanyuanxiaoyao.service.ai.chat; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; + +/** + * @author lanyuanxiaoyao + * @version 20250606 + */ +public class TestDatetimeFormat { + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); + + public static void main(String[] args) { + System.out.println(LocalDate.parse("20250606", FORMATTER).format(FORMATTER)); + } +}