diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java index 87eb4fb..a92da6c 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowExecutor.java @@ -1,10 +1,8 @@ package com.lanyuanxiaoyao.service.ai.web.engine; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; -import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; import com.lanyuanxiaoyao.service.ai.web.engine.store.FlowStore; -import java.util.LinkedList; -import java.util.Queue; +import java.lang.reflect.InvocationTargetException; import org.eclipse.collections.api.map.ImmutableMap; /** @@ -16,14 +14,13 @@ import org.eclipse.collections.api.map.ImmutableMap; public class FlowExecutor { private final FlowStore flowStore; private final ImmutableMap> runnerMap; - private final Queue executionQueue = new LinkedList<>(); public FlowExecutor(FlowStore flowStore, ImmutableMap> runnerMap) { this.flowStore = flowStore; this.runnerMap = runnerMap; } - public void execute(FlowGraph graph) { + public void execute(FlowGraph graph) throws InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { var runner = new FlowGraphRunner(graph, flowStore, runnerMap); runner.run(); } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowGraphRunner.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowGraphRunner.java index 1fcd2f0..d185085 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowGraphRunner.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowGraphRunner.java @@ -1,13 +1,14 @@ package com.lanyuanxiaoyao.service.ai.web.engine; +import cn.hutool.core.util.StrUtil; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowContext; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; import com.lanyuanxiaoyao.service.ai.web.engine.store.FlowStore; +import java.lang.reflect.InvocationTargetException; import java.util.LinkedList; import java.util.Queue; -import lombok.SneakyThrows; import org.eclipse.collections.api.map.ImmutableMap; import org.eclipse.collections.api.multimap.set.ImmutableSetMultimap; @@ -17,7 +18,7 @@ import org.eclipse.collections.api.multimap.set.ImmutableSetMultimap; * @author lanyuanxiaoyao * @version 20250701 */ -public final class FlowGraphRunner implements Runnable { +public final class FlowGraphRunner { private final FlowGraph flowGraph; private final FlowStore flowStore; private final ImmutableMap> nodeRunnerClass; @@ -36,11 +37,8 @@ public final class FlowGraphRunner implements Runnable { nodeMap = flowGraph.nodes().toImmutableMap(FlowNode::id, node -> node); } - @SneakyThrows - @Override - public void run() { + public void run() throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { flowStore.init(flowGraph); - flowStore.updateGraphToRunning(flowGraph.id()); var context = new FlowContext(); for (FlowNode node : flowGraph.nodes()) { @@ -48,7 +46,26 @@ public final class FlowGraphRunner implements Runnable { } while (!executionQueue.isEmpty()) { var node = executionQueue.poll(); - if (readyForRunning(node)) { + process(node, context); + } + } + + private void process(FlowNode node, FlowContext context) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { + if ( + ( + // 没有入节点,即开始节点 + !nodeInputMap.containsKey(node.id()) + // 或者所有入的边状态都已经完成 + || nodeInputMap.get(node.id()).allSatisfy(edge -> flowStore.checkEdgeStatus(flowGraph.id(), edge.id(), FlowEdge.Status.EXECUTE, FlowEdge.Status.SKIP)) + ) + // 当前节点还未执行 + && flowStore.checkNodeStatus(flowGraph.id(), node.id(), FlowNode.Status.INITIAL) + ) { + // 是开始节点或入的边有至少一条是「执行」 + if ( + !nodeInputMap.containsKey(node.id()) + || nodeInputMap.get(node.id()).anySatisfy(edge -> flowStore.checkEdgeStatus(flowGraph.id(), edge.id(), FlowEdge.Status.EXECUTE)) + ) { flowStore.updateNodeToRunning(flowGraph.id(), node.id()); var runnerClazz = nodeRunnerClass.get(node.type()); @@ -57,20 +74,34 @@ public final class FlowGraphRunner implements Runnable { runner.setContext(context); runner.run(); + if (runner instanceof FlowNodeOptionalRunner) { + var targetPoint = ((FlowNodeOptionalRunner) runner).getTargetPoint(); + for (FlowEdge edge : nodeOutputMap.get(node.id())) { + if (StrUtil.equals(targetPoint, edge.sourcePoint())) { + flowStore.updateEdgeToExecute(flowGraph.id(), edge.id()); + } else { + flowStore.updateEdgeToSkip(flowGraph.id(), edge.id()); + } + executionQueue.offer(nodeMap.get(edge.target())); + } + } else { + for (FlowEdge edge : nodeOutputMap.get(node.id())) { + flowStore.updateEdgeToExecute(flowGraph.id(), edge.id()); + executionQueue.offer(nodeMap.get(edge.target())); + } + } + flowStore.updateNodeToFinished(flowGraph.id(), node.id()); + } + // 所有入的边都是跳过,当前节点就跳过 + else { + flowStore.updateNodeToSkipped(flowGraph.id(), node.id()); for (FlowEdge edge : nodeOutputMap.get(node.id())) { - flowStore.updateEdgeToExecute(flowGraph.id(), edge.id()); + flowStore.updateEdgeToSkip(flowGraph.id(), edge.id()); executionQueue.offer(nodeMap.get(edge.target())); } } } - - flowStore.updateGraphToFinished(flowGraph.id()); - } - - private boolean readyForRunning(FlowNode node) { - return (!nodeInputMap.containsKey(node.id()) || nodeInputMap.get(node.id()).allSatisfy(edge -> flowStore.checkEdgeStatus(flowGraph.id(), edge.id(), FlowEdge.Status.EXECUTE, FlowEdge.Status.SKIP))) - && flowStore.checkNodeStatus(flowGraph.id(), node.id(), FlowNode.Status.INITIAL); } } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowNodeOptionalRunner.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowNodeOptionalRunner.java new file mode 100644 index 0000000..69163a1 --- /dev/null +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/FlowNodeOptionalRunner.java @@ -0,0 +1,15 @@ +package com.lanyuanxiaoyao.service.ai.web.engine; + +import lombok.Getter; + +public abstract class FlowNodeOptionalRunner extends FlowNodeRunner { + @Getter + private String targetPoint; + + public abstract String runOptional(); + + @Override + public void run() { + this.targetPoint = runOptional(); + } +} diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java index a9ec6fe..d11be20 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowGraph.java @@ -1,6 +1,5 @@ package com.lanyuanxiaoyao.service.ai.web.engine.entity; -import java.time.LocalDateTime; import org.eclipse.collections.api.set.ImmutableSet; /** @@ -14,22 +13,4 @@ public record FlowGraph( ImmutableSet nodes, ImmutableSet edges ) { - public enum Status { - INITIAL, RUNNING, FINISHED, ERROR - } - - public record State( - String id, - Status status, - LocalDateTime startingTime, - LocalDateTime finishedTime - ) { - public State(String id) { - this(id, Status.INITIAL, LocalDateTime.now(), null); - } - - public State(String id, Status status) { - this(id, status, LocalDateTime.now(), null); - } - } } diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java index 42e61ed..50a65f1 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/entity/FlowNode.java @@ -1,7 +1,6 @@ package com.lanyuanxiaoyao.service.ai.web.engine.entity; import java.time.LocalDateTime; -import org.eclipse.collections.api.set.ImmutableSet; /** * 流程图中的节点 @@ -11,9 +10,7 @@ import org.eclipse.collections.api.set.ImmutableSet; */ public record FlowNode( String id, - String type, - ImmutableSet inputPoints, - ImmutableSet outputPoints + String type ) { public enum Status { INITIAL, RUNNING, FINISHED, SKIPPED diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/FlowStore.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/FlowStore.java index c6218b0..c0afe80 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/FlowStore.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/FlowStore.java @@ -13,12 +13,6 @@ import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode; public interface FlowStore { void init(FlowGraph flowGraph); - void updateGraphToRunning(String graphId); - - void updateGraphToFinished(String graphId); - - void updateGraphToError(String graphId); - void updateNodeToRunning(String graphId, String nodeId); void updateNodeToSkipped(String graphId, String nodeId); diff --git a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/InMemoryFlowStore.java b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/InMemoryFlowStore.java index 18c6640..4dece6c 100644 --- a/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/InMemoryFlowStore.java +++ b/service-ai/service-ai-web/src/main/java/com/lanyuanxiaoyao/service/ai/web/engine/store/InMemoryFlowStore.java @@ -16,7 +16,6 @@ import org.eclipse.collections.api.map.MutableMap; */ @Slf4j public class InMemoryFlowStore implements FlowStore { - private static final MutableMap flowGraphStateMap = Maps.mutable.empty().asSynchronized(); private static final MutableMap flowNodeStateMap = Maps.mutable.empty().asSynchronized(); private static final MutableMap flowEdgeStateMap = Maps.mutable.empty().asSynchronized(); @@ -26,7 +25,6 @@ public class InMemoryFlowStore implements FlowStore { @Override public void init(FlowGraph flowGraph) { - flowGraphStateMap.put(flowGraph.id(), new FlowGraph.State(flowGraph.id())); for (FlowNode node : flowGraph.nodes()) { flowNodeStateMap.put(multiKey(flowGraph.id(), node.id()), new FlowNode.State(node.id())); } @@ -35,33 +33,6 @@ public class InMemoryFlowStore implements FlowStore { } } - @Override - public void updateGraphToRunning(String graphId) { - flowGraphStateMap.updateValue( - graphId, - () -> new FlowGraph.State(graphId, FlowGraph.Status.RUNNING), - old -> new FlowGraph.State(graphId, FlowGraph.Status.RUNNING, old.startingTime(), old.finishedTime()) - ); - } - - @Override - public void updateGraphToFinished(String graphId) { - flowGraphStateMap.updateValue( - graphId, - () -> new FlowGraph.State(graphId, FlowGraph.Status.FINISHED), - old -> new FlowGraph.State(graphId, FlowGraph.Status.FINISHED, old.startingTime(), old.finishedTime()) - ); - } - - @Override - public void updateGraphToError(String graphId) { - flowGraphStateMap.updateValue( - graphId, - () -> new FlowGraph.State(graphId, FlowGraph.Status.ERROR), - old -> new FlowGraph.State(graphId, FlowGraph.Status.ERROR, old.startingTime(), old.finishedTime()) - ); - } - @Override public void updateNodeToRunning(String graphId, String nodeId) { flowNodeStateMap.updateValue( @@ -128,8 +99,6 @@ public class InMemoryFlowStore implements FlowStore { @Override public void print() { log.info("====== Flow Store ======"); - log.info("====== Flow Graph ======"); - flowGraphStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status())); log.info("====== Flow Node ======"); flowNodeStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status())); log.info("====== Flow Edge ======"); diff --git a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/TestFlow.java b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/TestFlow.java index 01d9707..8180b31 100644 --- a/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/TestFlow.java +++ b/service-ai/service-ai-web/src/test/java/com/lanyuanxiaoyao/service/ai/web/TestFlow.java @@ -1,6 +1,7 @@ package com.lanyuanxiaoyao.service.ai.web; import com.lanyuanxiaoyao.service.ai.web.engine.FlowExecutor; +import com.lanyuanxiaoyao.service.ai.web.engine.FlowNodeOptionalRunner; import com.lanyuanxiaoyao.service.ai.web.engine.FlowNodeRunner; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge; import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph; @@ -22,28 +23,39 @@ public class TestFlow { var executor = new FlowExecutor( store, Maps.immutable.of( - "plain-node", PlainNode.class + "plain-node", PlainNode.class, + "option-node", PlainOptionNode.class ) ); + /* + * 4 6 7 + * 1 2 5 8---3 + * \9/ + */ var graph = new FlowGraph( "graph-1", Sets.immutable.of( - new FlowNode("node-1", "plain-node", Sets.immutable.empty(), Sets.immutable.of("target")), - new FlowNode("node-2", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), - new FlowNode("node-4", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), - new FlowNode("node-6", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), - new FlowNode("node-7", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), - new FlowNode("node-5", "plain-node", Sets.immutable.of("source"), Sets.immutable.of("target")), - new FlowNode("node-3", "plain-node", Sets.immutable.of("source"), Sets.immutable.empty()) + new FlowNode("node-1", "plain-node"), + new FlowNode("node-2", "plain-node"), + new FlowNode("node-4", "plain-node"), + new FlowNode("node-6", "plain-node"), + new FlowNode("node-7", "plain-node"), + new FlowNode("node-5", "plain-node"), + new FlowNode("node-8", "option-node"), + new FlowNode("node-9", "plain-node"), + new FlowNode("node-3", "plain-node") ), Sets.immutable.of( new FlowEdge("edge-1", "node-1", "node-2", null, null), new FlowEdge("edge-2", "node-2", "node-4", null, null), new FlowEdge("edge-3", "node-2", "node-5", null, null), - new FlowEdge("edge-4", "node-4", "node-6", null, null), - new FlowEdge("edge-5", "node-6", "node-7", null, null), - new FlowEdge("edge-6", "node-7", "node-3", null, null), - new FlowEdge("edge-7", "node-5", "node-3", null, null) + new FlowEdge("edge-4", "node-5", "node-8", null, null), + new FlowEdge("edge-5", "node-8", "node-9", "yes", null), + new FlowEdge("edge-6", "node-8", "node-3", "no", null), + new FlowEdge("edge-7", "node-9", "node-3", null, null), + new FlowEdge("edge-8", "node-4", "node-6", null, null), + new FlowEdge("edge-9", "node-6", "node-7", null, null), + new FlowEdge("edge-10", "node-7", "node-3", null, null) ) ); executor.execute(graph); @@ -56,4 +68,13 @@ public class TestFlow { log.info("run node id: {}", getNodeId()); } } + + public static class PlainOptionNode extends FlowNodeOptionalRunner { + @Override + public String runOptional() { + log.info("run node id: {}", getNodeId()); + // yes / no + return "no"; + } + } }