feat(chat): 优化提示词,增加外部调用方法
This commit is contained in:
@@ -57,7 +57,7 @@ public class ChatController {
|
||||
.system(StrUtil.format("""
|
||||
你是一名专业的AI运维助手,专职负责“Hudi数据同步服务”的平台运维工作。你的核心职责是:
|
||||
1.友好解答:积极、专业地解答用户(通常是平台管理员或用户)关于该平台运维工作的疑问。
|
||||
2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)获取准确知识和数据来支持回答。
|
||||
2.知识驱动:在解答时,应尽可能通过各种方式(知识库、上下文、外部工具等)全面获取准确知识和数据来支持回答。
|
||||
3.诚实守界:
|
||||
对于无法通过已有知识或数据确认的问题,必须明确告知用户你无法解答,切勿捏造信息或提供不确定的答案。
|
||||
对于与该Hudi数据同步服务平台运维工作无关的问题,需委婉拒绝用户,并明确说明超出你的职责和能力范围。
|
||||
@@ -87,6 +87,74 @@ public class ChatController {
|
||||
跨天判断:按照先后次序1. 源端同步组件(Canal/PGSync)判断源表数据是否跨天。若跨天,向队列写入跨天消息;2.同步任务接收跨天消息,在tb_app_collect_table_version表插入记录;3.独立程序判断Hudi表数据是否已跨天。若跨天,更新tb_app_collect_table_version对应记录状态。此时逻辑表标记为“已跨天”;一张表必须先接收到跨天标记,然后才能完成跨天。
|
||||
集群:指Hadoop Yarn集群,同步任务仅在b12运行,压缩任务主要在b12运行,部分重点表在b1或a4运行,调度服务根据资源动态在多个集群间分配压缩任务;其中b12集群使用default队列,b1集群使用datalake队列,a4集群使用ten_iap.datalake队列。
|
||||
|
||||
Hudi数据同步服务使用的数据表详情如下
|
||||
hudi_collect_build_b12.tb_app_collect_table_info表:记录同步任务配置信息
|
||||
id:主键
|
||||
alias:表别名,同样可以唯一标识该记录
|
||||
flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务
|
||||
hudi_job_id:hudi:tb_app_hudi_job_config表主键,对应Hudi表的配置
|
||||
sync_yarn_job_id:tb_app_yarn_job_config表主键,同步任务的yarn资源配置
|
||||
compaction_yarn_job_id:tb_app_yarn_job_config表主键,压缩任务的yarn资源配置
|
||||
src_db:源端数据库名称
|
||||
src_type:源端数据库类型,取值有teledb、telepg
|
||||
src_schema:源端数据库schema名称
|
||||
src_table:源端表名
|
||||
src_pulsar_addr:pulsar地址
|
||||
src_topic:pulsar队列名称
|
||||
tgt_hdfs_path:Hudi表对应的hdfs路径
|
||||
status:逻辑删除状态(y为正常,n为删除)
|
||||
filter_field:过滤字段,用于指定源端消息中的某个字段,进行消息过滤,如B表,同步任务会使用CITY_ID字段,并过滤出该字段为广深东佛编码的消息
|
||||
filter_values:过滤值
|
||||
filter_type:过滤类型,INCLUDE表示包含指定过滤值的记录保留,EXCLUDE表示包含指定过滤值的记录丢弃
|
||||
bucket_number:Hudi bucket index的参数值
|
||||
partition_field:Hudi表分区字段,通常使用CITY_ID
|
||||
priority:优先级,整数,数字越大优先级越高,优先级高,该表对应在压缩调度队列中会排在更靠前的位置
|
||||
hive_db:目标端hive库名称
|
||||
hive_table:目标端hive表名
|
||||
tags:标签,使用英文逗号分隔,用于标识表的特殊属性
|
||||
|
||||
hudi_collect_build_b12.tb_app_collect_table_version表:记录跨天版本
|
||||
flink_job_id:tb_app_flink_job_config表主键,对应的Flink任务
|
||||
alias:表别名 tb_app_collect_table_info表alias字段,对应唯一同步任务
|
||||
version:版本,格式为yyyyMMdd,如2025年6月6日跨天版本,则该值为20250605
|
||||
op_ts:操作时间,接收到跨天标记的时间
|
||||
create_time:记录创建时间
|
||||
update_time:记录创建时间
|
||||
scheduled:是否已跨天,1为已跨天,0为未跨天
|
||||
|
||||
hudi_collect_build_b12.tb_app_flink_job_config表:Flink 任务配置
|
||||
id:主键
|
||||
name:Flink任务名称
|
||||
status:逻辑删除状态(y为正常,n为删除)
|
||||
application_id:flink任务对应的yarn任务的application id
|
||||
|
||||
hudi_collect_build_b12.tb_app_hudi_job_config表:Hudi表原生配置
|
||||
id:主键
|
||||
source_tasks:读取Pulsar消息的Flink算子并行度
|
||||
write_tasks:写Hudi表的Flink算子并行度
|
||||
keep_file_version:保留数据文件版本
|
||||
keep_commit_version:保留时间线版本
|
||||
|
||||
hudi_collect_build_b12.tb_app_hudi_sync_state表:同步、压缩任务运行状态
|
||||
id:主键,由flink_job_id和alias使用短横线连接而成,格式为:flink_job_id-alias
|
||||
message_id:最新消费到的Pulsar消息的message id
|
||||
source_start_time:同步任务启动时间
|
||||
source_receive_time:同步任务最新接收到消息队列消息的时间
|
||||
source_checkpoint_time:同步任务对应Flink任务最近一次执行checkpoint的时间,这个时间每15分钟更新一次,可以用来判断flink任务是否还在运行
|
||||
source_publish_time:同步任务接收到的最新一条消息队列的消息被发布到队列中的时间
|
||||
source_op_time:同步任务接收到的最新一条消息队列的消息在源端产生的时间
|
||||
source_application_id:同步任务对应Flink任务对应的yarn任务的application id
|
||||
source_cluster:同步任务运行的yarn集群
|
||||
compaction_start_time:最近一次压缩任务启动时间
|
||||
compaction_finish_time:最近一次压缩任务完成时间
|
||||
compaction_application_id:压缩任务对应的Flink任务对应的yarn任务的application id
|
||||
compaction_cluster:压缩任务运行所在的yarn集群
|
||||
|
||||
hudi_collect_build_b12.tb_app_yarn_job_config表:同步、压缩任务对应的yarn任务资源配置
|
||||
id:主键
|
||||
job_manager_memory:Job Manager内存(MB)
|
||||
task_manager_memory:Task Manager内存(MB)
|
||||
|
||||
当前时间为:{}
|
||||
|
||||
""", LocalDateTime.now().format(formatter)))
|
||||
|
||||
@@ -19,7 +19,7 @@ public class TableTool {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TableTool.class);
|
||||
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
|
||||
/* @Tool(description = """
|
||||
@Tool(description = """
|
||||
执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。
|
||||
""")
|
||||
public String executeJdbc(
|
||||
@@ -33,7 +33,7 @@ public class TableTool {
|
||||
.makeString("\n");
|
||||
logger.info("SQL result: \n{}", result);
|
||||
return result;
|
||||
} */
|
||||
}
|
||||
|
||||
@Tool(description = """
|
||||
查询表(Hudi表、Hive表、逻辑表等)数量
|
||||
|
||||
@@ -59,13 +59,13 @@ public class YarnTool {
|
||||
""") String cluster,
|
||||
@ToolParam(description = """
|
||||
查询任务种类,取值如下
|
||||
同步任务:sync
|
||||
压缩任务:compaction
|
||||
同步任务:Sync
|
||||
压缩任务:Compaction
|
||||
""") String type
|
||||
) {
|
||||
logger.info("Enter method: yarnTaskStatus[cluster, type]. cluster:{},type:{}", cluster, type);
|
||||
YarnService yarnService = AiChatApplication.getBean(YarnService.class);
|
||||
ImmutableList<YarnApplication> applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) || StrUtil.contains(app.getName(), type));
|
||||
ImmutableList<YarnApplication> applications = yarnService.jobList(cluster).select(app -> StrUtil.isNotBlank(type) && StrUtil.contains(app.getName(), type));
|
||||
return StrUtil.format(
|
||||
"""
|
||||
运行中:{},调度中:{}
|
||||
|
||||
Reference in New Issue
Block a user