1
0
Files
DiAL/src/server/checker/engine.ts
lanyuanxiaoyao 08b61cbf47 refactor: ProbeEngine 调度引擎重写为 per-target setTimeout 链
将 per-group setInterval + groupBy 调度模式改为 per-target setTimeout 链,
实现 catch-up 语义(超时后立即补执行)、AbortController 优雅停止、
循环内错误隔离和 overrun warn 日志。
移除 groupBy/probeGroup/timers,新增 sleep/runLoop/runOnce。
新增 croner 依赖供后续 cron 表达式支持使用。
2026-05-26 11:35:06 +08:00

221 lines
6.3 KiB
TypeScript

import { isError, Semaphore } from "es-toolkit";
import type { Logger } from "../logger";
import type { ProbeStore } from "./store";
import type { CheckResult, ResolvedTargetBase } from "./types";
import { createNoopLogger } from "../logger";
import { errorFailure } from "./expect/failure";
import { checkerRegistry } from "./runner";
/** Delay between two periodic prune passes over stored results: one hour, in milliseconds. */
const PRUNE_INTERVAL_MS = 60 * 60 * 1000;
/**
 * Schedules and runs probe checks for a fixed list of targets.
 *
 * Every target gets its own asynchronous loop: run one check, then sleep for
 * whatever is left of `target.intervalMs`. When a check takes longer than the
 * interval the remaining delay is zero, so the next check follows right away.
 * A shared semaphore bounds how many checks are in flight at once, and the
 * `AbortController` created by `start()` is what `stop()` uses to end all loops.
 */
export class ProbeEngine {
  /** Controller of the current run; `null` while the engine is stopped. */
  private abort: AbortController | null = null;
  /** Last `matched` outcome seen per target id; drives the UP/DOWN transition logs. */
  private readonly lastMatched = new Map<string, boolean>();
  private readonly logger: Logger;
  /** Handle of the periodic prune timer; `null` when pruning is not scheduled. */
  private pruneTimer: null | ReturnType<typeof setInterval> = null;
  /** Value handed to `store.prune`; pruning is only scheduled when it is greater than 0. */
  private readonly retentionMs: number;
  /** Acquired before and released after every check to bound concurrency. */
  private readonly semaphore: Semaphore;
  private readonly store: ProbeStore;
  /** Ids returned by `store.getTargets()`; results for any other id are not persisted. */
  private readonly targetIds = new Set<string>();
  private readonly targets: ResolvedTargetBase[];

  /**
   * @param store - Persistence layer used to read targets/latest checks and to write results.
   * @param targets - Targets to probe; one loop per entry is started by `start()`.
   * @param maxConcurrentChecks - Semaphore capacity; defaults to 20.
   * @param retentionMs - Retention passed to `store.prune`; defaults to 0, which disables pruning.
   * @param logger - Logger to report to; defaults to a no-op logger.
   */
  constructor(
    store: ProbeStore,
    targets: ResolvedTargetBase[],
    maxConcurrentChecks?: number,
    retentionMs?: number,
    logger?: Logger,
  ) {
    this.store = store;
    this.targets = targets;
    this.semaphore = new Semaphore(maxConcurrentChecks ?? 20);
    this.retentionMs = retentionMs ?? 0;
    this.logger = logger ?? createNoopLogger();
    this.refreshCache();
    this.initStateCache();
  }

  /**
   * Starts one probe loop per target and, when `retentionMs > 0`, prunes once
   * immediately and then every `PRUNE_INTERVAL_MS`.
   *
   * Calling `start()` while the engine is already running is a no-op, so the
   * running loops and the prune timer can never be orphaned by a second call.
   */
  start(): void {
    if (this.abort) return;

    this.abort = new AbortController();
    const signal = this.abort.signal;
    for (const target of this.targets) {
      // Fire and forget: runLoop handles its own errors and ends when `signal` aborts.
      void this.runLoop(target, signal);
    }

    if (this.retentionMs > 0) {
      // A failure of this first prune still surfaces to the caller of start().
      this.store.prune(this.retentionMs);
      this.pruneTimer = setInterval(() => {
        // Inside a timer callback nobody can catch a throw; an escaping error
        // would become an uncaught exception, so report it and wait for the next tick.
        try {
          this.store.prune(this.retentionMs);
        } catch (error) {
          const reason = formatReason(error);
          this.logger.error({ reason }, `历史数据清理失败: ${reason}`);
        }
      }, PRUNE_INTERVAL_MS);
    }
  }

  /**
   * Aborts the current run (ending every loop's sleep or pending slot wait)
   * and cancels the prune timer. Safe to call when the engine is not running.
   */
  stop(): void {
    this.abort?.abort();
    this.abort = null;
    if (this.pruneTimer) {
      clearInterval(this.pruneTimer);
      this.pruneTimer = null;
    }
  }

  /** Seeds `lastMatched` from the store's latest check per target (a row is UP when `matched === 1`). */
  private initStateCache(): void {
    const latestMap = this.store.getLatestChecksMap();
    for (const [id, row] of latestMap) {
      this.lastMatched.set(id, row.matched === 1);
    }
  }

  /** Emits a debug record with the key fields of a finished check. */
  private logCheckDebug(result: CheckResult): void {
    this.logger.debug({
      durationMs: result.durationMs,
      failureMessage: result.failure?.message ?? null,
      failurePhase: result.failure?.phase ?? null,
      matched: result.matched,
      targetId: result.targetId,
    });
  }

  /**
   * Logs UP/DOWN transitions relative to the previously recorded outcome and
   * then records the new outcome. A target with no recorded outcome only logs
   * when its first result is DOWN.
   */
  private logStateChange(result: CheckResult): void {
    const previous = this.lastMatched.get(result.targetId);
    const current = result.matched;
    if (previous === undefined) {
      if (!current) {
        this.logger.warn(
          { durationMs: result.durationMs, failurePhase: result.failure?.phase, targetId: result.targetId },
          `目标首次 DOWN: ${result.targetId}`,
        );
      }
    } else if (previous && !current) {
      this.logger.warn(
        { durationMs: result.durationMs, failurePhase: result.failure?.phase, targetId: result.targetId },
        `目标状态变化 UP → DOWN: ${result.targetId}`,
      );
    } else if (!previous && current) {
      this.logger.info(
        { durationMs: result.durationMs, targetId: result.targetId },
        `目标恢复 DOWN → UP: ${result.targetId}`,
      );
    }
    this.lastMatched.set(result.targetId, current);
  }

  /** Rebuilds `targetIds` from the targets currently known to the store. */
  private refreshCache(): void {
    this.targetIds.clear();
    for (const target of this.store.getTargets()) {
      this.targetIds.add(target.id);
    }
  }

  /**
   * Executes the checker registered for `target.type`, handing it a signal
   * that is aborted once `target.timeoutMs` has elapsed.
   */
  private async runCheck(target: ResolvedTargetBase): Promise<CheckResult> {
    const checker = checkerRegistry.get(target.type);
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), target.timeoutMs);
    try {
      return await checker.execute(target, { signal: controller.signal });
    } finally {
      // Always drop the timeout so a finished check leaves no pending timer behind.
      clearTimeout(timeoutId);
    }
  }

  /**
   * Probe loop of a single target; runs until `signal` is aborted.
   *
   * An unexpected error from one iteration is logged and the loop carries on
   * with the normal schedule, so a transient failure (for example the store
   * rejecting a write) cannot silently end probing of the target.
   */
  private async runLoop(target: ResolvedTargetBase, signal: AbortSignal): Promise<void> {
    while (!signal.aborted) {
      const start = performance.now();
      try {
        await this.runOnce(target, signal);
      } catch (error) {
        // runOnce throws an AbortError when the engine was stopped while it waited for a slot.
        if (signal.aborted) break;
        const reason = formatReason(error);
        this.logger.error(
          { reason, targetId: target.id, targetType: target.type },
          `拨测循环异常: ${reason}`,
        );
      }
      const elapsed = performance.now() - start;
      if (elapsed > target.intervalMs) {
        this.logger.warn(
          { elapsed, intervalMs: target.intervalMs, targetId: target.id },
          `拨测超时: ${target.id} 耗时 ${Math.round(elapsed)}ms > 间隔 ${target.intervalMs}ms`,
        );
      }
      // Catch-up: after an overrun the remaining delay is 0.
      const delay = Math.max(0, target.intervalMs - elapsed);
      try {
        await sleep(delay, signal);
      } catch {
        // sleep only rejects when the signal aborts.
        break;
      }
    }
  }

  /**
   * Runs one check under the concurrency limit and persists its result.
   *
   * If the check (or handling its result) throws, the error is logged and a
   * synthetic failed result is persisted and returned instead.
   *
   * @throws DOMException (`AbortError`) when `signal` is already aborted once a slot is obtained.
   */
  private async runOnce(target: ResolvedTargetBase, signal?: AbortSignal): Promise<CheckResult> {
    await this.semaphore.acquire();
    if (signal?.aborted) {
      // Stopped while queued: give the slot back without running anything.
      this.semaphore.release();
      throw new DOMException("Aborted", "AbortError");
    }
    try {
      const result = await this.runCheck(target);
      this.writeResult(result);
      this.logStateChange(result);
      this.logCheckDebug(result);
      return result;
    } catch (error) {
      const reason = formatReason(error);
      this.logger.error({ reason, targetId: target.id, targetType: target.type }, `探针执行失败: ${reason}`);
      const errorResult: CheckResult = {
        detail: null,
        durationMs: null,
        failure: errorFailure("internal", "engine", reason),
        matched: false,
        observation: null,
        targetId: target.id,
        timestamp: new Date().toISOString(),
      };
      this.writeResult(errorResult);
      return errorResult;
    } finally {
      this.semaphore.release();
    }
  }

  /** Persists a result, skipping targets whose id is not in `targetIds`. */
  private writeResult(result: CheckResult): void {
    if (!this.targetIds.has(result.targetId)) return;
    this.store.insertCheckResult({
      durationMs: result.durationMs,
      failure: result.failure,
      matched: result.matched,
      observation: result.observation ?? null,
      targetId: result.targetId,
      timestamp: result.timestamp,
    });
  }
}
/**
 * Turns an arbitrary thrown value into a human-readable reason string.
 *
 * - `Error`: its `message`, or its `name` when the message is empty.
 * - Primitives (and `null`/`undefined`): `String(value)`.
 * - Objects: their own string form when it is informative; otherwise a JSON
 *   rendering, so a thrown plain object does not collapse to "[object Object]".
 *
 * Never throws, even for values that cannot be converted to a string or to JSON.
 */
function formatReason(reason: unknown): string {
  if (isError(reason)) {
    return reason.message !== "" ? reason.message : reason.name;
  }
  if (typeof reason !== "object" || reason === null) {
    return String(reason);
  }

  const opaque = "[object Object]";
  try {
    const text = String(reason);
    if (text !== opaque) return text;
  } catch {
    // No usable toString (e.g. a null-prototype object); fall through to JSON.
  }
  try {
    // JSON.stringify can yield undefined at runtime (e.g. toJSON returning undefined).
    const json: string | undefined = JSON.stringify(reason);
    return json ?? opaque;
  } catch {
    // Circular structure or a BigInt field.
    return opaque;
  }
}
/**
 * Waits for `ms` milliseconds unless `signal` aborts first.
 *
 * @param ms - Delay in milliseconds.
 * @param signal - Abort signal that cancels the wait.
 * @returns A promise that resolves once the delay has elapsed.
 * @throws DOMException named `AbortError` (as a rejection) when `signal` is
 *   already aborted or aborts before the delay elapses.
 */
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new DOMException("Aborted", "AbortError"));
      return;
    }

    // Abort path: cancel the pending timer, then reject.
    const cancel = (): void => {
      clearTimeout(handle);
      reject(new DOMException("Aborted", "AbortError"));
    };

    // Completion path: detach the abort listener, then resolve.
    const handle = setTimeout(() => {
      signal.removeEventListener("abort", cancel);
      resolve();
    }, ms);

    signal.addEventListener("abort", cancel, { once: true });
  });
}