feat(knowledge): 实现外部直接插入知识库

This commit is contained in:
v-zhangjc9
2025-06-05 15:20:33 +08:00
parent 8b4827b164
commit e359bed97c
5 changed files with 145 additions and 1 deletions

View File

@@ -94,7 +94,7 @@ public class KnowledgeBaseController {
@PostMapping("submit_text")
public void submitText(
@RequestParam(value = "id") Long id,
@RequestParam("id") Long id,
@RequestParam(value = "mode", defaultValue = "NORMAL") String mode,
@RequestParam(value = "type", defaultValue = "text") String type,
@RequestParam(value = "content", required = false) String content,
@@ -109,6 +109,16 @@ public class KnowledgeBaseController {
}
}
@PostMapping("submit_text_directly")
public void submitDirectly(
@RequestParam("id") Long id,
@RequestParam(value = "name", required = false) String name,
@RequestParam(value = "split_key", defaultValue = "\n\n") String splitKey,
@RequestBody String content
) {
embeddingService.submitDirectly(id, name, Lists.immutable.of(content.split(splitKey)));
}
@PostMapping("query")
public ImmutableList<String> query(
@RequestParam("id") Long id,

View File

@@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.ai.knowledge.entity.Knowledge;
import com.lanyuanxiaoyao.service.ai.knowledge.entity.vo.DataFileVO;
import com.yomahub.liteflow.core.FlowExecutor;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -104,4 +105,32 @@ public class EmbeddingService {
}
});
}
public void submitDirectly(Long id, String name, ImmutableList<String> contents) {
executors.submit(() -> {
Knowledge knowledge = knowledgeBaseService.get(id);
String groupName = name;
if (StrUtil.isBlank(groupName)) {
groupName = StrUtil.format("外部-{}", IdUtil.nanoId(10));
}
Long groupId = groupService.add(knowledge.getId(), groupName);
EmbeddingContext context = EmbeddingContext.builder()
.vectorSourceId(knowledge.getVectorSourceId())
.groupId(groupId)
.build();
context.setDocuments(
contents
.collect(StrUtil::trim)
.collect(content ->
Document.builder()
.text(content)
.metadata(new HashMap<>())
.build()
)
.toList()
);
executor.execute2Resp("embedding_submit_directly", null, context);
groupService.finish(groupId);
});
}
}

View File

@@ -195,6 +195,7 @@ public class EmbeddingNodes {
.call()
.content();
Assert.notBlank(response, "LLM response is empty");
logger.info("LLM response: \n{}", response);
// noinspection DataFlowIssue
return Arrays.stream(StrUtil.trim(response).split("---"))
.map(text -> text.replaceAll("(?!^.+) +$", ""))
@@ -209,6 +210,8 @@ public class EmbeddingNodes {
@LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "import_vector_source", nodeName = "导入向量库", nodeType = NodeTypeEnum.COMMON)
public void importVectorSource(NodeComponent node) {
EmbeddingContext context = node.getContextBean(EmbeddingContext.class);
Assert.notNull(context.getVectorSourceId(), "VectorSourceId is null");
Assert.notNull(context.getGroupId(), "GroupId is null");
if (ObjectUtil.isNotEmpty(context.getDocuments())) {
VectorStore vs = QdrantVectorStore.builder(qdrantClient, embeddingModel)
.collectionName(String.valueOf(context.getVectorSourceId()))

View File

@@ -20,4 +20,7 @@
<chain id="embedding_submit">
SER(embedding_preview, import_vector_source)
</chain>
<chain id="embedding_submit_directly">
SER(import_vector_source)
</chain>
</flow>

View File

@@ -0,0 +1,99 @@
package com.lanyuanxiaoyao.service.ai.knowledge;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
/**
* @author lanyuanxiaoyao
* @version 20250605
*/
public class UploadDirectly {
public static void main(String[] args) {
try (HttpResponse response = HttpUtil.createPost("http://132.126.207.130:35690/hudi_services/ai_knowledge/knowledge/submit_text_directly?id=1930517588999553024&name=Hadoop")
.basicAuth("AxhEbscwsJDbYMH2", "cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4")
.contentType("plain/text")
// language=TEXT
.body("""
什么是Apache Hadoop的核心组件 Hadoop由哪几个主要部分构成
Apache Hadoop的核心组件包括三部分
HDFS (Hadoop Distributed File System)负责分布式存储将数据分割为多个块默认128MB或256MB并在集群节点间分布存储通过多副本机制实现高容错性。
YARN (Yet Another Resource Negotiator)作为资源管理器分配集群的计算资源CPU、内存支持多种数据处理框架运行。
MapReduce分布式计算模型通过Map任务分解并行处理和Reduce结果聚合两阶段处理海量数据。
三者协作流程为HDFS存储数据 → YARN分配资源 → MapReduce执行计算。
Hadoop适合哪些应用场景能否举例说明
Hadoop的典型应用场景包括
大数据分析处理PB级结构化/非结构化数据,如日志分析(用户行为追踪)、社交媒体数据挖掘。
数据湖构建以HDFS为基础存储混合格式数据如传感器数据、文本
ETL流程通过Hive、Pig等工具清洗转换数据构建数据仓库。
机器学习利用Mahout或Spark MLlib训练分布式模型。
案例百度、淘宝使用Hadoop处理每日TB级用户数据。
Hadoop与Spark的主要区别是什么两者如何协同工作
答:核心区别如下:
维度 Hadoop MapReduce Apache Spark
计算模型 基于磁盘的批处理 基于内存的迭代/流处理
延迟 高(分钟级) 低(毫秒级)
适用场景 离线分析、ETL 实时分析、机器学习
成本 低(依赖廉价磁盘) 高(依赖大内存)
协同机制:
Spark可运行在YARN上共享Hadoop集群资源。
HDFS为Spark提供存储底座Spark替代MapReduce作为计算引擎例如批处理用MapReduce实时分析用Spark
Hadoop的优势和局限性有哪些
答:
优势:
高容错性:数据多副本机制,节点故障时自动恢复。
横向扩展:支持数千节点线性扩容。
低成本:可在廉价硬件上部署。
批处理优化:适合海量离线数据处理(如日志分析)。
局限性:
高延迟MapReduce依赖磁盘I/O无法实时处理。
小文件效率低大量小文件增加NameNode负担。
编程复杂需手动编写MapReduce代码。
安全缺陷:缺乏存储/网络级加密。
Hadoop的历史发展中有哪些关键里程碑
答:关键里程碑包括:
2002年起源于Doug Cutting的Apache Nutch项目。
2004年受Google GFS/MapReduce论文启发开始实现类似技术。
2006年HDFS和MapReduce从Nutch独立为Hadoop项目。
2008年成为Apache顶级项目同年以209秒完成1TB数据排序910节点集群创世界纪录。
2012年Hadoop 2.0发布引入YARN取代MapReduce的资源管理角色。
2014年Spark成为默认执行引擎。
2017年Hadoop 3.0支持纠删码降低存储开销。
Hadoop生态系统包含哪些重要工具
答:生态系统分为四层:
数据接入Flume日志收集、Sqoop关系数据库同步、Kafka消息队列
数据存储HBase列式数据库、HDFS核心存储
计算引擎:
批处理MapReduce、Tez
交互式HiveSQL查询、Pig脚本
实时Spark Streaming、Storm
机器学习Mahout、Spark MLlib。
管理运维Ambari监控、Oozie工作流调度、Kerberos认证
是否可以说Hadoop是实时数据处理框架
不能。Hadoop的核心组件MapReduce设计为批处理模型依赖磁盘I/O导致高延迟分钟级。实时处理需依赖其生态系统中的其他工具
Spark Streaming支持毫秒级流处理。
Storm专攻实时计算。
因此Hadoop本身并非实时框架但可通过集成扩展实时能力。
Hadoop如何处理节点故障
答:通过双重容错机制:
HDFS多副本每个数据块默认存储3个副本可配置当节点故障时自动从其他节点读取副本数据。
任务自动重启MapReduce或YARN管理的任务失败时YARN会将其重新分配到健康节点执行确保任务完成。
""")
.execute()) {
System.out.println(response.getStatus());
}
}
}