feat: 新增 UDP checker,支持自定义 payload 请求-响应探测与断言
基于 Bun connected UDP socket 实现通用 UDP 拨测能力: - 支持 text/hex/base64 payload 编码与独立 responseEncoding 响应视图 - 支持 responded、response、responseSize、sourceHost、sourcePort、maxDurationMs 专属 expect - 单 datagram 发送,仅断言首个 UDP 响应 datagram - 通过 maxResponseBytes 和 flags.truncated 进行响应大小限制与截断保护 - payload 可选,省略时发送空 datagram - 自包含模块结构(types/schema/validate/expect/encoding/execute) - 新增 741 tests(含 unit、execute 集成、expect 和编码 roundtrip),全部通过
This commit is contained in:
388
src/server/checker/runner/udp/execute.ts
Normal file
388
src/server/checker/runner/udp/execute.ts
Normal file
@@ -0,0 +1,388 @@
|
||||
import { isError } from "es-toolkit";
|
||||
|
||||
import type { CheckResult, RawTargetConfig } from "../../types";
|
||||
import type { CheckerContext, CheckerDefinition, CheckerValidationInput, ResolveContext } from "../types";
|
||||
import type { ResolvedUdpTarget, UdpDefaultsConfig, UdpExpectConfig, UdpTargetConfig } from "./types";
|
||||
|
||||
import { checkDuration } from "../../expect/duration";
|
||||
import { errorFailure } from "../../expect/failure";
|
||||
import { parseSize } from "../../utils";
|
||||
import { decodePayload, encodeResponse } from "./encoding";
|
||||
import { checkResponded, checkResponseSize, checkResponseText, checkSourceHost, checkSourcePort } from "./expect";
|
||||
import { udpCheckerSchemas } from "./schema";
|
||||
import { validateUdpConfig } from "./validate";
|
||||
|
||||
const DEFAULT_MAX_RESPONSE_BYTES = 4096;
|
||||
const RESPONSE_PREVIEW_MAX = 80;
|
||||
|
||||
type UdpExchangeResult =
|
||||
| {
|
||||
data: Uint8Array;
|
||||
flags: { truncated: boolean };
|
||||
ok: true;
|
||||
responded: true;
|
||||
sourceAddress: string;
|
||||
sourcePort: number;
|
||||
}
|
||||
| { error: string; ok: false }
|
||||
| {
|
||||
ok: true;
|
||||
responded: false;
|
||||
};
|
||||
|
||||
export class UdpChecker implements CheckerDefinition<ResolvedUdpTarget> {
|
||||
readonly configKey = "udp";
|
||||
readonly schemas = udpCheckerSchemas;
|
||||
readonly type = "udp";
|
||||
|
||||
async execute(t: ResolvedUdpTarget, ctx: CheckerContext): Promise<CheckResult> {
|
||||
const timestamp = new Date().toISOString();
|
||||
const start = performance.now();
|
||||
const expect = t.expect;
|
||||
|
||||
try {
|
||||
const payloadBytes = decodePayload(t.udp.payload, t.udp.encoding);
|
||||
|
||||
const exchangeResult = await udpExchange(t.udp.host, t.udp.port, payloadBytes, ctx.signal);
|
||||
|
||||
if (!exchangeResult.ok) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
if (expect?.responded === false) {
|
||||
return {
|
||||
durationMs,
|
||||
failure: null,
|
||||
matched: true,
|
||||
statusDetail: exchangeResult.error,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
return {
|
||||
durationMs,
|
||||
failure: errorFailure("response", "response", exchangeResult.error),
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
const expectedResponded = expect?.responded ?? true;
|
||||
const respondedResult = checkResponded(exchangeResult.responded, expectedResponded);
|
||||
if (!respondedResult.matched) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: respondedResult.failure,
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
if (!exchangeResult.responded) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
const durationResult = checkDuration(durationMs, expect?.maxDurationMs);
|
||||
if (!durationResult.matched) {
|
||||
return {
|
||||
durationMs,
|
||||
failure: durationResult.failure,
|
||||
matched: false,
|
||||
statusDetail: buildNoResponseDetail(durationMs),
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
return {
|
||||
durationMs,
|
||||
failure: null,
|
||||
matched: true,
|
||||
statusDetail: buildNoResponseDetail(durationMs),
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
if (exchangeResult.flags.truncated) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: errorFailure("response", "response", "响应 datagram 被内核截断"),
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
if (exchangeResult.data.byteLength > t.udp.maxResponseBytes) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: errorFailure(
|
||||
"response",
|
||||
"response",
|
||||
`响应超过 ${t.udp.maxResponseBytes} 字节限制 (${exchangeResult.data.byteLength} bytes)`,
|
||||
),
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
if (expect?.responseSize) {
|
||||
const sizeResult = checkResponseSize(exchangeResult.data.byteLength, expect.responseSize);
|
||||
if (!sizeResult.matched) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: sizeResult.failure,
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (expect?.response) {
|
||||
const responseText = encodeResponse(exchangeResult.data, t.udp.responseEncoding);
|
||||
const textResult = checkResponseText(responseText, expect.response);
|
||||
if (!textResult.matched) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: textResult.failure,
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (expect?.sourceHost) {
|
||||
const sourceResult = checkSourceHost(exchangeResult.sourceAddress, expect.sourceHost);
|
||||
if (!sourceResult.matched) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: sourceResult.failure,
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if (expect?.sourcePort) {
|
||||
const sourceResult = checkSourcePort(exchangeResult.sourcePort, expect.sourcePort);
|
||||
if (!sourceResult.matched) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: sourceResult.failure,
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
const durationResult = checkDuration(durationMs, expect?.maxDurationMs);
|
||||
if (!durationResult.matched) {
|
||||
return {
|
||||
durationMs,
|
||||
failure: durationResult.failure,
|
||||
matched: false,
|
||||
statusDetail: buildRespondedDetail(
|
||||
exchangeResult.data.byteLength,
|
||||
durationMs,
|
||||
t.udp.responseEncoding,
|
||||
exchangeResult.data,
|
||||
),
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
durationMs,
|
||||
failure: null,
|
||||
matched: true,
|
||||
statusDetail: buildRespondedDetail(
|
||||
exchangeResult.data.byteLength,
|
||||
durationMs,
|
||||
t.udp.responseEncoding,
|
||||
exchangeResult.data,
|
||||
),
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
} catch (error) {
|
||||
const durationMs = Math.round(performance.now() - start);
|
||||
return {
|
||||
durationMs,
|
||||
failure: errorFailure("response", "response", isError(error) ? error.message : String(error)),
|
||||
matched: false,
|
||||
statusDetail: null,
|
||||
targetId: t.id,
|
||||
timestamp,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
resolve(target: RawTargetConfig, context: ResolveContext): ResolvedUdpTarget {
|
||||
const t = target as RawTargetConfig & { type: "udp"; udp: UdpTargetConfig };
|
||||
const udpDefaults = context.defaults["udp"] as UdpDefaultsConfig | undefined;
|
||||
|
||||
const encoding = t.udp.encoding ?? udpDefaults?.encoding ?? "text";
|
||||
const responseEncoding = t.udp.responseEncoding ?? udpDefaults?.responseEncoding ?? "text";
|
||||
const maxResponseBytes = parseSize(
|
||||
t.udp.maxResponseBytes ?? udpDefaults?.maxResponseBytes ?? DEFAULT_MAX_RESPONSE_BYTES,
|
||||
);
|
||||
|
||||
return {
|
||||
description: null,
|
||||
expect: target.expect as UdpExpectConfig | undefined,
|
||||
group: target.group ?? "default",
|
||||
id: t.id,
|
||||
intervalMs: context.defaultIntervalMs,
|
||||
name: t.name ?? null,
|
||||
timeoutMs: context.defaultTimeoutMs,
|
||||
type: "udp",
|
||||
udp: {
|
||||
encoding,
|
||||
host: t.udp.host,
|
||||
maxResponseBytes,
|
||||
payload: t.udp.payload ?? "",
|
||||
port: t.udp.port,
|
||||
responseEncoding,
|
||||
},
|
||||
} satisfies ResolvedUdpTarget;
|
||||
}
|
||||
|
||||
serialize(t: ResolvedUdpTarget): { config: string; target: string } {
|
||||
return {
|
||||
config: JSON.stringify(t.udp),
|
||||
target: `udp ${t.udp.host}:${t.udp.port}`,
|
||||
};
|
||||
}
|
||||
|
||||
validate(input: CheckerValidationInput) {
|
||||
return validateUdpConfig(input);
|
||||
}
|
||||
}
|
||||
|
||||
function buildNoResponseDetail(durationMs: number): string {
|
||||
return `no response in ${durationMs}ms`;
|
||||
}
|
||||
|
||||
function buildRespondedDetail(size: number, durationMs: number, encoding: string, data: Uint8Array): string {
|
||||
let detail = `responded in ${durationMs}ms, ${size} bytes`;
|
||||
if (size > 0 && size <= RESPONSE_PREVIEW_MAX) {
|
||||
const preview = encodeResponse(data, encoding as "base64" | "hex" | "text");
|
||||
const truncated = preview.length > RESPONSE_PREVIEW_MAX ? `${preview.slice(0, RESPONSE_PREVIEW_MAX)}…` : preview;
|
||||
detail = `${detail}, response: ${truncated}`;
|
||||
}
|
||||
return detail;
|
||||
}
|
||||
|
||||
function simplifyUdpError(message: string): string {
|
||||
const lower = message.toLowerCase();
|
||||
if (lower.includes("econnrefused") || lower.includes("connection refused")) return "connection refused";
|
||||
if (lower.includes("enoent") || lower.includes("not found")) return "host not found";
|
||||
if (lower.includes("etimedout") || lower.includes("timed out")) return "timed out";
|
||||
if (lower.includes("econnreset") || lower.includes("reset")) return "connection reset";
|
||||
if (lower.includes("enetwork") || lower.includes("network")) return "network error";
|
||||
return message;
|
||||
}
|
||||
|
||||
async function udpExchange(
|
||||
hostname: string,
|
||||
port: number,
|
||||
payload: Uint8Array,
|
||||
signal: AbortSignal,
|
||||
): Promise<UdpExchangeResult> {
|
||||
let settled = false;
|
||||
let exchangeResolve: ((value: UdpExchangeResult) => void) | undefined;
|
||||
const exchangePromise = new Promise<UdpExchangeResult>((resolve) => {
|
||||
exchangeResolve = resolve;
|
||||
});
|
||||
|
||||
const settle = (result: UdpExchangeResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
exchangeResolve!(result);
|
||||
};
|
||||
|
||||
try {
|
||||
const socket = await Bun.udpSocket({
|
||||
connect: { hostname, port },
|
||||
socket: {
|
||||
data(socket, data, _port, _address, flags) {
|
||||
settle({
|
||||
data: new Uint8Array(data.buffer, data.byteOffset, data.byteLength),
|
||||
flags: { truncated: flags.truncated },
|
||||
ok: true,
|
||||
responded: true,
|
||||
sourceAddress: _address,
|
||||
sourcePort: _port,
|
||||
});
|
||||
try {
|
||||
socket.close();
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
},
|
||||
drain() {
|
||||
// Bun UDP socket handler 必填项,UDP checker 不关注 drain 事件
|
||||
},
|
||||
error(socket, error) {
|
||||
settle({ error: error.message, ok: false });
|
||||
try {
|
||||
socket.close();
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
if (signal.aborted) {
|
||||
try {
|
||||
socket.close();
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
return { error: "探测已取消", ok: false };
|
||||
}
|
||||
|
||||
socket.send(payload);
|
||||
|
||||
const onAbort = () => {
|
||||
settle({ ok: true, responded: false });
|
||||
try {
|
||||
socket.close();
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
};
|
||||
signal.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
const result = await exchangePromise;
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
return result;
|
||||
} catch (error) {
|
||||
if (signal.aborted) {
|
||||
return { error: "探测超时", ok: false };
|
||||
}
|
||||
const message = isError(error) ? error.message : String(error);
|
||||
return { error: simplifyUdpError(message), ok: false };
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user