diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java index 85fd701..87eb4fb 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java @@ -1,11 +1,8 @@ package com.lanyuanxiaoyao.service.ai.web.engine; -import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowContext; -import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; -import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNodeRunner; -import java.lang.reflect.InvocationTargetException; +import com.lanyuanxiaoyao.service.ai.web.engine.store.FlowStore; import java.util.LinkedList; import java.util.Queue; import org.eclipse.collections.api.map.ImmutableMap; @@ -17,25 +14,17 @@ import org.eclipse.collections.api.map.ImmutableMap; * @version 20250630 */ public class FlowExecutor { + private final FlowStore flowStore; private final ImmutableMap> runnerMap; + private final Queue executionQueue = new LinkedList<>(); - public FlowExecutor(ImmutableMap> runnerMap) { + public FlowExecutor(FlowStore flowStore, ImmutableMap> runnerMap) { + this.flowStore = flowStore; this.runnerMap = runnerMap; } - public void execute(FlowGraph graph) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { - var nodeInputMap = graph.edges().groupBy(FlowEdge::source); - var nodeOutputMap = graph.edges().groupBy(FlowEdge::target); - var startNodes = graph.nodes().select(node -> !nodeInputMap.containsKey(node.id())); - Queue queue = new LinkedList<>(); - for (FlowNode node : startNodes) { - queue.offer(node); - } - while (!queue.isEmpty()) { - var node = queue.poll(); - var clazz = runnerMap.get(node.type()); - var runner = clazz.getDeclaredConstructor(FlowContext.class).newInstance(); - runner.run(); - } + public void execute(FlowGraph graph) { + var runner = new FlowGraphRunner(graph, flowStore, runnerMap); + runner.run(); } } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowGraphRunner.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowGraphRunner.java new file mode 100644 index 0000000..1fcd2f0 --- /dev/null +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowGraphRunner.java @@ -0,0 +1,76 @@ +package com.lanyuanxiaoyao.service.ai.web.engine; + +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowContext; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; +import com.lanyuanxiaoyao.service.ai.web.engine.store.FlowStore; +import java.util.LinkedList; +import java.util.Queue; +import lombok.SneakyThrows; +import org.eclipse.collections.api.map.ImmutableMap; +import org.eclipse.collections.api.multimap.set.ImmutableSetMultimap; + +/** + * Graph执行器 + * + * @author lanyuanxiaoyao + * @version 20250701 + */ +public final class FlowGraphRunner implements Runnable { + private final FlowGraph flowGraph; + private final FlowStore flowStore; + private final ImmutableMap> nodeRunnerClass; + private final Queue executionQueue = new LinkedList<>(); + private final ImmutableSetMultimap nodeInputMap; + private final ImmutableSetMultimap nodeOutputMap; + private final ImmutableMap nodeMap; + + public FlowGraphRunner(FlowGraph flowGraph, FlowStore flowStore, ImmutableMap> nodeRunnerClass) { + this.flowGraph = flowGraph; + this.flowStore = flowStore; + this.nodeRunnerClass = nodeRunnerClass; + + nodeInputMap = flowGraph.edges().groupBy(FlowEdge::target); + nodeOutputMap = flowGraph.edges().groupBy(FlowEdge::source); + nodeMap = flowGraph.nodes().toImmutableMap(FlowNode::id, node -> node); + } + + @SneakyThrows + @Override + public void run() { + flowStore.init(flowGraph); + flowStore.updateGraphToRunning(flowGraph.id()); + + var context = new FlowContext(); + for (FlowNode node : flowGraph.nodes()) { + executionQueue.offer(node); + } + while (!executionQueue.isEmpty()) { + var node = executionQueue.poll(); + if (readyForRunning(node)) { + flowStore.updateNodeToRunning(flowGraph.id(), node.id()); + + var runnerClazz = nodeRunnerClass.get(node.type()); + var runner = runnerClazz.getDeclaredConstructor().newInstance(); + runner.setNodeId(node.id()); + runner.setContext(context); + runner.run(); + + flowStore.updateNodeToFinished(flowGraph.id(), node.id()); + + for (FlowEdge edge : nodeOutputMap.get(node.id())) { + flowStore.updateEdgeToExecute(flowGraph.id(), edge.id()); + executionQueue.offer(nodeMap.get(edge.target())); + } + } + } + + flowStore.updateGraphToFinished(flowGraph.id()); + } + + private boolean readyForRunning(FlowNode node) { + return (!nodeInputMap.containsKey(node.id()) || nodeInputMap.get(node.id()).allSatisfy(edge -> flowStore.checkEdgeStatus(flowGraph.id(), edge.id(), FlowEdge.Status.EXECUTE, FlowEdge.Status.SKIP))) + && flowStore.checkNodeStatus(flowGraph.id(), node.id(), FlowNode.Status.INITIAL); + } +} diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowNodeRunner.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowNodeRunner.java new file mode 100644 index 0000000..1e99e18 --- /dev/null +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowNodeRunner.java @@ -0,0 +1,31 @@ +package com.lanyuanxiaoyao.service.ai.web.engine; + +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowContext; +import lombok.Getter; + +public abstract class FlowNodeRunner { + @Getter + private String nodeId; + @Getter + private FlowContext context; + + public abstract void run(); + + void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + void setContext(FlowContext context) { + this.context = context; + } + + protected T getData(String key) { + var data = context.get(nodeId); + return (T) data.get(key); + } + + protected void setData(String key, T value) { + var data = context.get(nodeId); + data.put(key, value); + } +} diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowContext.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowContext.java index 39ee6c9..e0cc7e7 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowContext.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowContext.java @@ -6,5 +6,12 @@ import org.eclipse.collections.api.map.MutableMap; @Data public class FlowContext { - private MutableMap data = Maps.mutable.empty().asSynchronized(); + private MutableMap> data = Maps.mutable.>empty().asSynchronized(); + + public MutableMap get(String key) { + if (!data.containsKey(key)) { + data.put(key, Maps.mutable.empty().asSynchronized()); + } + return data.get(key); + } } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowEdge.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowEdge.java index 76d80ec..2e5b0dc 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowEdge.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowEdge.java @@ -1,5 +1,7 @@ package com.lanyuanxiaoyao.service.ai.web.engine.entity; +import java.time.LocalDateTime; + /** * 流程图中的边 * @@ -13,4 +15,22 @@ public record FlowEdge( String sourcePoint, String targetPoint ) { + public enum Status { + INITIAL, EXECUTE, SKIP + } + + public record State( + String id, + Status status, + LocalDateTime startingTime, + LocalDateTime finishedTime + ) { + public State(String edgeId) { + this(edgeId, Status.INITIAL, LocalDateTime.now(), null); + } + + public State(String edgeId, Status status) { + this(edgeId, status, LocalDateTime.now(), null); + } + } } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java index d11be20..a9ec6fe 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.ai.web.engine.entity; +import java.time.LocalDateTime; import org.eclipse.collections.api.set.ImmutableSet; /** @@ -13,4 +14,22 @@ public record FlowGraph( ImmutableSet nodes, ImmutableSet edges ) { + public enum Status { + INITIAL, RUNNING, FINISHED, ERROR + } + + public record State( + String id, + Status status, + LocalDateTime startingTime, + LocalDateTime finishedTime + ) { + public State(String id) { + this(id, Status.INITIAL, LocalDateTime.now(), null); + } + + public State(String id, Status status) { + this(id, status, LocalDateTime.now(), null); + } + } } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java index cba56d2..42e61ed 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.ai.web.engine.entity; +import java.time.LocalDateTime; import org.eclipse.collections.api.set.ImmutableSet; /** @@ -14,4 +15,22 @@ public record FlowNode( ImmutableSet inputPoints, ImmutableSet outputPoints ) { + public enum Status { + INITIAL, RUNNING, FINISHED, SKIPPED + } + + public record State( + String id, + Status status, + LocalDateTime startingTime, + LocalDateTime finishedTime + ) { + public State(String nodeId) { + this(nodeId, Status.INITIAL, LocalDateTime.now(), null); + } + + public State(String nodeId, Status status) { + this(nodeId, status, LocalDateTime.now(), null); + } + } } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNodeRunner.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNodeRunner.java deleted file mode 100644 index 85c7d4f..0000000 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNodeRunner.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.lanyuanxiaoyao.service.ai.web.engine.entity; - -public abstract class FlowNodeRunner implements Runnable { - private final FlowContext context; -} diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/FlowStore.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/FlowStore.java new file mode 100644 index 0000000..c6218b0 --- /dev/null +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/FlowStore.java @@ -0,0 +1,37 @@ +package com.lanyuanxiaoyao.service.ai.web.engine.store; + +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; + +/** + * 存储状态 + * + * @author lanyuanxiaoyao + * @version 20250701 + */ +public interface FlowStore { + void init(FlowGraph flowGraph); + + void updateGraphToRunning(String graphId); + + void updateGraphToFinished(String graphId); + + void updateGraphToError(String graphId); + + void updateNodeToRunning(String graphId, String nodeId); + + void updateNodeToSkipped(String graphId, String nodeId); + + void updateNodeToFinished(String graphId, String nodeId); + + void updateEdgeToExecute(String graphId, String edgeId); + + void updateEdgeToSkip(String graphId, String edgeId); + + boolean checkNodeStatus(String graphId, String nodeId, FlowNode.Status... statuses); + + boolean checkEdgeStatus(String graphId, String edgeId, FlowEdge.Status... statuses); + + void print(); +} diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/InMemoryFlowStore.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/InMemoryFlowStore.java new file mode 100644 index 0000000..18c6640 --- /dev/null +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/InMemoryFlowStore.java @@ -0,0 +1,138 @@ +package com.lanyuanxiaoyao.service.ai.web.engine.store; + +import cn.hutool.core.util.ArrayUtil; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.map.MutableMap; + +/** + * 基于内存的流程状态存储 + * + * @author lanyuanxiaoyao + * @version 20250701 + */ +@Slf4j +public class InMemoryFlowStore implements FlowStore { + private static final MutableMap flowGraphStateMap = Maps.mutable.empty().asSynchronized(); + private static final MutableMap flowNodeStateMap = Maps.mutable.empty().asSynchronized(); + private static final MutableMap flowEdgeStateMap = Maps.mutable.empty().asSynchronized(); + + private String multiKey(String... key) { + return String.join("-", key); + } + + @Override + public void init(FlowGraph flowGraph) { + flowGraphStateMap.put(flowGraph.id(), new FlowGraph.State(flowGraph.id())); + for (FlowNode node : flowGraph.nodes()) { + flowNodeStateMap.put(multiKey(flowGraph.id(), node.id()), new FlowNode.State(node.id())); + } + for (FlowEdge edge : flowGraph.edges()) { + flowEdgeStateMap.put(multiKey(flowGraph.id(), edge.id()), new FlowEdge.State(edge.id())); + } + } + + @Override + public void updateGraphToRunning(String graphId) { + flowGraphStateMap.updateValue( + graphId, + () -> new FlowGraph.State(graphId, FlowGraph.Status.RUNNING), + old -> new FlowGraph.State(graphId, FlowGraph.Status.RUNNING, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public void updateGraphToFinished(String graphId) { + flowGraphStateMap.updateValue( + graphId, + () -> new FlowGraph.State(graphId, FlowGraph.Status.FINISHED), + old -> new FlowGraph.State(graphId, FlowGraph.Status.FINISHED, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public void updateGraphToError(String graphId) { + flowGraphStateMap.updateValue( + graphId, + () -> new FlowGraph.State(graphId, FlowGraph.Status.ERROR), + old -> new FlowGraph.State(graphId, FlowGraph.Status.ERROR, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public void updateNodeToRunning(String graphId, String nodeId) { + flowNodeStateMap.updateValue( + multiKey(graphId, nodeId), + () -> new FlowNode.State(nodeId, FlowNode.Status.RUNNING), + old -> new FlowNode.State(nodeId, FlowNode.Status.RUNNING, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public void updateNodeToSkipped(String graphId, String nodeId) { + flowNodeStateMap.updateValue( + multiKey(graphId, nodeId), + () -> new FlowNode.State(nodeId, FlowNode.Status.SKIPPED), + old -> new FlowNode.State(nodeId, FlowNode.Status.SKIPPED, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public void updateNodeToFinished(String graphId, String nodeId) { + flowNodeStateMap.updateValue( + multiKey(graphId, nodeId), + () -> new FlowNode.State(nodeId, FlowNode.Status.FINISHED), + old -> new FlowNode.State(nodeId, FlowNode.Status.FINISHED, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public void updateEdgeToExecute(String graphId, String edgeId) { + flowEdgeStateMap.updateValue( + multiKey(graphId, edgeId), + () -> new FlowEdge.State(edgeId, FlowEdge.Status.EXECUTE), + old -> new FlowEdge.State(edgeId, FlowEdge.Status.EXECUTE, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public void updateEdgeToSkip(String graphId, String edgeId) { + flowEdgeStateMap.updateValue( + multiKey(graphId, edgeId), + () -> new FlowEdge.State(edgeId, FlowEdge.Status.SKIP), + old -> new FlowEdge.State(edgeId, FlowEdge.Status.SKIP, old.startingTime(), old.finishedTime()) + ); + } + + @Override + public boolean checkNodeStatus(String graphId, String nodeId, FlowNode.Status... statuses) { + String key = multiKey(graphId, nodeId); + if (flowNodeStateMap.containsKey(key)) { + return ArrayUtil.contains(statuses, flowNodeStateMap.get(key).status()); + } + return false; + } + + @Override + public boolean checkEdgeStatus(String graphId, String edgeId, FlowEdge.Status... statuses) { + String key = multiKey(graphId, edgeId); + if (flowEdgeStateMap.containsKey(key)) { + return ArrayUtil.contains(statuses, flowEdgeStateMap.get(key).status()); + } + return false; + } + + @Override + public void print() { + log.info("====== Flow Store ======"); + log.info("====== Flow Graph ======"); + flowGraphStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status())); + log.info("====== Flow Node ======"); + flowNodeStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status())); + log.info("====== Flow Edge ======"); + flowEdgeStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status())); + } +} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/TestFlow.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/TestFlow.java new file mode 100644 index 0000000..01d9707 --- /dev/null +++ b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/TestFlow.java @@ -0,0 +1,59 @@ +package com.lanyuanxiaoyao.service.ai.web; + +import com.lanyuanxiaoyao.service.ai.web.engine.FlowExecutor; +import com.lanyuanxiaoyao.service.ai.web.engine.FlowNodeRunner; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; +import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; +import com.lanyuanxiaoyao.service.ai.web.engine.store.InMemoryFlowStore; +import java.lang.reflect.InvocationTargetException; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.collections.api.factory.Maps; +import org.eclipse.collections.api.factory.Sets; + +/** + * @author lanyuanxiaoyao + * @version 20250701 + */ +@Slf4j +public class TestFlow { + public static void main(String[] args) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + var store = new InMemoryFlowStore(); + var executor = new FlowExecutor( + store, + Maps.immutable.of( + "plain-node", PlainNode.class + ) + ); + var graph = new FlowGraph( + "graph-1", + Sets.immutable.of( + new FlowNode("node-1", "plain-node", Sets.immutable.empty(), Sets.immutable.of("target")), + new FlowNode("node-2", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), + new FlowNode("node-4", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), + new FlowNode("node-6", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), + new FlowNode("node-7", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), + new FlowNode("node-5", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), + new FlowNode("node-3", "plain-node", Sets.immutable.of("source"), Sets.immutable.empty()) + ), + Sets.immutable.of( + new FlowEdge("edge-1", "node-1", "node-2", null, null), + new FlowEdge("edge-2", "node-2", "node-4", null, null), + new FlowEdge("edge-3", "node-2", "node-5", null, null), + new FlowEdge("edge-4", "node-4", "node-6", null, null), + new FlowEdge("edge-5", "node-6", "node-7", null, null), + new FlowEdge("edge-6", "node-7", "node-3", null, null), + new FlowEdge("edge-7", "node-5", "node-3", null, null) + ) + ); + executor.execute(graph); + store.print(); + } + + public static class PlainNode extends FlowNodeRunner { + @Override + public void run() { + log.info("run node id: {}", getNodeId()); + } + } +} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/BaseNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/BaseNode.java deleted file mode 100644 index 127a8ec..0000000 --- a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/BaseNode.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.lanyuanxiaoyao.service.ai.web.flow; - -import com.yomahub.liteflow.core.NodeComponent; -import lombok.extern.slf4j.Slf4j; - -/** - * @author lanyuanxiaoyao - * @version 20250625 - */ -@Slf4j -public abstract class BaseNode extends NodeComponent { - @Override - public void process() throws Exception { - log.info(getClass().getName()); - } -} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/EndNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/EndNode.java deleted file mode 100644 index ef9cffa..0000000 --- a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/EndNode.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.lanyuanxiaoyao.service.ai.web.flow; - -/** - * @author lanyuanxiaoyao - * @version 20250625 - */ -public class EndNode extends BaseNode { - @Override - public void process() throws Exception { - - } -} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LiteFlowService.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LiteFlowService.java deleted file mode 100644 index 954ca45..0000000 --- a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LiteFlowService.java +++ /dev/null @@ -1,304 +0,0 @@ -package com.lanyuanxiaoyao.service.ai.web.flow; - -import cn.hutool.core.util.StrUtil; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule; -import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; -import com.yomahub.liteflow.core.NodeComponent; -import com.yomahub.liteflow.enums.NodeTypeEnum; -import java.util.LinkedList; -import java.util.Queue; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.collections.api.factory.Lists; -import org.eclipse.collections.api.factory.Maps; -import org.eclipse.collections.api.list.ImmutableList; -import org.eclipse.collections.api.list.MutableList; -import org.eclipse.collections.api.map.ImmutableMap; - -/** - * @author lanyuanxiaoyao - * @version 20250625 - */ -@Slf4j -public class LiteFlowService { - public LiteFlowService() { - createNode("start-amis-node", NodeTypeEnum.COMMON, StartNode.class); - createNode("end-amis-node", NodeTypeEnum.COMMON, EndNode.class); - createNode("llm-amis-node", NodeTypeEnum.COMMON, LlmNode.class); - } - - private static void createNode(String name, NodeTypeEnum type, Class clazz) { - LiteFlowNodeBuilder.createNode() - .setId(name) - .setName(name) - .setType(type) - .setClazz(clazz) - .build(); - } - - public static void main(String[] args) throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new EclipseCollectionsModule()); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - // language=JSON - String source = """ - { - "nodes": [ - { - "id": "A", - "type": "start", - "position": { - "x": 8, - "y": 272 - }, - "data": {}, - "measured": { - "width": 256, - "height": 75 - }, - "selected": false, - "dragging": false - }, - { - "id": "F", - "type": "end", - "position": { - "x": 1439.5556937134281, - "y": 282.2797340760818 - }, - "data": {}, - "measured": { - "width": 256, - "height": 75 - }, - "selected": false, - "dragging": false - }, - { - "id": "C", - "type": "normal", - "position": { - "x": 902.6781018665707, - "y": 115.31234529524048 - }, - "data": {}, - "measured": { - "width": 256, - "height": 75 - }, - "selected": false, - "dragging": false - }, - { - "id": "B", - "type": "normal", - "position": { - "x": 338, - "y": 287 - }, - "data": {}, - "measured": { - "width": 256, - "height": 75 - }, - "selected": false, - "dragging": false - }, - { - "id": "E", - "type": "normal", - "position": { - "x": 1086.6322978498904, - "y": 371.3061114283591 - }, - "data": {}, - "measured": { - "width": 256, - "height": 75 - }, - "selected": true, - "dragging": false - }, - { - "id": "D", - "type": "normal", - "position": { - "x": 700.0944461714178, - "y": 369.84258971430364 - }, - "data": {}, - "measured": { - "width": 256, - "height": 75 - }, - "selected": false, - "dragging": false - } - ], - "edges": [ - { - "source": "A", - "target": "B", - "id": "xy-edge__A-B" - }, - { - "source": "B", - "target": "C", - "id": "xy-edge__B-C" - }, - { - "source": "C", - "target": "F", - "id": "xy-edge__C-F" - }, - { - "source": "D", - "target": "E", - "id": "xy-edge__D-E" - }, - { - "source": "B", - "target": "D", - "id": "xy-edge__B-D" - }, - { - "source": "E", - "target": "F", - "id": "xy-edge__E-F" - } - ], - "data": { - "A": { - "inputs": { - "name": { - "type": "text" - }, - "description": { - "type": "text", - "description": "文件描述" - } - } - }, - "C": { - "model": "qwen3", - "outputs": { - "text": { - "type": "string" - } - }, - "systemPrompt": "你是个沙雕" - }, - "B": { - "count": 3, - "score": 0.75, - "knowledgeId": 3585368238960640, - "query": "hello world" - }, - "E": { - "type": "python", - "content": "code='hello'\\nprint(code)" - }, - "D": { - "model": "qwen3", - "outputs": { - "text": { - "type": "string" - } - }, - "systemPrompt": "你是个聪明人" - } - } - } - """; - FlowData root = mapper.readValue(StrUtil.trim(source), FlowData.class); - log.info("\n{}", buildEl(root.nodes, root.edges)); - } - - public static String buildEl(ImmutableList nodes, ImmutableList edges) { - var nodeMap = nodes.toMap(FlowData.Node::getId, node -> node); - var adjacencyGraph = Maps.mutable.>empty(); - var reverseAdjacencyGraph = Maps.mutable.>empty(); - var inDegree = Maps.mutable.empty(); - - nodes.forEach(node -> { - adjacencyGraph.put(node.getId(), Lists.mutable.empty()); - reverseAdjacencyGraph.put(node.getId(), Lists.mutable.empty()); - inDegree.put(node.getId(), 0); - }); - edges.forEach(edge -> { - adjacencyGraph.get(edge.getSource()).add(edge.getTarget()); - reverseAdjacencyGraph.get(edge.getTarget()).add(edge.getSource()); - inDegree.put(edge.getTarget(), inDegree.get(edge.getTarget()) + 1); - }); - - Queue queue = new LinkedList<>(); - var topologicalSortedList = Lists.mutable.empty(); - - inDegree.forEachKeyValue((id, count) -> { - if (count == 0) { - queue.offer(id); - } - }); - - while (!queue.isEmpty()) { - String id = queue.poll(); - topologicalSortedList.add(id); - for (var neighborId : adjacencyGraph.get(id)) { - inDegree.put(neighborId, inDegree.get(neighborId) - 1); - if (inDegree.get(neighborId) == 0) { - queue.offer(neighborId); - } - } - } - - topologicalSortedList.forEach(id -> log.info("{} {}", id, adjacencyGraph.get(id))); - topologicalSortedList.forEach(id -> log.info("{} {}", id, reverseAdjacencyGraph.get(id))); - - var nodeQueue = new LinkedList<>(topologicalSortedList); - var chains = Lists.mutable.>empty(); - while (!nodeQueue.isEmpty()) { - String currentId = nodeQueue.poll(); - var subChain = Lists.mutable.empty(); - while (true) { - subChain.add(currentId); - nodeQueue.remove(currentId); - if (adjacencyGraph.get(currentId).size() != 1) { - break; - } - String nextId = adjacencyGraph.get(currentId).get(0); - if (reverseAdjacencyGraph.get(nextId).size() > 1) { - break; - } - currentId = nextId; - } - chains.add(subChain); - } - - log.info("{}", chains); - - return StrUtil.join(",", topologicalSortedList); - } - - @Data - public static class FlowData { - private ImmutableList nodes; - private ImmutableList edges; - private ImmutableMap data; - - @Data - public static class Node { - private String id; - private String type; - } - - @Data - public static class Edge { - private String id; - private String source; - private String target; - } - } -} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LlmNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LlmNode.java deleted file mode 100644 index 4dc9d70..0000000 --- a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/LlmNode.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.lanyuanxiaoyao.service.ai.web.flow; - -/** - * @author lanyuanxiaoyao - * @version 20250625 - */ -public class LlmNode extends BaseNode { - @Override - public void process() throws Exception { - - } -} diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/StartNode.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/StartNode.java deleted file mode 100644 index 4d1a34f..0000000 --- a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/flow/StartNode.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.lanyuanxiaoyao.service.ai.web.flow; - -/** - * @author lanyuanxiaoyao - * @version 20250625 - */ -public class StartNode extends BaseNode { - @Override - public void process() throws Exception { - - } -}