1
0
Files
DiAL/src/server/checker/engine.ts
lanyuanxiaoyao 147a2559ae refactor: 后端架构加固 — 泛型化、批量查询、bootstrap 统一、路径修复与 pageSize 上限
- CheckerDefinition 泛型化,HTTP/Command checker 移除 resolved target 断言
- 新增 ProbeStore.getAllRecentSamples 消除 targets 路由 N+1 查询
- 统一 getAllTargetStats 与 getTargetStats 的 availability 精度
- Engine rejected 结果写入 internal error 记录,提升可观测性
- 新增 bootstrap.ts 统一 dev/production 启动序列
- dataDir 相对路径改为基于配置文件目录解析
- validatePagination 增加 pageSize 上限 200 校验
- 修复 ErrorBoundary override 标记
- 更新 README/DEVELOPMENT 文档,新增完整测试覆盖
2026-05-13 18:15:46 +08:00

124 lines
3.5 KiB
TypeScript

import { groupBy, isError, Semaphore } from "es-toolkit";
import type { ProbeStore } from "./store";
import type { CheckResult, ResolvedTargetBase } from "./types";
import { errorFailure } from "./expect/failure";
import { checkerRegistry } from "./runner";
const PRUNE_INTERVAL_MS = 3600000;
export class ProbeEngine {
private retentionMs: number;
private semaphore: Semaphore;
private store: ProbeStore;
private targetNameToId = new Map<string, number>();
private targets: ResolvedTargetBase[];
private timers: Array<ReturnType<typeof setInterval>> = [];
constructor(store: ProbeStore, targets: ResolvedTargetBase[], maxConcurrentChecks?: number, retentionMs?: number) {
this.store = store;
this.targets = targets;
this.semaphore = new Semaphore(maxConcurrentChecks ?? 20);
this.retentionMs = retentionMs ?? 0;
this.refreshCache();
}
start(): void {
const groups = groupBy(this.targets, (t) => t.intervalMs);
for (const [intervalMs, groupTargets] of Object.entries(groups)) {
void this.probeGroup(groupTargets);
const timer = setInterval(() => {
void this.probeGroup(groupTargets);
}, Number(intervalMs));
this.timers.push(timer);
}
if (this.retentionMs > 0) {
this.store.prune(this.retentionMs);
const pruneTimer = setInterval(() => {
this.store.prune(this.retentionMs);
}, PRUNE_INTERVAL_MS);
this.timers.push(pruneTimer);
}
}
stop(): void {
for (const timer of this.timers) {
clearInterval(timer);
}
this.timers = [];
}
private async probeGroup(targets: ResolvedTargetBase[]): Promise<void> {
const results = await Promise.allSettled(
targets.map(async (target) => {
await this.semaphore.acquire();
try {
return await this.runCheck(target);
} finally {
this.semaphore.release();
}
}),
);
for (const [index, result] of results.entries()) {
if (result.status === "fulfilled") {
this.writeResult(result.value);
} else {
const target = targets[index];
console.warn("探针执行失败:", result.reason);
if (!target) continue;
this.writeResult({
durationMs: null,
failure: errorFailure("internal", "engine", formatReason(result.reason)),
matched: false,
statusDetail: null,
targetName: target.name,
timestamp: new Date().toISOString(),
});
}
}
}
private refreshCache(): void {
this.targetNameToId.clear();
for (const target of this.store.getTargets()) {
this.targetNameToId.set(target.name, target.id);
}
}
private async runCheck(target: ResolvedTargetBase): Promise<CheckResult> {
const checker = checkerRegistry.get(target.type);
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), target.timeoutMs);
try {
return await checker.execute(target, { signal: controller.signal });
} finally {
clearTimeout(timeoutId);
}
}
private writeResult(result: CheckResult): void {
const targetId = this.targetNameToId.get(result.targetName);
if (!targetId) return;
this.store.insertCheckResult({
durationMs: result.durationMs,
failure: result.failure,
matched: result.matched,
statusDetail: result.statusDetail,
targetId,
timestamp: result.timestamp,
});
}
}
function formatReason(reason: unknown): string {
return isError(reason) ? reason.message : String(reason);
}