From 72c23d916aa27951ca38013a5a7a7b629e0ab419 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Fri, 6 Jun 2025 19:56:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat):=20=E4=BC=98=E5=8C=96=E6=8F=90?= =?UTF-8?q?=E7=A4=BA=E8=AF=8D=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=A4=96=E9=83=A8?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ai/chat/controller/ChatController.java | 70 ++++++++++++++++++- .../service/ai/chat/tools/TableTool.java | 4 +- .../service/ai/chat/tools/YarnTool.java | 6 +- 3 files changed, 74 insertions(+), 6 deletions(-) diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java index 9d26256..cf85283 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/controller/ChatController.java @@ -57,7 +57,7 @@ public class ChatController { .system(StrUtil.format(""" 你是一名专业的AI运维助手,专职负责“Hudi数据同步服务”的平台运维工作。你的核心职责是: 1.友好解答:积极、专业地解答用户(通常是平台管理员或用户)关于该平台运维工作的疑问。 - 2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)获取准确知识和数据来支持回答。 + 2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)全面获取准确知识和数据来支持回答。 3.诚实守界: 对于无法通过已有知识或数据确认的问题,必须明确告知用户你无法解答,切勿捏造信息或提供不确定的答案。 对于与该Hudi数据同步服务平台运维工作无关的问题,需委婉拒绝用户,并明确说明超出你的职责和能力范围。 @@ -87,6 +87,74 @@ public class ChatController { 跨天判断:按照先后次序1. 源端同步组件(Canal/PGSync)判断源表数据是否跨天。若跨天,向队列写入跨天消息;2.同步任务接收跨天消息,在tb_app_collect_table_version表插入记录;3.独立程序判断Hudi表数据是否已跨天。若跨天,更新tb_app_collect_table_version对应记录状态。此时逻辑表标记为“已跨天”;一张表必须先接收到跨天标记,然后才能完成跨天。 集群:指Hadoop Yarn集群,同步任务仅在b12运行,压缩任务主要在b12运行,部分重点表在b1或a4运行,调度服务根据资源动态在多个集群间分配压缩任务;其中b12集群使用default队列,b1集群使用datalake队列,a4集群使用ten_iap.datalake队列。 + Hudi数据同步服务使用的数据表详情如下 + hudi_collect_build_b12.tb_app_collect_table_info表:记录同步任务配置信息 + id:主键 + alias:表别名,同样可以唯一标识该记录 + flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务 + hudi_job_id:hudi:tb_app_hudi_job_config表主键,对应Hudi表的配置 + sync_yarn_job_id:tb_app_yarn_job_config表主键,同步任务的yarn资源配置 + compaction_yarn_job_id:tb_app_yarn_job_config表主键,压缩任务的yarn资源配置 + src_db:源端数据库名称 + src_type:源端数据库类型,取值有teledb、telepg + src_schema:源端数据库schema名称 + src_table:源端表名 + src_pulsar_addr:pulsar地址 + src_topic:pulsar队列名称 + tgt_hdfs_path:Hudi表对应的hdfs路径 + status:逻辑删除状态(y为正常,n为删除) + filter_field:过滤字段,用于指定源端消息中的某个字段,进行消息过滤,如B表,同步任务会使用CITY_ID字段,并过滤出该字段为广深东佛编码的消息 + filter_values:过滤值 + filter_type:过滤类型,INCLUDE表示包含指定过滤值的记录保留,EXCLUDE表示包含指定过滤值的记录丢弃 + bucket_number:Hudi bucket index的参数值 + partition_field:Hudi表分区字段,通常使用CITY_ID + priority:优先级,整数,数字越大优先级越高,优先级高,该表对应在压缩调度队列中会排在更靠前的位置 + hive_db:目标端hive库名称 + hive_table:目标端hive表名 + tags:标签,使用英文逗号分隔,用于标识表的特殊属性 + + hudi_collect_build_b12.tb_app_collect_table_version表:记录跨天版本 + flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务 + alias:表别名 tb_app_collect_table_info表alias字段,对应唯一同步任务 + version:版本,格式为yyyyMMdd,如2025年6月6日跨天版本,则该值为20250605 + op_ts:操作时间,接收到跨天标记的时间 + create_time:记录创建时间 + update_time:记录创建时间 + scheduled:是否已跨天,1为已跨天,0为未跨天 + + hudi_collect_build_b12.tb_app_flink_job_config表:Flink 任务配置 + id:主键 + name:Flink任务名称 + status:逻辑删除状态(y为正常,n为删除) + application_id:flink任务对应的yarn任务的application id + + hudi_collect_build_b12.tb_app_hudi_job_config表:Hudi表原生配置 + id:主键 + source_tasks:读取Pulsar消息的Flink算子并行度 + write_tasks:写Hudi表的Flink算子并行度 + keep_file_version:保留数据文件版本 + keep_commit_version:保留时间线版本 + + hudi_collect_build_b12.tb_app_hudi_sync_state表:同步、压缩任务运行状态 + id:主键,由flink_job_id和alias使用短横线连接而成,格式为:flink_job_id-alias + message_id:最新消费到的Pulsar消息的message id + source_start_time:同步任务启动时间 + source_receive_time:同步任务最新接收到消息队列消息的时间 + source_checkpoint_time:同步任务对应Flink任务最近一次执行checkpoint的时间,这个时间每15分钟更新一次,可以用来判断flink任务是否还在运行 + source_publish_time:同步任务接收到的最新一条消息队列的消息被发布到队列中的时间 + source_op_time:同步任务接收到的最新一条消息队列的消息在源端产生的时间 + source_application_id:同步任务对应Flink任务对应的yarn任务的application id + source_cluster:同步任务运行的yarn集群 + compaction_start_time:最近一次压缩任务启动时间 + compaction_finish_time:最近一次压缩任务完成时间 + compaction_application_id:压缩任务对应的Flink任务对应的yarn任务的application id + compaction_cluster:压缩任务运行所在的yarn集群 + + hudi_collect_build_b12.tb_app_yarn_job_config表:同步、压缩任务对应的yarn任务资源配置 + id:主键 + job_manager_memory:Job Manager内存(MB) + task_manager_memory:Task Manager内存(MB) + 当前时间为:{} """, LocalDateTime.now().format(formatter))) diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java index b683b40..d4b9972 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/TableTool.java @@ -19,7 +19,7 @@ public class TableTool { private static final Logger logger = LoggerFactory.getLogger(TableTool.class); private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); - /* @Tool(description = """ + @Tool(description = """ 执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。 """) public String executeJdbc( @@ -33,7 +33,7 @@ public class TableTool { .makeString("\n"); logger.info("SQL result: \n{}", result); return result; - } */ + } @Tool(description = """ 查询表(Hudi表、Hive表、逻辑表等)数量 diff --git a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java index bf6c560..81cb8c4 100644 --- a/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java +++ b/service-ai/service-ai-chat/src/main/java/com/lanyuanxiaoyao/service/ai/chat/tools/YarnTool.java @@ -59,13 +59,13 @@ public class YarnTool { """) String cluster, @ToolParam(description = """ 查询任务种类,取值如下 - 同步任务:sync - 压缩任务:compaction + 同步任务:Sync + 压缩任务:Compaction """) String type ) { logger.info("Enter method: yarnTaskStatus[cluster, type]. cluster:{},type:{}", cluster, type); YarnService yarnService = AiChatApplication.getBean(YarnService.class); - ImmutableList applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) || StrUtil.contains(app.getName(), type)); + ImmutableList applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) && StrUtil.contains(app.getName(), type)); return StrUtil.format( """ 运行中:{},调度中:{}