feat(ai-web): 完成流程引擎雏形

This commit is contained in:
v-zhangjc9
2025-07-01 15:33:07 +08:00
parent e59e89a5ad
commit c5c62ab713
16 changed files with 415 additions and 381 deletions

View File

@@ -1,11 +1,8 @@
package com.lanyuanxiaoyao.service.ai.web.engine;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowContext;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNodeRunner;
import java.lang.reflect.InvocationTargetException;
import com.lanyuanxiaoyao.service.ai.web.engine.store.FlowStore;
import java.util.LinkedList;
import java.util.Queue;
import org.eclipse.collections.api.map.ImmutableMap;
@@ -17,25 +14,17 @@ import org.eclipse.collections.api.map.ImmutableMap;
* @version 20250630
*/
public class FlowExecutor {
private final FlowStore flowStore;
private final ImmutableMap<String, Class<? extends FlowNodeRunner>> runnerMap;
private final Queue<FlowNode> executionQueue = new LinkedList<>();
public FlowExecutor(ImmutableMap<String, Class<? extends FlowNodeRunner>> runnerMap) {
public FlowExecutor(FlowStore flowStore, ImmutableMap<String, Class<? extends FlowNodeRunner>> runnerMap) {
this.flowStore = flowStore;
this.runnerMap = runnerMap;
}
public void execute(FlowGraph graph) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
var nodeInputMap = graph.edges().groupBy(FlowEdge::source);
var nodeOutputMap = graph.edges().groupBy(FlowEdge::target);
var startNodes = graph.nodes().select(node -> !nodeInputMap.containsKey(node.id()));
Queue<FlowNode> queue = new LinkedList<>();
for (FlowNode node : startNodes) {
queue.offer(node);
}
while (!queue.isEmpty()) {
var node = queue.poll();
var clazz = runnerMap.get(node.type());
var runner = clazz.getDeclaredConstructor(FlowContext.class).newInstance();
runner.run();
}
public void execute(FlowGraph graph) {
var runner = new FlowGraphRunner(graph, flowStore, runnerMap);
runner.run();
}
}

View File

@@ -0,0 +1,76 @@
package com.lanyuanxiaoyao.service.ai.web.engine;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowContext;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode;
import com.lanyuanxiaoyao.service.ai.web.engine.store.FlowStore;
import java.util.LinkedList;
import java.util.Queue;
import lombok.SneakyThrows;
import org.eclipse.collections.api.map.ImmutableMap;
import org.eclipse.collections.api.multimap.set.ImmutableSetMultimap;
/**
* Graph执行器
*
* @author lanyuanxiaoyao
* @version 20250701
*/
public final class FlowGraphRunner implements Runnable {
private final FlowGraph flowGraph;
private final FlowStore flowStore;
private final ImmutableMap<String, Class<? extends FlowNodeRunner>> nodeRunnerClass;
private final Queue<FlowNode> executionQueue = new LinkedList<>();
private final ImmutableSetMultimap<String, FlowEdge> nodeInputMap;
private final ImmutableSetMultimap<String, FlowEdge> nodeOutputMap;
private final ImmutableMap<String, FlowNode> nodeMap;
public FlowGraphRunner(FlowGraph flowGraph, FlowStore flowStore, ImmutableMap<String, Class<? extends FlowNodeRunner>> nodeRunnerClass) {
this.flowGraph = flowGraph;
this.flowStore = flowStore;
this.nodeRunnerClass = nodeRunnerClass;
nodeInputMap = flowGraph.edges().groupBy(FlowEdge::target);
nodeOutputMap = flowGraph.edges().groupBy(FlowEdge::source);
nodeMap = flowGraph.nodes().toImmutableMap(FlowNode::id, node -> node);
}
@SneakyThrows
@Override
public void run() {
flowStore.init(flowGraph);
flowStore.updateGraphToRunning(flowGraph.id());
var context = new FlowContext();
for (FlowNode node : flowGraph.nodes()) {
executionQueue.offer(node);
}
while (!executionQueue.isEmpty()) {
var node = executionQueue.poll();
if (readyForRunning(node)) {
flowStore.updateNodeToRunning(flowGraph.id(), node.id());
var runnerClazz = nodeRunnerClass.get(node.type());
var runner = runnerClazz.getDeclaredConstructor().newInstance();
runner.setNodeId(node.id());
runner.setContext(context);
runner.run();
flowStore.updateNodeToFinished(flowGraph.id(), node.id());
for (FlowEdge edge : nodeOutputMap.get(node.id())) {
flowStore.updateEdgeToExecute(flowGraph.id(), edge.id());
executionQueue.offer(nodeMap.get(edge.target()));
}
}
}
flowStore.updateGraphToFinished(flowGraph.id());
}
private boolean readyForRunning(FlowNode node) {
return (!nodeInputMap.containsKey(node.id()) || nodeInputMap.get(node.id()).allSatisfy(edge -> flowStore.checkEdgeStatus(flowGraph.id(), edge.id(), FlowEdge.Status.EXECUTE, FlowEdge.Status.SKIP)))
&& flowStore.checkNodeStatus(flowGraph.id(), node.id(), FlowNode.Status.INITIAL);
}
}

View File

@@ -0,0 +1,31 @@
package com.lanyuanxiaoyao.service.ai.web.engine;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowContext;
import lombok.Getter;
public abstract class FlowNodeRunner {
@Getter
private String nodeId;
@Getter
private FlowContext context;
public abstract void run();
void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
void setContext(FlowContext context) {
this.context = context;
}
protected <T> T getData(String key) {
var data = context.get(nodeId);
return (T) data.get(key);
}
protected <T> void setData(String key, T value) {
var data = context.get(nodeId);
data.put(key, value);
}
}

View File

@@ -6,5 +6,12 @@ import org.eclipse.collections.api.map.MutableMap;
@Data
public class FlowContext {
private MutableMap<String, Object> data = Maps.mutable.<String, Object>empty().asSynchronized();
private MutableMap<String, MutableMap<String, Object>> data = Maps.mutable.<String, MutableMap<String, Object>>empty().asSynchronized();
public MutableMap<String, Object> get(String key) {
if (!data.containsKey(key)) {
data.put(key, Maps.mutable.<String, Object>empty().asSynchronized());
}
return data.get(key);
}
}

View File

@@ -1,5 +1,7 @@
package com.lanyuanxiaoyao.service.ai.web.engine.entity;
import java.time.LocalDateTime;
/**
* 流程图中的边
*
@@ -13,4 +15,22 @@ public record FlowEdge(
String sourcePoint,
String targetPoint
) {
public enum Status {
INITIAL, EXECUTE, SKIP
}
public record State(
String id,
Status status,
LocalDateTime startingTime,
LocalDateTime finishedTime
) {
public State(String edgeId) {
this(edgeId, Status.INITIAL, LocalDateTime.now(), null);
}
public State(String edgeId, Status status) {
this(edgeId, status, LocalDateTime.now(), null);
}
}
}

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.ai.web.engine.entity;
import java.time.LocalDateTime;
import org.eclipse.collections.api.set.ImmutableSet;
/**
@@ -13,4 +14,22 @@ public record FlowGraph(
ImmutableSet<FlowNode> nodes,
ImmutableSet<FlowEdge> edges
) {
public enum Status {
INITIAL, RUNNING, FINISHED, ERROR
}
public record State(
String id,
Status status,
LocalDateTime startingTime,
LocalDateTime finishedTime
) {
public State(String id) {
this(id, Status.INITIAL, LocalDateTime.now(), null);
}
public State(String id, Status status) {
this(id, status, LocalDateTime.now(), null);
}
}
}

View File

@@ -1,5 +1,6 @@
package com.lanyuanxiaoyao.service.ai.web.engine.entity;
import java.time.LocalDateTime;
import org.eclipse.collections.api.set.ImmutableSet;
/**
@@ -14,4 +15,22 @@ public record FlowNode(
ImmutableSet<String> inputPoints,
ImmutableSet<String> outputPoints
) {
public enum Status {
INITIAL, RUNNING, FINISHED, SKIPPED
}
public record State(
String id,
Status status,
LocalDateTime startingTime,
LocalDateTime finishedTime
) {
public State(String nodeId) {
this(nodeId, Status.INITIAL, LocalDateTime.now(), null);
}
public State(String nodeId, Status status) {
this(nodeId, status, LocalDateTime.now(), null);
}
}
}

View File

@@ -1,5 +0,0 @@
package com.lanyuanxiaoyao.service.ai.web.engine.entity;
public abstract class FlowNodeRunner implements Runnable {
private final FlowContext context;
}

View File

@@ -0,0 +1,37 @@
package com.lanyuanxiaoyao.service.ai.web.engine.store;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode;
/**
* 存储状态
*
* @author lanyuanxiaoyao
* @version 20250701
*/
public interface FlowStore {
void init(FlowGraph flowGraph);
void updateGraphToRunning(String graphId);
void updateGraphToFinished(String graphId);
void updateGraphToError(String graphId);
void updateNodeToRunning(String graphId, String nodeId);
void updateNodeToSkipped(String graphId, String nodeId);
void updateNodeToFinished(String graphId, String nodeId);
void updateEdgeToExecute(String graphId, String edgeId);
void updateEdgeToSkip(String graphId, String edgeId);
boolean checkNodeStatus(String graphId, String nodeId, FlowNode.Status... statuses);
boolean checkEdgeStatus(String graphId, String edgeId, FlowEdge.Status... statuses);
void print();
}

View File

@@ -0,0 +1,138 @@
package com.lanyuanxiaoyao.service.ai.web.engine.store;
import cn.hutool.core.util.ArrayUtil;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowEdge;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowGraph;
import com.lanyuanxiaoyao.service.ai.web.engine.entity.FlowNode;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.map.MutableMap;
/**
* 基于内存的流程状态存储
*
* @author lanyuanxiaoyao
* @version 20250701
*/
@Slf4j
public class InMemoryFlowStore implements FlowStore {
private static final MutableMap<String, FlowGraph.State> flowGraphStateMap = Maps.mutable.<String, FlowGraph.State>empty().asSynchronized();
private static final MutableMap<String, FlowNode.State> flowNodeStateMap = Maps.mutable.<String, FlowNode.State>empty().asSynchronized();
private static final MutableMap<String, FlowEdge.State> flowEdgeStateMap = Maps.mutable.<String, FlowEdge.State>empty().asSynchronized();
private String multiKey(String... key) {
return String.join("-", key);
}
@Override
public void init(FlowGraph flowGraph) {
flowGraphStateMap.put(flowGraph.id(), new FlowGraph.State(flowGraph.id()));
for (FlowNode node : flowGraph.nodes()) {
flowNodeStateMap.put(multiKey(flowGraph.id(), node.id()), new FlowNode.State(node.id()));
}
for (FlowEdge edge : flowGraph.edges()) {
flowEdgeStateMap.put(multiKey(flowGraph.id(), edge.id()), new FlowEdge.State(edge.id()));
}
}
@Override
public void updateGraphToRunning(String graphId) {
flowGraphStateMap.updateValue(
graphId,
() -> new FlowGraph.State(graphId, FlowGraph.Status.RUNNING),
old -> new FlowGraph.State(graphId, FlowGraph.Status.RUNNING, old.startingTime(), old.finishedTime())
);
}
@Override
public void updateGraphToFinished(String graphId) {
flowGraphStateMap.updateValue(
graphId,
() -> new FlowGraph.State(graphId, FlowGraph.Status.FINISHED),
old -> new FlowGraph.State(graphId, FlowGraph.Status.FINISHED, old.startingTime(), old.finishedTime())
);
}
@Override
public void updateGraphToError(String graphId) {
flowGraphStateMap.updateValue(
graphId,
() -> new FlowGraph.State(graphId, FlowGraph.Status.ERROR),
old -> new FlowGraph.State(graphId, FlowGraph.Status.ERROR, old.startingTime(), old.finishedTime())
);
}
@Override
public void updateNodeToRunning(String graphId, String nodeId) {
flowNodeStateMap.updateValue(
multiKey(graphId, nodeId),
() -> new FlowNode.State(nodeId, FlowNode.Status.RUNNING),
old -> new FlowNode.State(nodeId, FlowNode.Status.RUNNING, old.startingTime(), old.finishedTime())
);
}
@Override
public void updateNodeToSkipped(String graphId, String nodeId) {
flowNodeStateMap.updateValue(
multiKey(graphId, nodeId),
() -> new FlowNode.State(nodeId, FlowNode.Status.SKIPPED),
old -> new FlowNode.State(nodeId, FlowNode.Status.SKIPPED, old.startingTime(), old.finishedTime())
);
}
@Override
public void updateNodeToFinished(String graphId, String nodeId) {
flowNodeStateMap.updateValue(
multiKey(graphId, nodeId),
() -> new FlowNode.State(nodeId, FlowNode.Status.FINISHED),
old -> new FlowNode.State(nodeId, FlowNode.Status.FINISHED, old.startingTime(), old.finishedTime())
);
}
@Override
public void updateEdgeToExecute(String graphId, String edgeId) {
flowEdgeStateMap.updateValue(
multiKey(graphId, edgeId),
() -> new FlowEdge.State(edgeId, FlowEdge.Status.EXECUTE),
old -> new FlowEdge.State(edgeId, FlowEdge.Status.EXECUTE, old.startingTime(), old.finishedTime())
);
}
@Override
public void updateEdgeToSkip(String graphId, String edgeId) {
flowEdgeStateMap.updateValue(
multiKey(graphId, edgeId),
() -> new FlowEdge.State(edgeId, FlowEdge.Status.SKIP),
old -> new FlowEdge.State(edgeId, FlowEdge.Status.SKIP, old.startingTime(), old.finishedTime())
);
}
@Override
public boolean checkNodeStatus(String graphId, String nodeId, FlowNode.Status... statuses) {
String key = multiKey(graphId, nodeId);
if (flowNodeStateMap.containsKey(key)) {
return ArrayUtil.contains(statuses, flowNodeStateMap.get(key).status());
}
return false;
}
@Override
public boolean checkEdgeStatus(String graphId, String edgeId, FlowEdge.Status... statuses) {
String key = multiKey(graphId, edgeId);
if (flowEdgeStateMap.containsKey(key)) {
return ArrayUtil.contains(statuses, flowEdgeStateMap.get(key).status());
}
return false;
}
@Override
public void print() {
log.info("====== Flow Store ======");
log.info("====== Flow Graph ======");
flowGraphStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status()));
log.info("====== Flow Node ======");
flowNodeStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status()));
log.info("====== Flow Edge ======");
flowEdgeStateMap.forEachKeyValue((key, value) -> log.info("{}: {}", key, value.status()));
}
}