feat(web): 尝试增加并行节点解决解析问题

This commit is contained in:
v-zhangjc9
2025-06-26 20:48:49 +08:00
parent c92a374591
commit d6b70b1750
8 changed files with 448 additions and 23 deletions

View File

@@ -0,0 +1,16 @@
package com.lanyuanxiaoyao.service.ai.web.flow;
import com.yomahub.liteflow.core.NodeComponent;
import lombok.extern.slf4j.Slf4j;
/**
* @author lanyuanxiaoyao
* @version 20250625
*/
@Slf4j
public abstract class BaseNode extends NodeComponent {
@Override
public void process() throws Exception {
log.info(getClass().getName());
}
}

View File

@@ -0,0 +1,12 @@
package com.lanyuanxiaoyao.service.ai.web.flow;
/**
* @author lanyuanxiaoyao
* @version 20250625
*/
public class EndNode extends BaseNode {
@Override
public void process() throws Exception {
}
}

View File

@@ -0,0 +1,304 @@
package com.lanyuanxiaoyao.service.ai.web.flow;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import java.util.LinkedList;
import java.util.Queue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.ImmutableList;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.api.map.ImmutableMap;
/**
* @author lanyuanxiaoyao
* @version 20250625
*/
@Slf4j
public class LiteFlowService {
public LiteFlowService() {
createNode("start-amis-node", NodeTypeEnum.COMMON, StartNode.class);
createNode("end-amis-node", NodeTypeEnum.COMMON, EndNode.class);
createNode("llm-amis-node", NodeTypeEnum.COMMON, LlmNode.class);
}
private static void createNode(String name, NodeTypeEnum type, Class<? extends NodeComponent> clazz) {
LiteFlowNodeBuilder.createNode()
.setId(name)
.setName(name)
.setType(type)
.setClazz(clazz)
.build();
}
public static void main(String[] args) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new EclipseCollectionsModule());
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// language=JSON
String source = """
{
"nodes": [
{
"id": "A",
"type": "start",
"position": {
"x": 8,
"y": 272
},
"data": {},
"measured": {
"width": 256,
"height": 75
},
"selected": false,
"dragging": false
},
{
"id": "F",
"type": "end",
"position": {
"x": 1439.5556937134281,
"y": 282.2797340760818
},
"data": {},
"measured": {
"width": 256,
"height": 75
},
"selected": false,
"dragging": false
},
{
"id": "C",
"type": "normal",
"position": {
"x": 902.6781018665707,
"y": 115.31234529524048
},
"data": {},
"measured": {
"width": 256,
"height": 75
},
"selected": false,
"dragging": false
},
{
"id": "B",
"type": "normal",
"position": {
"x": 338,
"y": 287
},
"data": {},
"measured": {
"width": 256,
"height": 75
},
"selected": false,
"dragging": false
},
{
"id": "E",
"type": "normal",
"position": {
"x": 1086.6322978498904,
"y": 371.3061114283591
},
"data": {},
"measured": {
"width": 256,
"height": 75
},
"selected": true,
"dragging": false
},
{
"id": "D",
"type": "normal",
"position": {
"x": 700.0944461714178,
"y": 369.84258971430364
},
"data": {},
"measured": {
"width": 256,
"height": 75
},
"selected": false,
"dragging": false
}
],
"edges": [
{
"source": "A",
"target": "B",
"id": "xy-edge__A-B"
},
{
"source": "B",
"target": "C",
"id": "xy-edge__B-C"
},
{
"source": "C",
"target": "F",
"id": "xy-edge__C-F"
},
{
"source": "D",
"target": "E",
"id": "xy-edge__D-E"
},
{
"source": "B",
"target": "D",
"id": "xy-edge__B-D"
},
{
"source": "E",
"target": "F",
"id": "xy-edge__E-F"
}
],
"data": {
"A": {
"inputs": {
"name": {
"type": "text"
},
"description": {
"type": "text",
"description": "文件描述"
}
}
},
"C": {
"model": "qwen3",
"outputs": {
"text": {
"type": "string"
}
},
"systemPrompt": "你是个沙雕"
},
"B": {
"count": 3,
"score": 0.75,
"knowledgeId": 3585368238960640,
"query": "hello world"
},
"E": {
"type": "python",
"content": "code='hello'\\nprint(code)"
},
"D": {
"model": "qwen3",
"outputs": {
"text": {
"type": "string"
}
},
"systemPrompt": "你是个聪明人"
}
}
}
""";
FlowData root = mapper.readValue(StrUtil.trim(source), FlowData.class);
log.info("\n{}", buildEl(root.nodes, root.edges));
}
public static String buildEl(ImmutableList<FlowData.Node> nodes, ImmutableList<FlowData.Edge> edges) {
var nodeMap = nodes.toMap(FlowData.Node::getId, node -> node);
var adjacencyGraph = Maps.mutable.<String, MutableList<String>>empty();
var reverseAdjacencyGraph = Maps.mutable.<String, MutableList<String>>empty();
var inDegree = Maps.mutable.<String, Integer>empty();
nodes.forEach(node -> {
adjacencyGraph.put(node.getId(), Lists.mutable.empty());
reverseAdjacencyGraph.put(node.getId(), Lists.mutable.empty());
inDegree.put(node.getId(), 0);
});
edges.forEach(edge -> {
adjacencyGraph.get(edge.getSource()).add(edge.getTarget());
reverseAdjacencyGraph.get(edge.getTarget()).add(edge.getSource());
inDegree.put(edge.getTarget(), inDegree.get(edge.getTarget()) + 1);
});
Queue<String> queue = new LinkedList<>();
var topologicalSortedList = Lists.mutable.<String>empty();
inDegree.forEachKeyValue((id, count) -> {
if (count == 0) {
queue.offer(id);
}
});
while (!queue.isEmpty()) {
String id = queue.poll();
topologicalSortedList.add(id);
for (var neighborId : adjacencyGraph.get(id)) {
inDegree.put(neighborId, inDegree.get(neighborId) - 1);
if (inDegree.get(neighborId) == 0) {
queue.offer(neighborId);
}
}
}
topologicalSortedList.forEach(id -> log.info("{} {}", id, adjacencyGraph.get(id)));
topologicalSortedList.forEach(id -> log.info("{} {}", id, reverseAdjacencyGraph.get(id)));
var nodeQueue = new LinkedList<>(topologicalSortedList);
var chains = Lists.mutable.<MutableList<String>>empty();
while (!nodeQueue.isEmpty()) {
String currentId = nodeQueue.poll();
var subChain = Lists.mutable.<String>empty();
while (true) {
subChain.add(currentId);
nodeQueue.remove(currentId);
if (adjacencyGraph.get(currentId).size() != 1) {
break;
}
String nextId = adjacencyGraph.get(currentId).get(0);
if (reverseAdjacencyGraph.get(nextId).size() > 1) {
break;
}
currentId = nextId;
}
chains.add(subChain);
}
log.info("{}", chains);
return StrUtil.join(",", topologicalSortedList);
}
@Data
public static class FlowData {
private ImmutableList<Node> nodes;
private ImmutableList<Edge> edges;
private ImmutableMap<String, Object> data;
@Data
public static class Node {
private String id;
private String type;
}
@Data
public static class Edge {
private String id;
private String source;
private String target;
}
}
}

View File

@@ -0,0 +1,12 @@
package com.lanyuanxiaoyao.service.ai.web.flow;
/**
* @author lanyuanxiaoyao
* @version 20250625
*/
public class LlmNode extends BaseNode {
@Override
public void process() throws Exception {
}
}

View File

@@ -0,0 +1,12 @@
package com.lanyuanxiaoyao.service.ai.web.flow;
/**
* @author lanyuanxiaoyao
* @version 20250625
*/
public class StartNode extends BaseNode {
@Override
public void process() throws Exception {
}
}

View File

@@ -4,6 +4,7 @@ import {
BackgroundVariant,
type Connection,
Controls,
getIncomers,
getOutgoers,
MiniMap,
type Node,
@@ -17,11 +18,13 @@ import {arrToMap, find, findIdx, isEqual, isNil, randomId} from 'licia'
import {type JSX, useState} from 'react'
import styled from 'styled-components'
import '@xyflow/react/dist/style.css'
import Queue from 'yocto-queue'
import {amisRender, commonInfo, horizontalFormOptions} from '../../../util/amis.tsx'
import CodeNode from './node/CodeNode.tsx'
import EndNode from './node/EndNode.tsx'
import KnowledgeNode from './node/KnowledgeNode.tsx'
import LlmNode from './node/LlmNode.tsx'
import ParallelNode from './node/ParallelNode.tsx'
import StartNode from './node/StartNode.tsx'
import {useDataStore} from './store/DataStore.ts'
import {useFlowStore} from './store/FlowStore.ts'
@@ -71,27 +74,32 @@ function FlowEditor() {
component: (props: NodeProps) => JSX.Element
}[]>([
{
key: 'start-amis-node',
key: 'start-node',
name: '开始',
component: StartNode,
},
{
key: 'end-amis-node',
key: 'end-node',
name: '结束',
component: EndNode,
},
{
key: 'llm-amis-node',
key: 'parallel-node',
name: '并行',
component: ParallelNode,
},
{
key: 'llm-node',
name: '大模型',
component: LlmNode,
},
{
key: 'knowledge-amis-node',
key: 'knowledge-node',
name: '知识库',
component: KnowledgeNode,
},
{
key: 'code-amis-node',
key: 'code-node',
name: '代码执行',
component: CodeNode,
},
@@ -183,10 +191,10 @@ function FlowEditor() {
}
const checkNode = (type: string) => {
if (isEqual(type, 'start-amis-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) {
if (isEqual(type, 'start-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) {
throw new Error('只能存在1个开始节点')
}
if (isEqual(type, 'end-amis-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) {
if (isEqual(type, 'end-node') && findIdx(nodes, (node: Node) => isEqual(type, node.type)) > -1) {
throw new Error('只能存在1个结束节点')
}
}
@@ -201,12 +209,12 @@ function FlowEditor() {
throw new Error('连线目标节点未找到')
}
// 禁止短路整个流程
if (isEqual('start-amis-node', sourceNode.type) && isEqual('end-amis-node', targetNode.type)) {
if (isEqual('start-node', sourceNode.type) && isEqual('end-node', targetNode.type)) {
throw new Error('开始节点不能直连结束节点')
}
// 禁止流程出现环,必须是有向无环图
const hasCycle = (node: Node, visited = new Set()) => {
const hasCycle = (node: Node, visited = new Set<string>()) => {
if (visited.has(node.id)) return false
visited.add(node.id)
for (const outgoer of getOutgoers(node, nodes, edges)) {
@@ -219,11 +227,35 @@ function FlowEditor() {
} else if (hasCycle(targetNode)) {
throw new Error('禁止流程循环')
}
const hasRedundant = (source: Node, target: Node) => {
const visited = new Set<string>()
const queue = new Queue<Node>()
queue.enqueue(source)
visited.add(source.id)
while (queue.size > 0) {
const current = queue.dequeue()!
console.log(current.id)
for (const incomer of getIncomers(current, nodes, edges)) {
if (isEqual(incomer.id, target.id)) {
return true
}
if (!visited.has(incomer.id)) {
visited.add(incomer.id)
queue.enqueue(incomer)
}
}
}
return false
}
if (hasRedundant(sourceNode, targetNode)) {
throw new Error('出现冗余边')
}
}
useMount(() => {
// language=JSON
let initialData = JSON.parse('{\n "nodes": [],\n "edges": [],\n "data": {}\n}')
let initialData = JSON.parse('{\n "nodes": [\n {\n "id": "lurod0PM-J",\n "type": "parallel-node",\n "position": {\n "x": -156,\n "y": 77\n },\n "data": {},\n "measured": {\n "width": 256,\n "height": 75\n },\n "selected": false,\n "dragging": false\n },\n {\n "id": "ldoKAzHnKF",\n "type": "llm-node",\n "position": {\n "x": 207,\n "y": -38\n },\n "data": {},\n "measured": {\n "width": 256,\n "height": 75\n },\n "selected": false,\n "dragging": false\n },\n {\n "id": "1eJtMoJWs6",\n "type": "llm-node",\n "position": {\n "x": 207,\n "y": 172.5\n },\n "data": {},\n "measured": {\n "width": 256,\n "height": 75\n },\n "selected": false,\n "dragging": false\n }\n ],\n "edges": [\n {\n "source": "lurod0PM-J",\n "target": "1eJtMoJWs6",\n "id": "xy-edge__lurod0PM-J-1eJtMoJWs6"\n },\n {\n "source": "lurod0PM-J",\n "target": "ldoKAzHnKF",\n "id": "xy-edge__lurod0PM-J-ldoKAzHnKF"\n }\n ],\n "data": {}\n}')
let initialNodes = initialData['nodes'] ?? []
let initialEdges = initialData['edges'] ?? []

View File

@@ -1,8 +1,8 @@
import {DeleteFilled, EditFilled} from '@ant-design/icons'
import {Handle, type NodeProps, Position} from '@xyflow/react'
import {Handle, type HandleProps, type NodeProps, Position, useNodeConnections} from '@xyflow/react'
import type {Schema} from 'amis'
import {Card, Dropdown} from 'antd'
import {isEmpty, isEqual} from 'licia'
import {isEmpty, isEqual, isNil} from 'licia'
import {type JSX} from 'react'
import {horizontalFormOptions} from '../../../../util/amis.tsx'
@@ -55,13 +55,26 @@ export function outputsFormColumns(editable: boolean = false, required: boolean
]
}
interface AmisNodeProps {
nodeProps: NodeProps,
type: AmisNodeType,
defaultNodeName: String,
defaultNodeDescription?: String,
extraNodeDescription?: (nodeData: any) => JSX.Element,
columnSchema?: Schema[],
export const LimitHandler = (props: HandleProps & { limit: number }) => {
const connections = useNodeConnections({
handleType: props.type,
})
return (
<Handle
{...props}
isConnectable={connections.length < props.limit}
/>
)
}
type AmisNodeProps = {
nodeProps: NodeProps
type: AmisNodeType
defaultNodeName: String
defaultNodeDescription?: String
extraNodeDescription?: (nodeData: any) => JSX.Element
handlers?: (nodeData: any) => JSX.Element
columnSchema?: Schema[]
}
const AmisNode: (props: AmisNodeProps) => JSX.Element = ({
@@ -70,6 +83,7 @@ const AmisNode: (props: AmisNodeProps) => JSX.Element = ({
defaultNodeName,
defaultNodeDescription,
extraNodeDescription,
handlers,
columnSchema,
}) => {
const {id, data} = nodeProps
@@ -142,10 +156,14 @@ const AmisNode: (props: AmisNodeProps) => JSX.Element = ({
</div>
</Card>
</Dropdown>
{isEqual(type, 'start') || isEqual(type, 'normal')
? <Handle type="source" position={Position.Right}/> : undefined}
{isEqual(type, 'end') || isEqual(type, 'normal')
? <Handle type="target" position={Position.Left}/> : undefined}
{isNil(handlers)
? <>
{isEqual(type, 'start') || isEqual(type, 'normal')
? <LimitHandler type="source" position={Position.Right} limit={1}/> : undefined}
{isEqual(type, 'end') || isEqual(type, 'normal')
? <Handle type="target" position={Position.Left}/> : undefined}
</>
: handlers?.(nodeData)}
</div>
)
}

View File

@@ -0,0 +1,19 @@
import {Handle, type NodeProps, Position} from '@xyflow/react'
import AmisNode, {LimitHandler} from './AmisNode.tsx'
const ParallelNode = (props: NodeProps) => AmisNode({
nodeProps: props,
type: 'normal',
defaultNodeName: '并行节点',
defaultNodeDescription: '允许开启并行流程',
handlers: () => {
return (
<>
<Handle type="source" position={Position.Right}/>
<LimitHandler type="target" position={Position.Left} limit={1}/>
</>
)
},
})
export default ParallelNode