diff --git a/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/controller/KnowledgeBaseController.java b/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/controller/KnowledgeBaseController.java index 168fcd3..4a32256 100644 --- a/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/controller/KnowledgeBaseController.java +++ b/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/controller/KnowledgeBaseController.java @@ -94,7 +94,7 @@ public class KnowledgeBaseController { @PostMapping("submit_text") public void submitText( - @RequestParam(value = "id") Long id, + @RequestParam("id") Long id, @RequestParam(value = "mode", defaultValue = "NORMAL") String mode, @RequestParam(value = "type", defaultValue = "text") String type, @RequestParam(value = "content", required = false) String content, @@ -109,6 +109,16 @@ public class KnowledgeBaseController { } } + @PostMapping("submit_text_directly") + public void submitDirectly( + @RequestParam("id") Long id, + @RequestParam(value = "name", required = false) String name, + @RequestParam(value = "split_key", defaultValue = "\n\n") String splitKey, + @RequestBody String content + ) { + embeddingService.submitDirectly(id, name, Lists.immutable.of(content.split(splitKey))); + } + @PostMapping("query") public ImmutableList query( @RequestParam("id") Long id, diff --git a/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/EmbeddingService.java b/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/EmbeddingService.java index 0b4be14..35691ec 100644 --- a/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/EmbeddingService.java +++ b/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/EmbeddingService.java @@ -9,6 +9,7 @@ import com.lanyuanxiaoyao.service.ai.knowledge.entity.Knowledge; import com.lanyuanxiaoyao.service.ai.knowledge.entity.vo.DataFileVO; import com.yomahub.liteflow.core.FlowExecutor; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -104,4 +105,32 @@ public class EmbeddingService { } }); } + + public void submitDirectly(Long id, String name, ImmutableList contents) { + executors.submit(() -> { + Knowledge knowledge = knowledgeBaseService.get(id); + String groupName = name; + if (StrUtil.isBlank(groupName)) { + groupName = StrUtil.format("外部-{}", IdUtil.nanoId(10)); + } + Long groupId = groupService.add(knowledge.getId(), groupName); + EmbeddingContext context = EmbeddingContext.builder() + .vectorSourceId(knowledge.getVectorSourceId()) + .groupId(groupId) + .build(); + context.setDocuments( + contents + .collect(StrUtil::trim) + .collect(content -> + Document.builder() + .text(content) + .metadata(new HashMap<>()) + .build() + ) + .toList() + ); + executor.execute2Resp("embedding_submit_directly", null, context); + groupService.finish(groupId); + }); + } } diff --git a/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/node/EmbeddingNodes.java b/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/node/EmbeddingNodes.java index d7bce25..bb04151 100644 --- a/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/node/EmbeddingNodes.java +++ b/service-ai/service-ai-knowledge/src/main/java/com/lanyuanxiaoyao/service/ai/knowledge/service/node/EmbeddingNodes.java @@ -195,6 +195,7 @@ public class EmbeddingNodes { .call() .content(); Assert.notBlank(response, "LLM response is empty"); + logger.info("LLM response: \n{}", response); // noinspection DataFlowIssue return Arrays.stream(StrUtil.trim(response).split("---")) .map(text -> text.replaceAll("(?!^.+) +$", "")) @@ -209,6 +210,8 @@ public class EmbeddingNodes { @LiteflowMethod(value = LiteFlowMethodEnum.PROCESS, nodeId = "import_vector_source", nodeName = "导入向量库", nodeType = NodeTypeEnum.COMMON) public void importVectorSource(NodeComponent node) { EmbeddingContext context = node.getContextBean(EmbeddingContext.class); + Assert.notNull(context.getVectorSourceId(), "VectorSourceId is null"); + Assert.notNull(context.getGroupId(), "GroupId is null"); if (ObjectUtil.isNotEmpty(context.getDocuments())) { VectorStore vs = QdrantVectorStore.builder(qdrantClient, embeddingModel) .collectionName(String.valueOf(context.getVectorSourceId())) diff --git a/service-ai/service-ai-knowledge/src/main/resources/config/flow.xml b/service-ai/service-ai-knowledge/src/main/resources/config/flow.xml index cecb4e9..4669393 100644 --- a/service-ai/service-ai-knowledge/src/main/resources/config/flow.xml +++ b/service-ai/service-ai-knowledge/src/main/resources/config/flow.xml @@ -20,4 +20,7 @@ SER(embedding_preview, import_vector_source) + + SER(import_vector_source) + \ No newline at end of file diff --git a/service-ai/service-ai-knowledge/src/test/java/com/lanyuanxiaoyao/service/ai/knowledge/UploadDirectly.java b/service-ai/service-ai-knowledge/src/test/java/com/lanyuanxiaoyao/service/ai/knowledge/UploadDirectly.java new file mode 100644 index 0000000..9b362f1 --- /dev/null +++ b/service-ai/service-ai-knowledge/src/test/java/com/lanyuanxiaoyao/service/ai/knowledge/UploadDirectly.java @@ -0,0 +1,99 @@ +package com.lanyuanxiaoyao.service.ai.knowledge; + +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; + +/** + * @author lanyuanxiaoyao + * @version 20250605 + */ +public class UploadDirectly { + public static void main(String[] args) { + try (HttpResponse response = HttpUtil.createPost("http://132.126.207.130:35690/hudi_services/ai_knowledge/knowledge/submit_text_directly?id=1930517588999553024&name=Hadoop") + .basicAuth("AxhEbscwsJDbYMH2", "cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4") + .contentType("plain/text") + // language=TEXT + .body(""" + 问:什么是Apache Hadoop的核心组件? Hadoop由哪几个主要部分构成? + 答:Apache Hadoop的核心组件包括三部分: + HDFS (Hadoop Distributed File System):负责分布式存储,将数据分割为多个块(默认128MB或256MB),并在集群节点间分布存储,通过多副本机制实现高容错性。 + YARN (Yet Another Resource Negotiator):作为资源管理器,分配集群的计算资源(CPU、内存),支持多种数据处理框架运行。 + MapReduce:分布式计算模型,通过Map(任务分解并行处理)和Reduce(结果聚合)两阶段处理海量数据。 + 三者协作流程为:HDFS存储数据 → YARN分配资源 → MapReduce执行计算。 + + + 问:Hadoop适合哪些应用场景?能否举例说明? + 答:Hadoop的典型应用场景包括: + 大数据分析:处理PB级结构化/非结构化数据,如日志分析(用户行为追踪)、社交媒体数据挖掘。 + 数据湖构建:以HDFS为基础存储混合格式数据(如传感器数据、文本)。 + ETL流程:通过Hive、Pig等工具清洗转换数据,构建数据仓库。 + 机器学习:利用Mahout或Spark MLlib训练分布式模型。 + 案例:百度、淘宝使用Hadoop处理每日TB级用户数据。 + + + 问:Hadoop与Spark的主要区别是什么?两者如何协同工作? + 答:核心区别如下: + 维度 Hadoop MapReduce Apache Spark + 计算模型 基于磁盘的批处理 基于内存的迭代/流处理 + 延迟 高(分钟级) 低(毫秒级) + 适用场景 离线分析、ETL 实时分析、机器学习 + 成本 低(依赖廉价磁盘) 高(依赖大内存) + 协同机制: + Spark可运行在YARN上,共享Hadoop集群资源。 + HDFS为Spark提供存储底座,Spark替代MapReduce作为计算引擎(例如:批处理用MapReduce,实时分析用Spark)。 + + + 问:Hadoop的优势和局限性有哪些? + 答: + 优势: + 高容错性:数据多副本机制,节点故障时自动恢复。 + 横向扩展:支持数千节点线性扩容。 + 低成本:可在廉价硬件上部署。 + 批处理优化:适合海量离线数据处理(如日志分析)。 + 局限性: + 高延迟:MapReduce依赖磁盘I/O,无法实时处理。 + 小文件效率低:大量小文件增加NameNode负担。 + 编程复杂:需手动编写MapReduce代码。 + 安全缺陷:缺乏存储/网络级加密。 + + + 问:Hadoop的历史发展中有哪些关键里程碑? + 答:关键里程碑包括: + 2002年:起源于Doug Cutting的Apache Nutch项目。 + 2004年:受Google GFS/MapReduce论文启发,开始实现类似技术。 + 2006年:HDFS和MapReduce从Nutch独立为Hadoop项目。 + 2008年:成为Apache顶级项目;同年以209秒完成1TB数据排序(910节点集群),创世界纪录。 + 2012年:Hadoop 2.0发布,引入YARN取代MapReduce的资源管理角色。 + 2014年:Spark成为默认执行引擎。 + 2017年:Hadoop 3.0支持纠删码降低存储开销。 + + + 问:Hadoop生态系统包含哪些重要工具? + 答:生态系统分为四层: + 数据接入:Flume(日志收集)、Sqoop(关系数据库同步)、Kafka(消息队列)。 + 数据存储:HBase(列式数据库)、HDFS(核心存储)。 + 计算引擎: + 批处理:MapReduce、Tez + 交互式:Hive(SQL查询)、Pig(脚本) + 实时:Spark Streaming、Storm + 机器学习:Mahout、Spark MLlib。 + 管理运维:Ambari(监控)、Oozie(工作流调度)、Kerberos(认证)。 + + + 问:是否可以说Hadoop是实时数据处理框架? + 答:不能。Hadoop的核心组件MapReduce设计为批处理模型,依赖磁盘I/O导致高延迟(分钟级)。实时处理需依赖其生态系统中的其他工具,如: + Spark Streaming:支持毫秒级流处理。 + Storm:专攻实时计算。 + 因此,Hadoop本身并非实时框架,但可通过集成扩展实时能力。 + + + 问:Hadoop如何处理节点故障? + 答:通过双重容错机制: + HDFS多副本:每个数据块默认存储3个副本(可配置),当节点故障时自动从其他节点读取副本数据。 + 任务自动重启:MapReduce或YARN管理的任务失败时,YARN会将其重新分配到健康节点执行,确保任务完成。 + """) + .execute()) { + System.out.println(response.getStatus()); + } + } +}