1
0
Files
DiAL/src/server/checker/runner/tcp/execute.ts
lanyuanxiaoyao cf847ccd7a feat: 重构配置生命周期为 Authoring/Normalized/Resolved 三层
将变量替换和 expect 简写展开统一放入 Normalized 阶段,
运行时 AJV 使用 Normalized schema,导出 schema 面向 Authoring Config。

主要变更:
- 新增 normalizer.ts 实现 normalizeAuthoringConfig()
- 拆分 Authoring/Normalized 双 schema,checker 接口支持 authoring/normalized 片段
- config-loader 流程:normalize → Normalized AJV → semantic → resolve
- validator 兼容层自动分派 raw/normalized expect 形态
- 删除 rawExpect,store.expect 列写入 null
- Authoring schema 对 integer/boolean/enum 字段接受变量引用
- 修复 DB/HTTP validate 入口守卫和 LLM options integer 变量引用
- 优化 compact() 避免 undefined 覆盖隐患
- 移除 content.ts 恒为 true 的前置条件
- 同步 5 个主规范并归档 change
2026-05-22 14:00:47 +08:00

407 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { isError } from "es-toolkit";
import type { CheckResult, RawTargetConfig } from "../../types";
import type { CheckerContext, CheckerDefinition, CheckerValidationInput, ResolveContext } from "../types";
import type { ResolvedTcpExpectConfig, ResolvedTcpTarget, TcpTargetConfig } from "./types";
import { errorFailure } from "../../expect/failure";
import { checkValueExpectation } from "../../expect/value";
import { parseSize } from "../../utils";
import { checkBanner, checkConnected } from "./expect";
import { tcpCheckerSchemas } from "./schema";
import { validateTcpConfig } from "./validate";
const DEFAULT_BANNER_READ_TIMEOUT = 2000;
const DEFAULT_MAX_BANNER_BYTES = 4096;
type ConnectAndBannerResult =
| { banner?: string; bannerExceeded?: boolean; connectTimeMs: number; ok: true; socket: { close(): void } }
| { error: string; ok: false };
export class TcpChecker implements CheckerDefinition<ResolvedTcpTarget> {
readonly configKey = "tcp";
readonly schemas = tcpCheckerSchemas;
readonly type = "tcp";
buildDetail(observation: Record<string, unknown>): null | string {
const connected = observation["connected"];
if (connected !== true) {
const error = observation["error"];
return typeof error === "string" ? `connection failed: ${error}` : "not connected";
}
const connectTimeMs = observation["connectTimeMs"];
const banner = observation["banner"];
const parts: string[] = [`connected in ${typeof connectTimeMs === "number" ? connectTimeMs : "?"}ms`];
if (typeof banner === "string" && banner.length > 0) {
parts.push(`banner: ${truncateBanner(banner)}`);
}
return parts.join(", ");
}
async execute(t: ResolvedTcpTarget, ctx: CheckerContext): Promise<CheckResult> {
const timestamp = new Date().toISOString();
const start = performance.now();
const expect = t.expect;
try {
const connectResult = await connectAndMaybeReadBanner(
t.tcp.host,
t.tcp.port,
t.tcp.readBanner,
t.tcp.bannerReadTimeout,
t.tcp.maxBannerBytes,
ctx.signal,
);
if (!connectResult.ok) {
const durationMs = Math.round(performance.now() - start);
const observation: Record<string, unknown> = {
banner: null,
connected: false,
connectTimeMs: null,
error: connectResult.error,
};
if (expect?.connected === false) {
return {
detail: null,
durationMs,
failure: null,
matched: true,
observation,
targetId: t.id,
timestamp,
};
}
return {
detail: null,
durationMs,
failure: errorFailure("connect", "connect", connectResult.error),
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
const socket = connectResult.socket;
const connectTimeMs = connectResult.connectTimeMs;
if (ctx.signal.aborted) {
closeSocket(socket);
const durationMs = Math.round(performance.now() - start);
return {
detail: null,
durationMs,
failure: errorFailure("connect", "connect", `连接超时 (${t.timeoutMs}ms)`),
matched: false,
observation: null,
targetId: t.id,
timestamp,
};
}
const expectedConnected = expect?.connected ?? true;
const connectedResult = checkConnected(true, expectedConnected);
if (!connectedResult.matched) {
closeSocket(socket);
const durationMs = Math.round(performance.now() - start);
return {
detail: null,
durationMs,
failure: connectedResult.failure,
matched: false,
observation: { banner: null, connected: true, connectTimeMs, error: null },
targetId: t.id,
timestamp,
};
}
if (connectResult.bannerExceeded) {
closeSocket(socket);
const durationMs = Math.round(performance.now() - start);
return {
detail: null,
durationMs,
failure: errorFailure("banner", "banner", `banner 数据超过 ${t.tcp.maxBannerBytes} 字节限制`),
matched: false,
observation: { banner: null, connected: true, connectTimeMs, error: null },
targetId: t.id,
timestamp,
};
}
const banner = connectResult.banner ?? "";
closeSocket(socket);
const observation: Record<string, unknown> = {
banner: banner ? truncateBannerForObservation(banner) : null,
connected: true,
connectTimeMs,
error: null,
};
if (expect?.banner) {
const bannerCheck = checkBanner(banner, expect.banner);
if (!bannerCheck.matched) {
const durationMs = Math.round(performance.now() - start);
return {
detail: null,
durationMs,
failure: bannerCheck.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
}
const durationMs = Math.round(performance.now() - start);
const durationResult = checkValueExpectation(durationMs, expect?.durationMs, {
message: "durationMs mismatch",
path: "durationMs",
phase: "duration",
});
if (!durationResult.matched) {
return {
detail: null,
durationMs,
failure: durationResult.failure,
matched: false,
observation,
targetId: t.id,
timestamp,
};
}
return {
detail: null,
durationMs,
failure: null,
matched: true,
observation,
targetId: t.id,
timestamp,
};
} catch (error) {
const durationMs = Math.round(performance.now() - start);
return {
detail: null,
durationMs,
failure: errorFailure(
"connect",
"connect",
ctx.signal.aborted ? `连接超时 (${t.timeoutMs}ms)` : isError(error) ? error.message : String(error),
),
matched: false,
observation: null,
targetId: t.id,
timestamp,
};
}
}
resolve(target: RawTargetConfig, context: ResolveContext): ResolvedTcpTarget {
const t = target as RawTargetConfig & { tcp: TcpTargetConfig; type: "tcp" };
const maxBannerBytes = parseSize(t.tcp.maxBannerBytes ?? DEFAULT_MAX_BANNER_BYTES);
const bannerReadTimeout = t.tcp.bannerReadTimeout ?? DEFAULT_BANNER_READ_TIMEOUT;
const expect = target.expect as ResolvedTcpExpectConfig | undefined;
const resolvedExpect: ResolvedTcpExpectConfig = expect
? {
...expect,
connected: expect.connected ?? true,
}
: { connected: true };
return {
description: null,
expect: resolvedExpect,
group: target.group ?? "default",
id: t.id,
intervalMs: context.defaultIntervalMs,
name: t.name ?? null,
tcp: {
bannerReadTimeout,
host: t.tcp.host,
maxBannerBytes,
port: t.tcp.port,
readBanner: t.tcp.readBanner ?? false,
},
timeoutMs: context.defaultTimeoutMs,
type: "tcp",
} satisfies ResolvedTcpTarget;
}
serialize(t: ResolvedTcpTarget): { config: string; target: string } {
return {
config: JSON.stringify({
bannerReadTimeout: t.tcp.bannerReadTimeout,
host: t.tcp.host,
maxBannerBytes: t.tcp.maxBannerBytes,
port: t.tcp.port,
readBanner: t.tcp.readBanner,
}),
target: `${t.tcp.host}:${t.tcp.port}`,
};
}
validate(input: CheckerValidationInput) {
return validateTcpConfig(input);
}
}
function assembleChunks(chunks: Uint8Array[], totalBytes: number): Uint8Array {
const result = new Uint8Array(totalBytes);
let offset = 0;
for (const chunk of chunks) {
result.set(chunk, offset);
offset += chunk.byteLength;
}
return result;
}
function closeSocket(socket: { close(): void }) {
try {
socket.close();
} catch {
/* best-effort close */
}
}
async function connectAndMaybeReadBanner(
hostname: string,
port: number,
readBanner: boolean,
bannerTimeoutMs: number,
maxBannerBytes: number,
signal: AbortSignal,
): Promise<ConnectAndBannerResult> {
const chunks: Uint8Array[] = [];
let totalBytes = 0;
let bannerSettled = false;
let bannerExceeded = false;
let bannerResolve: ((value: void) => void) | undefined;
const bannerPromise = new Promise<void>((resolve) => {
bannerResolve = resolve;
});
const socketHandlers: Record<string, (...args: unknown[]) => void> = {
close() {
if (readBanner && !bannerSettled) {
bannerSettled = true;
bannerResolve!();
}
},
data(_socket: unknown, data: unknown) {
if (!readBanner || bannerSettled) return;
const bytes = data as Uint8Array;
totalBytes += bytes.byteLength;
if (totalBytes > maxBannerBytes) {
bannerSettled = true;
bannerExceeded = true;
bannerResolve!();
return;
}
chunks.push(bytes);
},
drain() {
// Bun socket handler 必填项TCP checker 不关注 drain 事件
},
end() {
if (readBanner && !bannerSettled) {
bannerSettled = true;
bannerResolve!();
}
},
error() {
if (readBanner && !bannerSettled) {
bannerSettled = true;
bannerResolve!();
}
},
open() {
// Bun socket handler 必填项,连接成功由 Bun.connect() resolve 表示
},
};
try {
const connectStart = performance.now();
const socket = await Bun.connect({
hostname,
port,
socket: socketHandlers,
});
const connectTimeMs = Math.round(performance.now() - connectStart);
if (signal.aborted) {
closeSocket(socket);
return { error: "连接已取消", ok: false };
}
if (!readBanner) {
return { bannerExceeded: false, connectTimeMs, ok: true, socket };
}
const timer = setTimeout(() => {
if (bannerSettled) return;
bannerSettled = true;
bannerResolve!();
}, bannerTimeoutMs);
const onAbort = () => {
if (bannerSettled) return;
bannerSettled = true;
clearTimeout(timer);
bannerResolve!();
};
if (signal.aborted) {
clearTimeout(timer);
closeSocket(socket);
return { error: "连接已取消", ok: false };
}
signal.addEventListener("abort", onAbort, { once: true });
await bannerPromise;
clearTimeout(timer);
signal.removeEventListener("abort", onAbort);
if (bannerExceeded) {
return { bannerExceeded: true, connectTimeMs, ok: true, socket };
}
const banner = new TextDecoder().decode(assembleChunks(chunks, totalBytes));
return { banner, bannerExceeded: false, connectTimeMs, ok: true, socket };
} catch (error) {
if (signal.aborted) {
return { error: "连接超时", ok: false };
}
const message = isError(error) ? error.message : String(error);
return { error: simplifyConnectError(message), ok: false };
}
}
function simplifyConnectError(message: string): string {
const lower = message.toLowerCase();
if (lower.includes("econnrefused") || lower.includes("connection refused")) return "connection refused";
if (lower.includes("enoent") || lower.includes("not found")) return "host not found";
if (lower.includes("etimedout") || lower.includes("timed out")) return "connection timed out";
if (lower.includes("econnreset") || lower.includes("reset")) return "connection reset";
if (lower.includes("enetwork") || lower.includes("network")) return "network error";
return message;
}
function truncateBanner(banner: string, maxLen = 80): string {
if (banner.length <= maxLen) return banner;
return `${banner.slice(0, maxLen)}`;
}
function truncateBannerForObservation(banner: string, maxLen = 256): string {
if (banner.length <= maxLen) return banner;
return banner.slice(0, maxLen);
}