1
0
Files
DiAL/tests/server/checker/engine.test.ts
lanyuanxiaoyao 08b61cbf47 refactor: ProbeEngine 调度引擎重写为 per-target setTimeout 链
将 per-group setInterval + groupBy 调度模式改为 per-target setTimeout 链,
实现 catch-up 语义(超时后立即补执行)、AbortController 优雅停止、
循环内错误隔离和 overrun warn 日志。
移除 groupBy/probeGroup/timers,新增 sleep/runLoop/runOnce。
新增 croner 依赖供后续 cron 表达式支持使用。
2026-05-26 11:35:06 +08:00

690 lines
23 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { describe, expect, test } from "bun:test";
import type { ResolvedCommandTarget } from "../../../src/server/checker/runner/cmd/types";
import type { ResolvedHttpTarget } from "../../../src/server/checker/runner/http/types";
import type { ProbeStore } from "../../../src/server/checker/store";
import type { ResolvedTargetBase } from "../../../src/server/checker/types";
import { ProbeEngine } from "../../../src/server/checker/engine";
import { checkerRegistry } from "../../../src/server/checker/runner";
import { CommandChecker } from "../../../src/server/checker/runner/cmd/execute";
import { HttpChecker } from "../../../src/server/checker/runner/http/execute";
import { createMemoryLogger } from "../../../src/server/logger";
const processEnv = Object.fromEntries(
Object.entries(process.env).filter((entry): entry is [string, string] => entry[1] !== undefined),
);
function createMockStore(targetNames: string[]) {
const targets = targetNames.map((name) => ({ id: name, name }));
const results: Array<Record<string, unknown>> = [];
return {
_results: results,
getLatestChecksMap() {
return new Map();
},
getTargets() {
return targets.map(({ id, name }) => ({
config: "",
expect: null,
grp: "default",
id,
interval_ms: 60000,
name,
target: "",
timeout_ms: 5000,
type: "cmd" as const,
}));
},
insertCheckResult(result: Record<string, unknown>) {
results.push(result);
},
};
}
function ensureRegistered() {
if (!checkerRegistry.supportedTypes.includes("http")) {
checkerRegistry.register(new HttpChecker());
checkerRegistry.register(new CommandChecker());
}
}
function getRunOnce(engine: ProbeEngine) {
return (
engine as unknown as {
runOnce: (t: ResolvedTargetBase) => Promise<Record<string, unknown>>;
}
).runOnce.bind(engine);
}
function makeCommandTarget(name: string, overrides?: Partial<ResolvedCommandTarget>): ResolvedCommandTarget {
return {
cmd: {
args: ["-e", "console.log('hello')"],
cwd: process.cwd(),
env: processEnv,
exec: "bun",
maxOutputBytes: 1024 * 1024,
},
description: null,
group: "default",
id: name,
intervalMs: 60000,
name,
timeoutMs: 5000,
type: "cmd",
...overrides,
};
}
function makeMockResult(targetId: string, overrides?: Partial<Record<string, unknown>>) {
return {
detail: null,
durationMs: 1,
failure: null,
matched: true,
observation: null,
targetId,
timestamp: new Date().toISOString(),
...overrides,
};
}
describe("ProbeEngine", () => {
test("start/stop 不抛错", () => {
ensureRegistered();
const mockStore = createMockStore(["test"]) as unknown as ProbeStore;
const targets: ResolvedTargetBase[] = [makeCommandTarget("test")];
const engine = new ProbeEngine(mockStore, targets);
engine.start();
engine.stop();
expect(true).toBe(true);
});
test("单次 runOnce 执行 cmd 检查", async () => {
const target = makeCommandTarget("cmd-echo");
const mockStore = createMockStore(["cmd-echo"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
await getRunOnce(engine)(target);
const results = (mockStore as unknown as { _results: Array<Record<string, unknown>> })._results;
expect(results.length).toBe(1);
expect(results[0]!["matched"]).toBe(true);
expect((results[0]!["observation"] as Record<string, unknown>)["exitCode"]).toBe(0);
});
test("多个目标并发执行", async () => {
const targetA = makeCommandTarget("echo-a", {
cmd: {
args: ["-e", "console.log('a')"],
cwd: process.cwd(),
env: processEnv,
exec: "bun",
maxOutputBytes: 1024 * 1024,
},
});
const targetB = makeCommandTarget("echo-b", {
cmd: {
args: ["-e", "console.log('b')"],
cwd: process.cwd(),
env: processEnv,
exec: "bun",
maxOutputBytes: 1024 * 1024,
},
});
const mockStore = createMockStore(["echo-a", "echo-b"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [targetA, targetB]);
const runOnce = getRunOnce(engine);
await Promise.all([runOnce(targetA), runOnce(targetB)]);
const results = (mockStore as unknown as { _results: Array<Record<string, unknown>> })._results;
expect(results.length).toBe(2);
});
test("失败目标不阻塞其他目标", async () => {
const badTarget = makeCommandTarget("bad-cmd", {
cmd: {
args: ["-e", "process.exit(1)"],
cwd: process.cwd(),
env: processEnv,
exec: "bun",
maxOutputBytes: 1024 * 1024,
},
});
const goodTarget = makeCommandTarget("good-cmd");
const mockStore = createMockStore(["bad-cmd", "good-cmd"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [badTarget, goodTarget]);
const runOnce = getRunOnce(engine);
await Promise.all([runOnce(badTarget), runOnce(goodTarget)]);
const results = (mockStore as unknown as { _results: Array<Record<string, unknown>> })._results;
expect(results.length).toBe(2);
const badResult = results.find((r) => r["matched"] === false);
const goodResult = results.find((r) => r["matched"] === true);
expect(badResult).toBeDefined();
expect(goodResult).toBeDefined();
});
test("checker rejected 时写入 internal error 结果", async () => {
ensureRegistered();
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = async (target, ctx) => {
if (target.name === "reject-cmd") {
throw new Error("boom");
}
return originalExecute(target, ctx);
};
const rejectTarget = makeCommandTarget("reject-cmd");
const goodTarget = makeCommandTarget("good-cmd");
const mockStore = createMockStore(["reject-cmd", "good-cmd"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [rejectTarget, goodTarget]);
const runOnce = getRunOnce(engine);
await Promise.all([runOnce(rejectTarget), runOnce(goodTarget)]);
const results = (mockStore as unknown as { _results: Array<Record<string, unknown>> })._results;
expect(results.length).toBe(2);
const rejectResult = results.find((r) => r["targetId"] === "reject-cmd");
const goodResult = results.find((r) => r["targetId"] === "good-cmd");
expect(rejectResult).toBeDefined();
expect(rejectResult!["matched"]).toBe(false);
expect(rejectResult!["durationMs"]).toBeNull();
expect(rejectResult!["observation"]).toBeNull();
expect(rejectResult!["failure"]).toEqual({
kind: "error",
message: "boom",
path: "engine",
phase: "internal",
});
expect(typeof rejectResult!["timestamp"]).toBe("string");
expect(goodResult).toBeDefined();
expect(goodResult!["matched"]).toBe(true);
checker.execute = originalExecute;
});
test("并发限制 maxConcurrentChecks", async () => {
const targets = Array.from({ length: 5 }, (_, i) =>
makeCommandTarget(`cmd-${i}`, {
cmd: {
args: ["-e", `console.log('${i}')`],
cwd: process.cwd(),
env: processEnv,
exec: "bun",
maxOutputBytes: 1024 * 1024,
},
}),
);
const mockStore = createMockStore(targets.map((t) => t.name ?? t.id)) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, targets, 2);
const runOnce = getRunOnce(engine);
await Promise.all(targets.map((t) => runOnce(t)));
const results = (mockStore as unknown as { _results: Array<Record<string, unknown>> })._results;
expect(results.length).toBe(5);
for (const r of results) {
expect(r["matched"]).toBe(true);
}
});
test("不同 interval 的 target 独立调度", () => {
const targetA = makeCommandTarget("a", { intervalMs: 30000 });
const targetB = makeCommandTarget("b", { intervalMs: 30000 });
const targetC = makeCommandTarget("c", { intervalMs: 60000 });
const mockStore = createMockStore(["a", "b", "c"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [targetA, targetB, targetC]);
engine.start();
engine.stop();
expect(true).toBe(true);
});
test("未注册的 target id 不写入结果", async () => {
const target = makeCommandTarget("unknown-target");
const mockStore = createMockStore(["other-name"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
await getRunOnce(engine)(target);
const results = (mockStore as unknown as { _results: Array<Record<string, unknown>> })._results;
expect(results.length).toBe(0);
});
test("HTTP 目标运行", async () => {
const httpServer = Bun.serve({
fetch() {
return new Response("ok");
},
port: 0,
});
try {
const httpTarget: ResolvedHttpTarget = {
description: null,
group: "default",
http: {
headers: {},
ignoreSSL: false,
maxBodyBytes: 1024 * 1024,
maxRedirects: 0,
method: "GET",
url: `http://localhost:${httpServer.port}/`,
},
id: "http-test",
intervalMs: 60000,
name: "http-test",
timeoutMs: 5000,
type: "http",
};
const mockStore = createMockStore(["http-test"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [httpTarget]);
await getRunOnce(engine)(httpTarget);
const results = (mockStore as unknown as { _results: Array<Record<string, unknown>> })._results;
expect(results.length).toBe(1);
expect(results[0]!["matched"]).toBe(true);
expect((results[0]!["observation"] as Record<string, unknown>)["statusCode"]).toBe(200);
} finally {
void httpServer.stop();
}
});
test("retentionMs > 0 时 start 调用 prune", () => {
let pruneCalled = false;
const mockStore = {
...createMockStore(["test"]),
prune() {
pruneCalled = true;
return 0;
},
} as unknown as ProbeStore;
const targets: ResolvedTargetBase[] = [makeCommandTarget("test")];
const engine = new ProbeEngine(mockStore, targets, 20, 86400000);
engine.start();
expect(pruneCalled).toBe(true);
engine.stop();
});
test("retentionMs = 0 时不调用 prune", () => {
let pruneCalled = false;
const mockStore = {
...createMockStore(["test"]),
prune() {
pruneCalled = true;
return 0;
},
} as unknown as ProbeStore;
const targets: ResolvedTargetBase[] = [makeCommandTarget("test")];
const engine = new ProbeEngine(mockStore, targets, 20, 0);
engine.start();
expect(pruneCalled).toBe(false);
engine.stop();
});
test("retentionMs 未传时不调用 prune", () => {
let pruneCalled = false;
const mockStore = {
...createMockStore(["test"]),
prune() {
pruneCalled = true;
return 0;
},
} as unknown as ProbeStore;
const targets: ResolvedTargetBase[] = [makeCommandTarget("test")];
const engine = new ProbeEngine(mockStore, targets);
engine.start();
expect(pruneCalled).toBe(false);
engine.stop();
});
describe("日志与状态变化", () => {
test("checker rejected 时 logger 记录 error", async () => {
ensureRegistered();
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = async (target, ctx) => {
if (target.id === "fail-target") throw new Error("explode");
return originalExecute(target, ctx);
};
const logger = createMemoryLogger();
const mockStore = createMockStore(["fail-target", "ok-target"]) as unknown as ProbeStore;
const engine = new ProbeEngine(
mockStore,
[makeCommandTarget("fail-target"), makeCommandTarget("ok-target")],
20,
0,
logger,
);
const runOnce = getRunOnce(engine);
await Promise.all([runOnce(makeCommandTarget("fail-target")), runOnce(makeCommandTarget("ok-target"))]);
const errorLogs = logger.entries.filter((e) => e.level === "error");
expect(errorLogs.length).toBeGreaterThanOrEqual(1);
expect(errorLogs[0]!.msg).toContain("探针执行失败");
checker.execute = originalExecute;
});
test("状态变化 UP→DOWN 记录 warn 日志", async () => {
ensureRegistered();
const logger = createMemoryLogger();
const targetId = "state-test";
const mockStore = {
...createMockStore([targetId]),
getLatestChecksMap() {
return new Map([[targetId, { matched: 1 }]]);
},
} as unknown as ProbeStore;
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = async (target) => {
if (target.id === targetId) {
return makeMockResult(targetId, {
durationMs: 10,
failure: { kind: "error", message: "fail", path: "exitCode", phase: "body" },
matched: false,
});
}
return originalExecute(target, { signal: new AbortController().signal });
};
const engine = new ProbeEngine(mockStore, [makeCommandTarget(targetId)], 20, 0, logger);
await getRunOnce(engine)(makeCommandTarget(targetId));
const stateLogs = logger.entries.filter((e) => e.level === "warn" && e.msg.includes("UP → DOWN"));
expect(stateLogs.length).toBe(1);
expect(stateLogs[0]!.msg).toContain(targetId);
checker.execute = originalExecute;
});
test("状态变化 DOWN→UP 记录 info 日志", async () => {
ensureRegistered();
const logger = createMemoryLogger();
const targetId = "recover-test";
const mockStore = {
...createMockStore([targetId]),
getLatestChecksMap() {
return new Map([[targetId, { matched: 0 }]]);
},
} as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [makeCommandTarget(targetId)], 20, 0, logger);
await getRunOnce(engine)(makeCommandTarget(targetId));
const recoverLogs = logger.entries.filter((e) => e.level === "info" && e.msg.includes("DOWN → UP"));
expect(recoverLogs.length).toBe(1);
expect(recoverLogs[0]!.msg).toContain(targetId);
});
test("稳态 UP→UP 不产生状态变化日志", async () => {
ensureRegistered();
const logger = createMemoryLogger();
const targetId = "steady-up";
const mockStore = {
...createMockStore([targetId]),
getLatestChecksMap() {
return new Map([[targetId, { matched: 1 }]]);
},
} as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [makeCommandTarget(targetId)], 20, 0, logger);
await getRunOnce(engine)(makeCommandTarget(targetId));
const stateChangeLogs = logger.entries.filter(
(e) => e.msg.includes("UP → DOWN") || e.msg.includes("DOWN → UP") || e.msg.includes("首次 DOWN"),
);
expect(stateChangeLogs.length).toBe(0);
});
test("首次检查 DOWN 记录 warn 日志", async () => {
ensureRegistered();
const logger = createMemoryLogger();
const targetId = "first-down";
const mockStore = createMockStore([targetId]) as unknown as ProbeStore;
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = async (target) => {
if (target.id === targetId) {
return makeMockResult(targetId, {
durationMs: 10,
failure: { kind: "error", message: "fail", path: "exitCode", phase: "body" },
matched: false,
});
}
return originalExecute(target, { signal: new AbortController().signal });
};
const engine = new ProbeEngine(mockStore, [makeCommandTarget(targetId)], 20, 0, logger);
await getRunOnce(engine)(makeCommandTarget(targetId));
const firstDownLogs = logger.entries.filter((e) => e.level === "warn" && e.msg.includes("首次 DOWN"));
expect(firstDownLogs.length).toBe(1);
checker.execute = originalExecute;
});
test("每次检查产出 debug 日志", async () => {
ensureRegistered();
const logger = createMemoryLogger();
const targetId = "debug-target";
const mockStore = createMockStore([targetId]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [makeCommandTarget(targetId)], 20, 0, logger);
await getRunOnce(engine)(makeCommandTarget(targetId));
const debugLogs = logger.entries.filter((e) => e.level === "debug");
expect(debugLogs.length).toBeGreaterThanOrEqual(1);
const first = debugLogs[0]!;
expect(first.obj).toBeDefined();
expect(first.obj!["targetId"]).toBe(targetId);
});
test("无 logger 时不抛错noop logger 兜底)", async () => {
ensureRegistered();
const mockStore = createMockStore(["no-log"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [makeCommandTarget("no-log")]);
await getRunOnce(engine)(makeCommandTarget("no-log"));
});
});
describe("runLoop 调度行为", () => {
test("首次立即执行", async () => {
ensureRegistered();
let callCount = 0;
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = (target) => {
callCount++;
return Promise.resolve(makeMockResult(target.id));
};
const target = makeCommandTarget("immediate", { intervalMs: 60000 });
const mockStore = createMockStore(["immediate"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
engine.start();
await Bun.sleep(20);
expect(callCount).toBeGreaterThanOrEqual(1);
engine.stop();
checker.execute = originalExecute;
});
test("正常调度间隔", async () => {
ensureRegistered();
const callTimes: number[] = [];
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = (target) => {
callTimes.push(performance.now());
return Promise.resolve(makeMockResult(target.id));
};
const target = makeCommandTarget("interval", { intervalMs: 100 });
const mockStore = createMockStore(["interval"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
engine.start();
await Bun.sleep(280);
engine.stop();
expect(callTimes.length).toBeGreaterThanOrEqual(2);
const gap = callTimes[1]! - callTimes[0]!;
expect(gap).toBeGreaterThanOrEqual(80);
expect(gap).toBeLessThan(200);
checker.execute = originalExecute;
});
test("catch-up 语义:超时后立即补执行", async () => {
ensureRegistered();
const callTimes: number[] = [];
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = async (target) => {
callTimes.push(performance.now());
if (callTimes.length === 1) {
await Bun.sleep(150);
}
return makeMockResult(target.id);
};
const target = makeCommandTarget("catchup", { intervalMs: 100 });
const mockStore = createMockStore(["catchup"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
engine.start();
await Bun.sleep(350);
engine.stop();
expect(callTimes.length).toBeGreaterThanOrEqual(2);
const gap = callTimes[1]! - callTimes[0]!;
expect(gap).toBeGreaterThanOrEqual(140);
expect(gap).toBeLessThan(220);
checker.execute = originalExecute;
});
test("overrun warn 日志", async () => {
ensureRegistered();
const logger = createMemoryLogger();
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = async (target) => {
await Bun.sleep(150);
return makeMockResult(target.id, { durationMs: 150 });
};
const target = makeCommandTarget("overrun", { intervalMs: 100 });
const mockStore = createMockStore(["overrun"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target], 20, 0, logger);
engine.start();
await Bun.sleep(250);
engine.stop();
const warnLogs = logger.entries.filter((e) => e.level === "warn" && e.msg.includes("拨测超时"));
expect(warnLogs.length).toBeGreaterThanOrEqual(1);
expect(warnLogs[0]!.obj).toBeDefined();
expect(warnLogs[0]!.obj!["targetId"]).toBe("overrun");
checker.execute = originalExecute;
});
test("无并发重叠:同一 target 不会并发执行", async () => {
ensureRegistered();
let running = 0;
let maxConcurrent = 0;
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = async (target) => {
running++;
maxConcurrent = Math.max(maxConcurrent, running);
await Bun.sleep(60);
running--;
return makeMockResult(target.id, { durationMs: 60 });
};
const target = makeCommandTarget("no-overlap", { intervalMs: 70 });
const mockStore = createMockStore(["no-overlap"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
engine.start();
await Bun.sleep(350);
engine.stop();
expect(maxConcurrent).toBeLessThanOrEqual(1);
checker.execute = originalExecute;
});
test("优雅停止stop() 后循环快速退出", async () => {
ensureRegistered();
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = (target) => Promise.resolve(makeMockResult(target.id));
const target = makeCommandTarget("graceful", { intervalMs: 60000 });
const mockStore = createMockStore(["graceful"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
engine.start();
await Bun.sleep(20);
const stopStart = performance.now();
engine.stop();
const stopDuration = performance.now() - stopStart;
expect(stopDuration).toBeLessThan(1000);
checker.execute = originalExecute;
});
test("错误隔离runCheck 抛异常后循环继续", async () => {
ensureRegistered();
let callCount = 0;
const checker = checkerRegistry.get("cmd");
const originalExecute = checker.execute.bind(checker);
checker.execute = (target) => {
callCount++;
if (callCount === 1) {
return Promise.reject(new Error("first fail"));
}
return Promise.resolve(makeMockResult(target.id));
};
const target = makeCommandTarget("error-isolation", { intervalMs: 50 });
const mockStore = createMockStore(["error-isolation"]) as unknown as ProbeStore;
const engine = new ProbeEngine(mockStore, [target]);
engine.start();
await Bun.sleep(180);
engine.stop();
expect(callCount).toBeGreaterThanOrEqual(2);
checker.execute = originalExecute;
});
});
});