diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/BaseNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/BaseNode.java new file mode 100644 index 0000000..127a8ec --- /dev/null +++ b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/BaseNode.java @@ -0,0 +1,16 @@ +package com.lanyuanxiaoyao.service.ai.web.flow; + +import com.yomahub.liteflow.core.NodeComponent; +import lombok.extern.slf4j.Slf4j; + +/** + * @author lanyuanxiaoyao + * @version 20250625 + */ +@Slf4j +public abstract class BaseNode extends NodeComponent { + @Override + public void process() throws Exception { + log.info(getClass().getName()); + } +} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/EndNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/EndNode.java new file mode 100644 index 0000000..ef9cffa --- /dev/null +++ b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/EndNode.java @@ -0,0 +1,12 @@ +package com.lanyuanxiaoyao.service.ai.web.flow; + +/** + * @author lanyuanxiaoyao + * @version 20250625 + */ +public class EndNode extends BaseNode { + @Override + public void process() throws Exception { + + } +} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LiteFlowService.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LiteFlowService.java new file mode 100644 index 0000000..954ca45 --- /dev/null +++ b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LiteFlowService.java @@ -0,0 +1,304 @@ +package com.lanyuanxiaoyao.service.ai.web.flow; + +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import java.util.LinkedList; +import java.util.Queue; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.collections.api.factory.Lists; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.list.MutableList; +import org.eclipse.collections.api.map.ImmutableMap; + +/** + * @author lanyuanxiaoyao + * @version 20250625 + */ +@Slf4j +public class LiteFlowService { + public LiteFlowService() { + createNode("start-amis-node", NodeTypeEnum.COMMON, StartNode.class); + createNode("end-amis-node", NodeTypeEnum.COMMON, EndNode.class); + createNode("llm-amis-node", NodeTypeEnum.COMMON, LlmNode.class); + } + + private static void createNode(String name, NodeTypeEnum type, Class clazz) { + LiteFlowNodeBuilder.createNode() + .setId(name) + .setName(name) + .setType(type) + .setClazz(clazz) + .build(); + } + + public static void main(String[] args) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new EclipseCollectionsModule()); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + // language=JSON + String source = """ + { + "nodes": [ + { + "id": "A", + "type": "start", + "position": { + "x": 8, + "y": 272 + }, + "data": {}, + "measured": { + "width": 256, + "height": 75 + }, + "selected": false, + "dragging": false + }, + { + "id": "F", + "type": "end", + "position": { + "x": 1439.5556937134281, + "y": 282.2797340760818 + }, + "data": {}, + "measured": { + "width": 256, + "height": 75 + }, + "selected": false, + "dragging": false + }, + { + "id": "C", + "type": "normal", + "position": { + "x": 902.6781018665707, + "y": 115.31234529524048 + }, + "data": {}, + "measured": { + "width": 256, + "height": 75 + }, + "selected": false, + "dragging": false + }, + { + "id": "B", + "type": "normal", + "position": { + "x": 338, + "y": 287 + }, + "data": {}, + "measured": { + "width": 256, + "height": 75 + }, + "selected": false, + "dragging": false + }, + { + "id": "E", + "type": "normal", + "position": { + "x": 1086.6322978498904, + "y": 371.3061114283591 + }, + "data": {}, + "measured": { + "width": 256, + "height": 75 + }, + "selected": true, + "dragging": false + }, + { + "id": "D", + "type": "normal", + "position": { + "x": 700.0944461714178, + "y": 369.84258971430364 + }, + "data": {}, + "measured": { + "width": 256, + "height": 75 + }, + "selected": false, + "dragging": false + } + ], + "edges": [ + { + "source": "A", + "target": "B", + "id": "xy-edge__A-B" + }, + { + "source": "B", + "target": "C", + "id": "xy-edge__B-C" + }, + { + "source": "C", + "target": "F", + "id": "xy-edge__C-F" + }, + { + "source": "D", + "target": "E", + "id": "xy-edge__D-E" + }, + { + "source": "B", + "target": "D", + "id": "xy-edge__B-D" + }, + { + "source": "E", + "target": "F", + "id": "xy-edge__E-F" + } + ], + "data": { + "A": { + "inputs": { + "name": { + "type": "text" + }, + "description": { + "type": "text", + "description": "文件描述" + } + } + }, + "C": { + "model": "qwen3", + "outputs": { + "text": { + "type": "string" + } + }, + "systemPrompt": "你是个沙雕" + }, + "B": { + "count": 3, + "score": 0.75, + "knowledgeId": 3585368238960640, + "query": "hello world" + }, + "E": { + "type": "python", + "content": "code='hello'\\nprint(code)" + }, + "D": { + "model": "qwen3", + "outputs": { + "text": { + "type": "string" + } + }, + "systemPrompt": "你是个聪明人" + } + } + } + """; + FlowData root = mapper.readValue(StrUtil.trim(source), FlowData.class); + log.info("\n{}", buildEl(root.nodes, root.edges)); + } + + public static String buildEl(ImmutableList nodes, ImmutableList edges) { + var nodeMap = nodes.toMap(FlowData.Node::getId, node -> node); + var adjacencyGraph = Maps.mutable.>empty(); + var reverseAdjacencyGraph = Maps.mutable.>empty(); + var inDegree = Maps.mutable.empty(); + + nodes.forEach(node -> { + adjacencyGraph.put(node.getId(), Lists.mutable.empty()); + reverseAdjacencyGraph.put(node.getId(), Lists.mutable.empty()); + inDegree.put(node.getId(), 0); + }); + edges.forEach(edge -> { + adjacencyGraph.get(edge.getSource()).add(edge.getTarget()); + reverseAdjacencyGraph.get(edge.getTarget()).add(edge.getSource()); + inDegree.put(edge.getTarget(), inDegree.get(edge.getTarget()) + 1); + }); + + Queue queue = new LinkedList<>(); + var topologicalSortedList = Lists.mutable.empty(); + + inDegree.forEachKeyValue((id, count) -> { + if (count == 0) { + queue.offer(id); + } + }); + + while (!queue.isEmpty()) { + String id = queue.poll(); + topologicalSortedList.add(id); + for (var neighborId : adjacencyGraph.get(id)) { + inDegree.put(neighborId, inDegree.get(neighborId) - 1); + if (inDegree.get(neighborId) == 0) { + queue.offer(neighborId); + } + } + } + + topologicalSortedList.forEach(id -> log.info("{} {}", id, adjacencyGraph.get(id))); + topologicalSortedList.forEach(id -> log.info("{} {}", id, reverseAdjacencyGraph.get(id))); + + var nodeQueue = new LinkedList<>(topologicalSortedList); + var chains = Lists.mutable.>empty(); + while (!nodeQueue.isEmpty()) { + String currentId = nodeQueue.poll(); + var subChain = Lists.mutable.empty(); + while (true) { + subChain.add(currentId); + nodeQueue.remove(currentId); + if (adjacencyGraph.get(currentId).size() != 1) { + break; + } + String nextId = adjacencyGraph.get(currentId).get(0); + if (reverseAdjacencyGraph.get(nextId).size() > 1) { + break; + } + currentId = nextId; + } + chains.add(subChain); + } + + log.info("{}", chains); + + return StrUtil.join(",", topologicalSortedList); + } + + @Data + public static class FlowData { + private ImmutableList nodes; + private ImmutableList edges; + private ImmutableMap data; + + @Data + public static class Node { + private String id; + private String type; + } + + @Data + public static class Edge { + private String id; + private String source; + private String target; + } + } +} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LlmNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LlmNode.java new file mode 100644 index 0000000..4dc9d70 --- /dev/null +++ b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LlmNode.java @@ -0,0 +1,12 @@ +package com.lanyuanxiaoyao.service.ai.web.flow; + +/** + * @author lanyuanxiaoyao + * @version 20250625 + */ +public class LlmNode extends BaseNode { + @Override + public void process() throws Exception { + + } +} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/StartNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/StartNode.java new file mode 100644 index 0000000..4d1a34f --- /dev/null +++ b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/StartNode.java @@ -0,0 +1,12 @@ +package com.lanyuanxiaoyao.service.ai.web.flow; + +/** + * @author lanyuanxiaoyao + * @version 20250625 + */ +public class StartNode extends BaseNode { + @Override + public void process() throws Exception { + + } +} diff --git a/service-web/client/src/pages/ai/flow/FlowEditor.tsx b/service-web/client/src/pages/ai/flow/FlowEditor.tsx index bbd6f33..4709fae 100644 --- a/service-web/client/src/pages/ai/flow/FlowEditor.tsx +++ b/service-web/client/src/pages/ai/flow/FlowEditor.tsx @@ -4,6 +4,7 @@ import { BackgroundVariant, type Connection, Controls, + getIncomers, getOutgoers, MiniMap, type Node, @@ -17,11 +18,13 @@ import {arrToMap, find, findIdx, isEqual, isNil, randomId} from 'licia' import {type JSX, useState} from 'react' import styled from 'styled-components' import '@xyflow/react/dist/style.css' +import Queue from 'yocto-queue' import {amisRender, commonInfo, horizontalFormOptions} from '../../../util/amis.tsx' import CodeNode from './node/CodeNode.tsx' import EndNode from './node/EndNode.tsx' import KnowledgeNode from './node/KnowledgeNode.tsx' import LlmNode from './node/LlmNode.tsx' +import ParallelNode from './node/ParallelNode.tsx' import StartNode from './node/StartNode.tsx' import {useDataStore} from './store/DataStore.ts' import {useFlowStore} from './store/FlowStore.ts' @@ -71,27 +74,32 @@ function FlowEditor() { component: (props: NodeProps) => JSX.Element }[]>([ { - key: 'start-amis-node', + key: 'start-node', name: '开始', component: StartNode, }, { - key: 'end-amis-node', + key: 'end-node', name: '结束', component: EndNode, }, { - key: 'llm-amis-node', + key: 'parallel-node', + name: '并行', + component: ParallelNode, + }, + { + key: 'llm-node', name: '大模型', component: LlmNode, }, { - key: 'knowledge-amis-node', + key: 'knowledge-node', name: '知识库', component: KnowledgeNode, }, { - key: 'code-amis-node', + key: 'code-node', name: '代码执行', component: CodeNode, }, @@ -183,10 +191,10 @@ function FlowEditor() { } const checkNode = (type: string) => { - if (isEqual(type, 'start-amis-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) { + if (isEqual(type, 'start-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) { throw new Error('只能存在1个开始节点') } - if (isEqual(type, 'end-amis-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) { + if (isEqual(type, 'end-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) { throw new Error('只能存在1个结束节点') } } @@ -201,12 +209,12 @@ function FlowEditor() { throw new Error('连线目标节点未找到') } // 禁止短路整个流程 - if (isEqual('start-amis-node', sourceNode.type) && isEqual('end-amis-node', targetNode.type)) { + if (isEqual('start-node', sourceNode.type) && isEqual('end-node', targetNode.type)) { throw new Error('开始节点不能直连结束节点') } // 禁止流程出现环,必须是有向无环图 - const hasCycle = (node: Node, visited = new Set()) => { + const hasCycle = (node: Node, visited = new Set()) => { if (visited.has(node.id)) return false visited.add(node.id) for (const outgoer of getOutgoers(node, nodes, edges)) { @@ -219,11 +227,35 @@ function FlowEditor() { } else if (hasCycle(targetNode)) { throw new Error('禁止流程循环') } + + const hasRedundant = (source: Node, target: Node) => { + const visited = new Set() + const queue = new Queue() + queue.enqueue(source) + visited.add(source.id) + while (queue.size > 0) { + const current = queue.dequeue()! + console.log(current.id) + for (const incomer of getIncomers(current, nodes, edges)) { + if (isEqual(incomer.id, target.id)) { + return true + } + if (!visited.has(incomer.id)) { + visited.add(incomer.id) + queue.enqueue(incomer) + } + } + } + return false + } + if (hasRedundant(sourceNode, targetNode)) { + throw new Error('出现冗余边') + } } useMount(() => { // language=JSON - let initialData = JSON.parse('{\n "nodes": [],\n "edges": [],\n "data": {}\n}') + let initialData = JSON.parse('{\n "nodes": [\n {\n "id": "lurod0PM-J",\n "type": "parallel-node",\n "position": {\n "x": -156,\n "y": 77\n },\n "data": {},\n "measured": {\n "width": 256,\n "height": 75\n },\n "selected": false,\n "dragging": false\n },\n {\n "id": "ldoKAzHnKF",\n "type": "llm-node",\n "position": {\n "x": 207,\n "y": -38\n },\n "data": {},\n "measured": {\n "width": 256,\n "height": 75\n },\n "selected": false,\n "dragging": false\n },\n {\n "id": "1eJtMoJWs6",\n "type": "llm-node",\n "position": {\n "x": 207,\n "y": 172.5\n },\n "data": {},\n "measured": {\n "width": 256,\n "height": 75\n },\n "selected": false,\n "dragging": false\n }\n ],\n "edges": [\n {\n "source": "lurod0PM-J",\n "target": "1eJtMoJWs6",\n "id": "xy-edge__lurod0PM-J-1eJtMoJWs6"\n },\n {\n "source": "lurod0PM-J",\n "target": "ldoKAzHnKF",\n "id": "xy-edge__lurod0PM-J-ldoKAzHnKF"\n }\n ],\n "data": {}\n}') let initialNodes = initialData['nodes'] ?? [] let initialEdges = initialData['edges'] ?? [] diff --git a/service-web/client/src/pages/ai/flow/node/AmisNode.tsx b/service-web/client/src/pages/ai/flow/node/AmisNode.tsx index 6616416..71662da 100644 --- a/service-web/client/src/pages/ai/flow/node/AmisNode.tsx +++ b/service-web/client/src/pages/ai/flow/node/AmisNode.tsx @@ -1,8 +1,8 @@ import {DeleteFilled, EditFilled} from '@ant-design/icons' -import {Handle, type NodeProps, Position} from '@xyflow/react' +import {Handle, type HandleProps, type NodeProps, Position, useNodeConnections} from '@xyflow/react' import type {Schema} from 'amis' import {Card, Dropdown} from 'antd' -import {isEmpty, isEqual} from 'licia' +import {isEmpty, isEqual, isNil} from 'licia' import {type JSX} from 'react' import {horizontalFormOptions} from '../../../../util/amis.tsx' @@ -55,13 +55,26 @@ export function outputsFormColumns(editable: boolean = false, required: boolean ] } -interface AmisNodeProps { - nodeProps: NodeProps, - type: AmisNodeType, - defaultNodeName: String, - defaultNodeDescription?: String, - extraNodeDescription?: (nodeData: any) => JSX.Element, - columnSchema?: Schema[], +export const LimitHandler = (props: HandleProps & { limit: number }) => { + const connections = useNodeConnections({ + handleType: props.type, + }) + return ( + + ) +} + +type AmisNodeProps = { + nodeProps: NodeProps + type: AmisNodeType + defaultNodeName: String + defaultNodeDescription?: String + extraNodeDescription?: (nodeData: any) => JSX.Element + handlers?: (nodeData: any) => JSX.Element + columnSchema?: Schema[] } const AmisNode: (props: AmisNodeProps) => JSX.Element = ({ @@ -70,6 +83,7 @@ const AmisNode: (props: AmisNodeProps) => JSX.Element = ({ defaultNodeName, defaultNodeDescription, extraNodeDescription, + handlers, columnSchema, }) => { const {id, data} = nodeProps @@ -142,10 +156,14 @@ const AmisNode: (props: AmisNodeProps) => JSX.Element = ({ - {isEqual(type, 'start') || isEqual(type, 'normal') - ? : undefined} - {isEqual(type, 'end') || isEqual(type, 'normal') - ? : undefined} + {isNil(handlers) + ? <> + {isEqual(type, 'start') || isEqual(type, 'normal') + ? : undefined} + {isEqual(type, 'end') || isEqual(type, 'normal') + ? : undefined} + + : handlers?.(nodeData)} ) } diff --git a/service-web/client/src/pages/ai/flow/node/ParallelNode.tsx b/service-web/client/src/pages/ai/flow/node/ParallelNode.tsx new file mode 100644 index 0000000..955100d --- /dev/null +++ b/service-web/client/src/pages/ai/flow/node/ParallelNode.tsx @@ -0,0 +1,19 @@ +import {Handle, type NodeProps, Position} from '@xyflow/react' +import AmisNode, {LimitHandler} from './AmisNode.tsx' + +const ParallelNode = (props: NodeProps) => AmisNode({ + nodeProps: props, + type: 'normal', + defaultNodeName: '并行节点', + defaultNodeDescription: '允许开启并行流程', + handlers: () => { + return ( + <> + + + + ) + }, +}) + +export default ParallelNode \ No newline at end of file