Compare commits
3 Commits
8b4827b164
...
90fea22de5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
90fea22de5 | ||
|
|
a35980a5f4 | ||
|
|
e359bed97c |
@@ -1,10 +1,13 @@
|
||||
package com.lanyuanxiaoyao.service.ai.chat;
|
||||
|
||||
import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.retry.annotation.EnableRetry;
|
||||
|
||||
/**
|
||||
@@ -16,8 +19,19 @@ import org.springframework.retry.annotation.EnableRetry;
|
||||
@EnableConfigurationProperties
|
||||
@EnableEncryptableProperties
|
||||
@EnableRetry
|
||||
public class AiChatApplication {
|
||||
public class AiChatApplication implements ApplicationContextAware {
|
||||
private static ApplicationContext context;
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(AiChatApplication.class, args);
|
||||
}
|
||||
|
||||
public static <T> T getBean(Class<T> clazz) {
|
||||
return context.getBean(clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext context) throws BeansException {
|
||||
AiChatApplication.context = context;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.lanyuanxiaoyao.service.ai.chat.controller;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.entity.MessageVO;
|
||||
import com.lanyuanxiaoyao.service.ai.chat.tools.TableTool;
|
||||
import com.lanyuanxiaoyao.service.forest.service.KnowledgeService;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
@@ -52,11 +53,27 @@ public class ChatController {
|
||||
private ChatClient.ChatClientRequestSpec buildRequest(Long knowledgeId, ImmutableList<MessageVO> messages) {
|
||||
var builder = StrUtil.builder()
|
||||
.append(StrUtil.format("""
|
||||
你是一名专业的AI运维助手,负责“Hudi数据同步服务平台”的运维工作;
|
||||
你是一名专业的AI运维助手,负责“Hudi数据同步服务”的运维工作;
|
||||
你将会友好地帮助用户解答关于该平台运维工作的问题,你会尽可能通过各种方式获取知识和数据来解答;
|
||||
对于无法通过已有知识回答的问题,你会提示用户你无法解答该问题,而不是虚构不存在的数据或答案;
|
||||
对于与该平台无关的问题,你会委婉地拒绝用户,并提示无法回答;
|
||||
你将始终在中文语境下进行对话。
|
||||
|
||||
Hudi数据同步服务将TeleDB、TelePG数据库中的数据同步到Hudi表,并通过Hudi-Hive插件,将Hudi表包装为Hive表提供给外系统查询使用;
|
||||
平台分为数据同步任务、数据压缩任务和服务管理平台三部分;以下是关于该平台的关键概念:
|
||||
同步任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务常驻运行在集群上,每个任务负责一张或多张表的数据同步,同步任务从指定的Pulsar队列中实时读取数据,经过数据转换和业务处理后,写入Hudi表;
|
||||
压缩任务:基于Flink开发,运行在Hadoop Yarn集群上,该任务根据指定的调度策略周期性定时启动,每个任务将一张指定的Hudi MOR表中的log数据压缩为parquet数据,当压缩完成时,上一次压缩到当前压缩之间被写入的数据才能被外部系统查询;
|
||||
管理平台:使用Spring Cloud框架开发的微服务集群,用于管理同步任务和压缩任务的运行、停止和状态监控等信息;
|
||||
TeleDB:中国电信自研的分布式MySQL集群组件,可以实现一张MySQL表在物理上拆分为多张MySQL表(这种类似分片的物理表被称作“set”)存储在多个MySQL服务中,实现可扩容、可容灾等特性;
|
||||
TelePG:中国电信自研的分布式PostgreSQL集群组件,其功能类似TeleDB;
|
||||
源端(上游):类似TeleDB、TelePG这些提供原始数据的数据库;
|
||||
目标端(下游):数据同步到Hudi表后提供给外部系统查询用的Hive表;
|
||||
Pulsar:TeleDB、TelePG产生的增量日志(类似MySQL的binlog)会写入Pulsar队列,同步任务再从Pulsar队列中读取增量日志进行处理;
|
||||
逻辑表:Hudi数据同步服务按逻辑表为概念配置同步任务,一张逻辑表对应一张TeleDB或TelePG上的数据表;逻辑表中包含广东所有地市的业务数据,数据量大的表通常会按地市进行分片(分区);
|
||||
Hudi表:逻辑表经过同步任务同步后写入Hudi表,根据逻辑表的数据量,数据量大的会根据地市分为A、B表,B表也被称为大表,通常包含广深东佛(广州、深圳、东莞、佛山)四个大区的数据,A表包含广东除了广深东佛外的其他地市,特大表,如acct_item,会按一个地市对应一个Hudi表;Hudi表统一配置为MOR表,使用Bucket Index索引;
|
||||
Hive表:即作为目标端的Hudi表,通常和逻辑表一一对应,通过使用Hive外表将经过拆分的Hudi A、B表重新合成为一个Hive表;
|
||||
Flink 任务:通常指同步任务,根据Hudi表数据量,数据量大的
|
||||
|
||||
当前时间为:{}
|
||||
|
||||
""", LocalDateTime.now().format(formatter)));
|
||||
@@ -65,7 +82,7 @@ public class ChatController {
|
||||
var documents = knowledgeService.query(knowledgeId, vo.getContent(), 10, 0.5);
|
||||
if (ObjectUtil.isNotEmpty(documents)) {
|
||||
builder.append(StrUtil.format("""
|
||||
以下是与用户问题有关的外部知识,优先利用该知识回答用户的提问:
|
||||
以下是与用户问题有关的外部知识,优先结合该知识回答用户的提问:
|
||||
{}
|
||||
|
||||
""", documents.makeString("\n")));
|
||||
@@ -73,6 +90,7 @@ public class ChatController {
|
||||
}
|
||||
return chatClient.prompt()
|
||||
.system(builder.toString())
|
||||
.tools(new TableTool())
|
||||
.messages(
|
||||
messages
|
||||
.collect(message -> StrUtil.equals(message.getRole(), ROLE_ASSISTANT)
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.lanyuanxiaoyao.service.ai.chat.tools;
|
||||
|
||||
import com.lanyuanxiaoyao.service.ai.chat.AiChatApplication;
|
||||
import com.lanyuanxiaoyao.service.forest.service.InfoService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.ai.tool.annotation.Tool;
|
||||
import org.springframework.ai.tool.annotation.ToolParam;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250605
|
||||
*/
|
||||
public class TableTool {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TableTool.class);
|
||||
|
||||
@Tool(description = """
|
||||
执行SQL语句,获取查询结果;结果可能是一行或多行,行内以逗号分隔字段。
|
||||
""", returnDirect = true)
|
||||
public String executeJdbc(
|
||||
@ToolParam(description = """
|
||||
完整的MySQL查询语句,禁止使用除select外的任何语句。
|
||||
""") String sql
|
||||
) {
|
||||
InfoService infoService = AiChatApplication.getBean(InfoService.class);
|
||||
String result = infoService.jdbc(sql)
|
||||
.collect(map -> map.valuesView().makeString(","))
|
||||
.makeString("\n");
|
||||
logger.info("SQL result: \n{}", result);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void queryTableMeta() {
|
||||
}
|
||||
}
|
||||
@@ -94,7 +94,7 @@ public class KnowledgeBaseController {
|
||||
|
||||
@PostMapping("submit_text")
|
||||
public void submitText(
|
||||
@RequestParam(value = "id") Long id,
|
||||
@RequestParam("id") Long id,
|
||||
@RequestParam(value = "mode", defaultValue = "NORMAL") String mode,
|
||||
@RequestParam(value = "type", defaultValue = "text") String type,
|
||||
@RequestParam(value = "content", required = false) String content,
|
||||
@@ -109,6 +109,16 @@ public class KnowledgeBaseController {
|
||||
}
|
||||
}
|
||||
|
||||
@PostMapping("submit_text_directly")
|
||||
public void submitDirectly(
|
||||
@RequestParam("id") Long id,
|
||||
@RequestParam(value = "name", required = false) String name,
|
||||
@RequestParam(value = "split_key", defaultValue = "\n\n") String splitKey,
|
||||
@RequestBody String content
|
||||
) {
|
||||
embeddingService.submitDirectly(id, name, Lists.immutable.of(content.split(splitKey)));
|
||||
}
|
||||
|
||||
@PostMapping("query")
|
||||
public ImmutableList<String> query(
|
||||
@RequestParam("id") Long id,
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.ai.knowledge.entity.Knowledge;
|
||||
import com.lanyuanxiaoyao.service.ai.knowledge.entity.vo.DataFileVO;
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@@ -104,4 +105,32 @@ public class EmbeddingService {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void submitDirectly(Long id, String name, ImmutableList<String> contents) {
|
||||
executors.submit(() -> {
|
||||
Knowledge knowledge = knowledgeBaseService.get(id);
|
||||
String groupName = name;
|
||||
if (StrUtil.isBlank(groupName)) {
|
||||
groupName = StrUtil.format("外部-{}", IdUtil.nanoId(10));
|
||||
}
|
||||
Long groupId = groupService.add(knowledge.getId(), groupName);
|
||||
EmbeddingContext context = EmbeddingContext.builder()
|
||||
.vectorSourceId(knowledge.getVectorSourceId())
|
||||
.groupId(groupId)
|
||||
.build();
|
||||
context.setDocuments(
|
||||
contents
|
||||
.collect(StrUtil::trim)
|
||||
.collect(content ->
|
||||
Document.builder()
|
||||
.text(content)
|
||||
.metadata(new HashMap<>())
|
||||
.build()
|
||||
)
|
||||
.toList()
|
||||
);
|
||||
executor.execute2Resp("embedding_submit_directly", null, context);
|
||||
groupService.finish(groupId);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,13 +188,14 @@ public class KnowledgeBaseService {
|
||||
.similarityThreshold(threshold)
|
||||
.build()
|
||||
);
|
||||
List<org.noear.solon.ai.rag.Document> rerankDocuments = rerankingModel.rerank(
|
||||
// 如果只是一个知识库的话,似乎没有什么rerank的必要...
|
||||
/* List<org.noear.solon.ai.rag.Document> rerankDocuments = rerankingModel.rerank(
|
||||
text,
|
||||
documents.stream()
|
||||
.map(doc -> new org.noear.solon.ai.rag.Document(doc.getId(), doc.getText(), doc.getMetadata(), doc.getScore()))
|
||||
.toList()
|
||||
);
|
||||
return Lists.immutable.ofAll(rerankDocuments)
|
||||
.collect(org.noear.solon.ai.rag.Document::getContent);
|
||||
); */
|
||||
return Lists.immutable.ofAll(documents)
|
||||
.collect(Document::getText);
|
||||
}
|
||||
}
|
||||
@@ -195,6 +195,7 @@ public class EmbeddingNodes {
|
||||
.call()
|
||||
.content();
|
||||
Assert.notBlank(response, "LLM response is empty");
|
||||
logger.info("LLM response: \n{}", response);
|
||||
// noinspection DataFlowIssue
|
||||
return Arrays.stream(StrUtil.trim(response).split("---"))
|
||||
.map(text -> text.replaceAll("(?!^.+) +$", ""))
|
||||
@@ -209,6 +210,8 @@ public class EmbeddingNodes {
|
||||
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "import_vector_source", nodeName = "导入向量库", nodeType = NodeTypeEnum.COMMON)
|
||||
public void importVectorSource(NodeComponent node) {
|
||||
EmbeddingContext context = node.getContextBean(EmbeddingContext.class);
|
||||
Assert.notNull(context.getVectorSourceId(), "VectorSourceId is null");
|
||||
Assert.notNull(context.getGroupId(), "GroupId is null");
|
||||
if (ObjectUtil.isNotEmpty(context.getDocuments())) {
|
||||
VectorStore vs = QdrantVectorStore.builder(qdrantClient, embeddingModel)
|
||||
.collectionName(String.valueOf(context.getVectorSourceId()))
|
||||
|
||||
@@ -20,4 +20,7 @@
|
||||
<chain id="embedding_submit">
|
||||
SER(embedding_preview, import_vector_source)
|
||||
</chain>
|
||||
<chain id="embedding_submit_directly">
|
||||
SER(import_vector_source)
|
||||
</chain>
|
||||
</flow>
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.lanyuanxiaoyao.service.ai.knowledge;
|
||||
|
||||
import cn.hutool.http.HttpResponse;
|
||||
import cn.hutool.http.HttpUtil;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250605
|
||||
*/
|
||||
public class UploadDirectly {
|
||||
public static void main(String[] args) {
|
||||
try (HttpResponse response = HttpUtil.createPost("http://132.126.207.130:35690/hudi_services/ai_knowledge/knowledge/submit_text_directly?id=1930517588999553024&name=Hadoop")
|
||||
.basicAuth("AxhEbscwsJDbYMH2", "cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4")
|
||||
.contentType("plain/text")
|
||||
// language=TEXT
|
||||
.body("""
|
||||
问:什么是Apache Hadoop的核心组件? Hadoop由哪几个主要部分构成?
|
||||
答:Apache Hadoop的核心组件包括三部分:
|
||||
HDFS (Hadoop Distributed File System):负责分布式存储,将数据分割为多个块(默认128MB或256MB),并在集群节点间分布存储,通过多副本机制实现高容错性。
|
||||
YARN (Yet Another Resource Negotiator):作为资源管理器,分配集群的计算资源(CPU、内存),支持多种数据处理框架运行。
|
||||
MapReduce:分布式计算模型,通过Map(任务分解并行处理)和Reduce(结果聚合)两阶段处理海量数据。
|
||||
三者协作流程为:HDFS存储数据 → YARN分配资源 → MapReduce执行计算。
|
||||
|
||||
|
||||
问:Hadoop适合哪些应用场景?能否举例说明?
|
||||
答:Hadoop的典型应用场景包括:
|
||||
大数据分析:处理PB级结构化/非结构化数据,如日志分析(用户行为追踪)、社交媒体数据挖掘。
|
||||
数据湖构建:以HDFS为基础存储混合格式数据(如传感器数据、文本)。
|
||||
ETL流程:通过Hive、Pig等工具清洗转换数据,构建数据仓库。
|
||||
机器学习:利用Mahout或Spark MLlib训练分布式模型。
|
||||
案例:百度、淘宝使用Hadoop处理每日TB级用户数据。
|
||||
|
||||
|
||||
问:Hadoop与Spark的主要区别是什么?两者如何协同工作?
|
||||
答:核心区别如下:
|
||||
维度 Hadoop MapReduce Apache Spark
|
||||
计算模型 基于磁盘的批处理 基于内存的迭代/流处理
|
||||
延迟 高(分钟级) 低(毫秒级)
|
||||
适用场景 离线分析、ETL 实时分析、机器学习
|
||||
成本 低(依赖廉价磁盘) 高(依赖大内存)
|
||||
协同机制:
|
||||
Spark可运行在YARN上,共享Hadoop集群资源。
|
||||
HDFS为Spark提供存储底座,Spark替代MapReduce作为计算引擎(例如:批处理用MapReduce,实时分析用Spark)。
|
||||
|
||||
|
||||
问:Hadoop的优势和局限性有哪些?
|
||||
答:
|
||||
优势:
|
||||
高容错性:数据多副本机制,节点故障时自动恢复。
|
||||
横向扩展:支持数千节点线性扩容。
|
||||
低成本:可在廉价硬件上部署。
|
||||
批处理优化:适合海量离线数据处理(如日志分析)。
|
||||
局限性:
|
||||
高延迟:MapReduce依赖磁盘I/O,无法实时处理。
|
||||
小文件效率低:大量小文件增加NameNode负担。
|
||||
编程复杂:需手动编写MapReduce代码。
|
||||
安全缺陷:缺乏存储/网络级加密。
|
||||
|
||||
|
||||
问:Hadoop的历史发展中有哪些关键里程碑?
|
||||
答:关键里程碑包括:
|
||||
2002年:起源于Doug Cutting的Apache Nutch项目。
|
||||
2004年:受Google GFS/MapReduce论文启发,开始实现类似技术。
|
||||
2006年:HDFS和MapReduce从Nutch独立为Hadoop项目。
|
||||
2008年:成为Apache顶级项目;同年以209秒完成1TB数据排序(910节点集群),创世界纪录。
|
||||
2012年:Hadoop 2.0发布,引入YARN取代MapReduce的资源管理角色。
|
||||
2014年:Spark成为默认执行引擎。
|
||||
2017年:Hadoop 3.0支持纠删码降低存储开销。
|
||||
|
||||
|
||||
问:Hadoop生态系统包含哪些重要工具?
|
||||
答:生态系统分为四层:
|
||||
数据接入:Flume(日志收集)、Sqoop(关系数据库同步)、Kafka(消息队列)。
|
||||
数据存储:HBase(列式数据库)、HDFS(核心存储)。
|
||||
计算引擎:
|
||||
批处理:MapReduce、Tez
|
||||
交互式:Hive(SQL查询)、Pig(脚本)
|
||||
实时:Spark Streaming、Storm
|
||||
机器学习:Mahout、Spark MLlib。
|
||||
管理运维:Ambari(监控)、Oozie(工作流调度)、Kerberos(认证)。
|
||||
|
||||
|
||||
问:是否可以说Hadoop是实时数据处理框架?
|
||||
答:不能。Hadoop的核心组件MapReduce设计为批处理模型,依赖磁盘I/O导致高延迟(分钟级)。实时处理需依赖其生态系统中的其他工具,如:
|
||||
Spark Streaming:支持毫秒级流处理。
|
||||
Storm:专攻实时计算。
|
||||
因此,Hadoop本身并非实时框架,但可通过集成扩展实时能力。
|
||||
|
||||
|
||||
问:Hadoop如何处理节点故障?
|
||||
答:通过双重容错机制:
|
||||
HDFS多副本:每个数据块默认存储3个副本(可配置),当节点故障时自动从其他节点读取副本数据。
|
||||
任务自动重启:MapReduce或YARN管理的任务失败时,YARN会将其重新分配到健康节点执行,确保任务完成。
|
||||
""")
|
||||
.execute()) {
|
||||
System.out.println(response.getStatus());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.info.*;
|
||||
import java.util.Map;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.map.ImmutableMap;
|
||||
|
||||
/**
|
||||
* Info 接口
|
||||
@@ -193,4 +194,7 @@ public interface InfoService {
|
||||
|
||||
@Get("/info/clean_table_version")
|
||||
Integer cleanTableVersion();
|
||||
|
||||
@Post(value = "/jdbc", contentType = "plain/text")
|
||||
ImmutableList<ImmutableMap<String, String>> jdbc(@Body String sql);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.lanyuanxiaoyao.service.info.controller;
|
||||
|
||||
import com.lanyuanxiaoyao.service.info.service.JdbcService;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.map.ImmutableMap;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250605
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/jdbc")
|
||||
public class JdbcController {
|
||||
private final JdbcService jdbcService;
|
||||
|
||||
public JdbcController(JdbcService jdbcService) {
|
||||
this.jdbcService = jdbcService;
|
||||
}
|
||||
|
||||
@PostMapping("")
|
||||
public ImmutableList<ImmutableMap<String, String>> jdbc(@RequestBody String sql) {
|
||||
return jdbcService.jdbc(sql);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.lanyuanxiaoyao.service.info.service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.eclipse.collections.api.factory.Lists;
|
||||
import org.eclipse.collections.api.factory.Maps;
|
||||
import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.eclipse.collections.api.map.ImmutableMap;
|
||||
import org.eclipse.collections.api.map.MutableMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 执行JDBC
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @version 20250605
|
||||
*/
|
||||
@Service
|
||||
public class JdbcService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(JdbcService.class);
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
public JdbcService(JdbcTemplate jdbcTemplate) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
}
|
||||
|
||||
public ImmutableList<ImmutableMap<String, String>> jdbc(String sql) {
|
||||
logger.info("Executing SQL: {}", sql);
|
||||
List<Map<String, Object>> lines = jdbcTemplate.queryForList(sql);
|
||||
return Lists.immutable.ofAll(lines)
|
||||
.collect(map -> {
|
||||
MutableMap<String, String> item = Maps.mutable.empty();
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
item.put(entry.getKey(), String.valueOf(entry.getValue()));
|
||||
}
|
||||
return item.toImmutable();
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user