1
0
Files
DiAL/src/server/checker/runner/udp/execute.ts
lanyuanxiaoyao cf847ccd7a feat: 重构配置生命周期为 Authoring/Normalized/Resolved 三层
将变量替换和 expect 简写展开统一放入 Normalized 阶段,
运行时 AJV 使用 Normalized schema,导出 schema 面向 Authoring Config。

主要变更:
- 新增 normalizer.ts 实现 normalizeAuthoringConfig()
- 拆分 Authoring/Normalized 双 schema,checker 接口支持 authoring/normalized 片段
- config-loader 流程:normalize → Normalized AJV → semantic → resolve
- validator 兼容层自动分派 raw/normalized expect 形态
- 删除 rawExpect,store.expect 列写入 null
- Authoring schema 对 integer/boolean/enum 字段接受变量引用
- 修复 DB/HTTP validate 入口守卫和 LLM options integer 变量引用
- 优化 compact() 避免 undefined 覆盖隐患
- 移除 content.ts 恒为 true 的前置条件
- 同步 5 个主规范并归档 change
2026-05-22 14:00:47 +08:00

443 lines
13 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { isError } from "es-toolkit";
import type { CheckResult, RawTargetConfig } from "../../types";
import type { CheckerContext, CheckerDefinition, CheckerValidationInput, ResolveContext } from "../types";
import type { ResolvedUdpExpectConfig, ResolvedUdpTarget, UdpTargetConfig } from "./types";
import { errorFailure } from "../../expect/failure";
import { checkValueExpectation } from "../../expect/value";
import { parseSize } from "../../utils";
import { decodePayload, encodeResponse } from "./encoding";
import { checkResponded, checkResponseSize, checkResponseText, checkSourceHost, checkSourcePort } from "./expect";
import { udpCheckerSchemas } from "./schema";
import { validateUdpConfig } from "./validate";
const DEFAULT_MAX_RESPONSE_BYTES = 4096;
type UdpExchangeResult =
| {
data: Uint8Array;
flags: { truncated: boolean };
ok: true;
responded: true;
sourceAddress: string;
sourcePort: number;
}
| { error: string; ok: false }
| {
ok: true;
responded: false;
};
export class UdpChecker implements CheckerDefinition<ResolvedUdpTarget> {
readonly configKey = "udp";
readonly schemas = udpCheckerSchemas;
readonly type = "udp";
buildDetail(observation: Record<string, unknown>): null | string {
const responded = observation["responded"];
const durationMs = observation["durationMs"];
const duration = typeof durationMs === "number" ? `${durationMs}ms` : "?ms";
if (responded !== true) {
return `no response in ${duration}`;
}
const responseSize = observation["responseSize"];
const parts: string[] = [
`responded in ${duration}, ${typeof responseSize === "number" ? responseSize : "?"} bytes`,
];
const preview = observation["responsePreview"];
if (typeof preview === "string" && preview.length > 0) {
parts.push(`response: ${preview.length > 80 ? `${preview.slice(0, 80)}` : preview}`);
}
return parts.join(", ");
}
async execute(t: ResolvedUdpTarget, ctx: CheckerContext): Promise<CheckResult> {
const timestamp = new Date().toISOString();
const start = performance.now();
const expect = t.expect;
try {
const payloadBytes = decodePayload(t.udp.payload, t.udp.encoding);
const exchangeResult = await udpExchange(t.udp.host, t.udp.port, payloadBytes, ctx.signal);
if (!exchangeResult.ok) {
const durationMs = Math.round(performance.now() - start);
const observation: Record<string, unknown> = {
durationMs,
error: exchangeResult.error,
responded: false,
responsePreview: null,
responseSize: null,
sourceAddress: null,
sourcePort: null,
};
return {
detail: null,
durationMs,
failure: errorFailure("response", "response", exchangeResult.error),
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
if (!exchangeResult.responded) {
const durationMs = Math.round(performance.now() - start);
const expectedResponded = expect?.responded ?? true;
const noResponseMessage = "未收到 UDP 响应";
const error = expectedResponded ? noResponseMessage : null;
const observation: Record<string, unknown> = {
durationMs,
error,
responded: false,
responsePreview: null,
responseSize: null,
sourceAddress: null,
sourcePort: null,
};
if (expectedResponded) {
return {
detail: null,
durationMs,
failure: errorFailure("response", "response", noResponseMessage),
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
const durationResult = checkValueExpectation(durationMs, expect?.durationMs, {
message: "durationMs mismatch",
path: "durationMs",
phase: "duration",
});
if (!durationResult.matched) {
return {
detail: null,
durationMs,
failure: durationResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
return {
detail: null,
durationMs,
failure: null,
matched: true,
observation,
targetId: t.id,
timestamp,
};
}
const durationMs = Math.round(performance.now() - start);
const responsePreview = truncateResponsePreview(encodeResponse(exchangeResult.data, t.udp.responseEncoding));
const observation: Record<string, unknown> = {
durationMs,
error: null,
responded: true,
responsePreview,
responseSize: exchangeResult.data.byteLength,
sourceAddress: exchangeResult.sourceAddress,
sourcePort: exchangeResult.sourcePort,
};
const expectedResponded = expect?.responded ?? true;
const respondedResult = checkResponded(true, expectedResponded);
if (!respondedResult.matched) {
return {
detail: null,
durationMs,
failure: respondedResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
if (exchangeResult.flags.truncated) {
return {
detail: null,
durationMs,
failure: errorFailure("response", "response", "响应 datagram 被内核截断"),
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
if (exchangeResult.data.byteLength > t.udp.maxResponseBytes) {
observation["error"] = `响应超过 ${t.udp.maxResponseBytes} 字节限制 (${exchangeResult.data.byteLength} bytes)`;
return {
detail: null,
durationMs,
failure: errorFailure(
"response",
"response",
`响应超过 ${t.udp.maxResponseBytes} 字节限制 (${exchangeResult.data.byteLength} bytes)`,
),
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
if (expect?.responseSize) {
const sizeResult = checkResponseSize(exchangeResult.data.byteLength, expect.responseSize);
if (!sizeResult.matched) {
return {
detail: null,
durationMs,
failure: sizeResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
}
if (expect?.response) {
const responseText = encodeResponse(exchangeResult.data, t.udp.responseEncoding);
const textResult = checkResponseText(responseText, expect.response);
if (!textResult.matched) {
return {
detail: null,
durationMs,
failure: textResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
}
if (expect?.sourceHost) {
const sourceResult = checkSourceHost(exchangeResult.sourceAddress, expect.sourceHost);
if (!sourceResult.matched) {
return {
detail: null,
durationMs,
failure: sourceResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
}
if (expect?.sourcePort) {
const sourceResult = checkSourcePort(exchangeResult.sourcePort, expect.sourcePort);
if (!sourceResult.matched) {
return {
detail: null,
durationMs,
failure: sourceResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
}
const durationResult = checkValueExpectation(durationMs, expect?.durationMs, {
message: "durationMs mismatch",
path: "durationMs",
phase: "duration",
});
if (!durationResult.matched) {
return {
detail: null,
durationMs,
failure: durationResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
return {
detail: null,
durationMs,
failure: null,
matched: true,
observation,
targetId: t.id,
timestamp,
};
} catch (error) {
const durationMs = Math.round(performance.now() - start);
return {
detail: null,
durationMs,
failure: errorFailure("response", "response", isError(error) ? error.message : String(error)),
matched: false,
observation: null,
targetId: t.id,
timestamp,
};
}
}
resolve(target: RawTargetConfig, context: ResolveContext): ResolvedUdpTarget {
const t = target as RawTargetConfig & { type: "udp"; udp: UdpTargetConfig };
const encoding = t.udp.encoding ?? "text";
const responseEncoding = t.udp.responseEncoding ?? "text";
const maxResponseBytes = parseSize(t.udp.maxResponseBytes ?? DEFAULT_MAX_RESPONSE_BYTES);
const expect = target.expect as ResolvedUdpExpectConfig | undefined;
const resolvedExpect: ResolvedUdpExpectConfig = expect
? {
...expect,
responded: expect.responded ?? true,
}
: { responded: true };
return {
description: null,
expect: resolvedExpect,
group: target.group ?? "default",
id: t.id,
intervalMs: context.defaultIntervalMs,
name: t.name ?? null,
timeoutMs: context.defaultTimeoutMs,
type: "udp",
udp: {
encoding,
host: t.udp.host,
maxResponseBytes,
payload: t.udp.payload ?? "",
port: t.udp.port,
responseEncoding,
},
} satisfies ResolvedUdpTarget;
}
serialize(t: ResolvedUdpTarget): { config: string; target: string } {
return {
config: JSON.stringify(t.udp),
target: `udp ${t.udp.host}:${t.udp.port}`,
};
}
validate(input: CheckerValidationInput) {
return validateUdpConfig(input);
}
}
function simplifyUdpError(message: string): string {
const lower = message.toLowerCase();
if (lower.includes("econnrefused") || lower.includes("connection refused")) return "connection refused";
if (lower.includes("enoent") || lower.includes("not found")) return "host not found";
if (lower.includes("etimedout") || lower.includes("timed out")) return "timed out";
if (lower.includes("econnreset") || lower.includes("reset")) return "connection reset";
if (lower.includes("enetwork") || lower.includes("network")) return "network error";
return message;
}
function truncateResponsePreview(text: string, maxLen = 512): string {
if (text.length <= maxLen) return text;
return text.slice(0, maxLen);
}
async function udpExchange(
hostname: string,
port: number,
payload: Uint8Array,
signal: AbortSignal,
): Promise<UdpExchangeResult> {
let settled = false;
let exchangeResolve: ((value: UdpExchangeResult) => void) | undefined;
const exchangePromise = new Promise<UdpExchangeResult>((resolve) => {
exchangeResolve = resolve;
});
const settle = (result: UdpExchangeResult) => {
if (settled) return;
settled = true;
exchangeResolve!(result);
};
try {
const socket = await Bun.udpSocket({
connect: { hostname, port },
socket: {
data(socket, data, _port, _address, flags) {
settle({
data: new Uint8Array(data.buffer, data.byteOffset, data.byteLength),
flags: { truncated: flags.truncated },
ok: true,
responded: true,
sourceAddress: _address,
sourcePort: _port,
});
try {
socket.close();
} catch {
/* best-effort */
}
},
drain() {
// Bun UDP socket handler 必填项UDP checker 不关注 drain 事件
},
error(socket, error) {
settle({ error: error.message, ok: false });
try {
socket.close();
} catch {
/* best-effort */
}
},
},
});
if (signal.aborted) {
try {
socket.close();
} catch {
/* best-effort */
}
return { error: "探测已取消", ok: false };
}
socket.send(payload);
const onAbort = () => {
settle({ ok: true, responded: false });
try {
socket.close();
} catch {
/* best-effort */
}
};
signal.addEventListener("abort", onAbort, { once: true });
const result = await exchangePromise;
signal.removeEventListener("abort", onAbort);
return result;
} catch (error) {
if (signal.aborted) {
return { error: "探测超时", ok: false };
}
const message = isError(error) ? error.message : String(error);
return { error: simplifyUdpError(message), ok: false };
}
}