1
0
Files
DiAL/src/server/checker/runner/tcp/execute.ts
lanyuanxiaoyao 79358ba50d refactor: 移除顶层 defaults 配置段,简化为 target 显式字段 > 代码内置默认值
- 移除 DefaultsConfig 类型、ProbeConfig.defaults 字段
- 移除 CheckerSchemas.defaults、ResolveContext.defaults、CheckerValidationInput.defaults
- 更新所有 checker schema/resolve/validate 删除 defaults 合并逻辑
- 更新 config-loader 不再读取传递 defaults
- 更新测试、README、DEVELOPMENT、probes.example.yaml
- 重新生成 probe-config.schema.json(不含 defaults)
- 同步 delta specs 到主规范
- 归档 openspec change
2026-05-21 16:53:12 +08:00

410 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { isError } from "es-toolkit";
import type { CheckResult, RawTargetConfig } from "../../types";
import type { CheckerContext, CheckerDefinition, CheckerValidationInput, ResolveContext } from "../types";
import type { RawTcpExpectConfig, ResolvedTcpExpectConfig, ResolvedTcpTarget, TcpTargetConfig } from "./types";
import { resolveContentExpectations } from "../../expect/content";
import { errorFailure } from "../../expect/failure";
import { checkValueExpectation, resolveValueExpectation } from "../../expect/value";
import { parseSize } from "../../utils";
import { checkBanner, checkConnected } from "./expect";
import { tcpCheckerSchemas } from "./schema";
import { validateTcpConfig } from "./validate";
/** Fallback banner read window in milliseconds, used when the target sets no `bannerReadTimeout`. */
const DEFAULT_BANNER_READ_TIMEOUT = 2_000;
/** Fallback banner size cap in bytes (4 KiB), used when the target sets no `maxBannerBytes`. */
const DEFAULT_MAX_BANNER_BYTES = 4 * 1024;
/**
 * Outcome of `connectAndMaybeReadBanner`, discriminated by `ok`.
 *
 * - `ok: true`  — the connection was established; the caller owns `socket`
 *   and is responsible for closing it.
 * - `ok: false` — no usable connection; `error` holds a short description.
 */
type ConnectAndBannerResult =
  | {
      /** Decoded banner text; only set when a banner was read and the size cap was not exceeded. */
      banner?: string;
      /** True when the received data went over the configured byte limit. */
      bannerExceeded?: boolean;
      /** Rounded time in milliseconds spent establishing the connection. */
      connectTimeMs: number;
      ok: true;
      /** The open socket handle. */
      socket: { close(): void };
    }
  | {
      /** Short description of why the connection is not usable. */
      error: string;
      ok: false;
    };
/**
 * TCP checker: connects to `host:port`, optionally collects the data the
 * server sends first (the banner), and evaluates the target's `connected`,
 * `banner` and `durationMs` expectations against what was observed.
 */
export class TcpChecker implements CheckerDefinition<ResolvedTcpTarget> {
  readonly configKey = "tcp";
  readonly schemas = tcpCheckerSchemas;
  readonly type = "tcp";

  /**
   * Builds a one-line human-readable summary from an observation record.
   *
   * @param observation Record with the `connected`, `error`, `connectTimeMs`
   *   and `banner` keys written by {@link TcpChecker.execute}.
   * @returns A failure description when `connected` is not `true`, otherwise
   *   the connect time followed by the (shortened) banner when one is present.
   *   This implementation always returns a string.
   */
  buildDetail(observation: Record<string, unknown>): null | string {
    if (observation["connected"] !== true) {
      const reason = observation["error"];
      if (typeof reason === "string") return `connection failed: ${reason}`;
      return "not connected";
    }

    const elapsed = observation["connectTimeMs"];
    const greeting = observation["banner"];
    const elapsedLabel = typeof elapsed === "number" ? elapsed : "?";

    let summary = `connected in ${elapsedLabel}ms`;
    if (typeof greeting === "string" && greeting.length > 0) {
      summary += `, banner: ${truncateBanner(greeting)}`;
    }
    return summary;
  }

  /**
   * Runs one check against the target.
   *
   * Order of evaluation: connect (and optional banner read) → abort check →
   * `connected` expectation → banner size limit → `banner` expectation →
   * `durationMs` expectation. The first step that fails decides the result.
   * The socket is always closed before the banner/duration expectations run.
   *
   * @param t Resolved target configuration.
   * @param ctx Checker context; only `ctx.signal` is read here.
   * @returns The check result; thrown errors are converted into a failed
   *   result in the `connect` phase rather than propagated.
   */
  async execute(t: ResolvedTcpTarget, ctx: CheckerContext): Promise<CheckResult> {
    const timestamp = new Date().toISOString();
    const start = performance.now();
    const { expect, tcp } = t;

    const elapsedMs = (): number => Math.round(performance.now() - start);
    const timeoutMessage = (): string => `连接超时 (${t.timeoutMs}ms)`;
    // Single place that lays out a result. The generic parameters keep the
    // literal types of `matched` / `failure` so each call site is checked
    // against CheckResult exactly like an inline object literal would be.
    const outcome = <M extends boolean, F extends CheckResult["failure"]>(
      durationMs: number,
      matched: M,
      failure: F,
      observation: null | Record<string, unknown>,
    ) => ({
      detail: null,
      durationMs,
      failure,
      matched,
      observation,
      targetId: t.id,
      timestamp,
    });

    try {
      const connection = await connectAndMaybeReadBanner(
        tcp.host,
        tcp.port,
        tcp.readBanner,
        tcp.bannerReadTimeout,
        tcp.maxBannerBytes,
        ctx.signal,
      );

      if (!connection.ok) {
        const durationMs = elapsedMs();
        const observation: Record<string, unknown> = {
          banner: null,
          connected: false,
          connectTimeMs: null,
          error: connection.error,
        };
        // A failed connection is the expected outcome when the target asks for `connected: false`.
        if (expect?.connected === false) {
          return outcome(durationMs, true, null, observation);
        }
        return outcome(durationMs, false, errorFailure("connect", "connect", connection.error), observation);
      }

      const { connectTimeMs, socket } = connection;

      if (ctx.signal.aborted) {
        closeSocket(socket);
        return outcome(elapsedMs(), false, errorFailure("connect", "connect", timeoutMessage()), null);
      }

      // Observation used by the failure paths that report a connection but no banner.
      const connectedWithoutBanner = (): Record<string, unknown> => ({
        banner: null,
        connected: true,
        connectTimeMs,
        error: null,
      });

      const connectedCheck = checkConnected(true, expect?.connected ?? true);
      if (!connectedCheck.matched) {
        closeSocket(socket);
        return outcome(elapsedMs(), false, connectedCheck.failure, connectedWithoutBanner());
      }

      if (connection.bannerExceeded) {
        closeSocket(socket);
        return outcome(
          elapsedMs(),
          false,
          errorFailure("banner", "banner", `banner 数据超过 ${tcp.maxBannerBytes} 字节限制`),
          connectedWithoutBanner(),
        );
      }

      const banner = connection.banner ?? "";
      closeSocket(socket);

      const observation: Record<string, unknown> = {
        banner: banner === "" ? null : truncateBannerForObservation(banner),
        connected: true,
        connectTimeMs,
        error: null,
      };

      if (expect?.banner) {
        // The expectation is checked against the full banner, not the shortened observation copy.
        const bannerCheck = checkBanner(banner, expect.banner);
        if (!bannerCheck.matched) {
          return outcome(elapsedMs(), false, bannerCheck.failure, observation);
        }
      }

      // Measured once so the value that is checked is the value that is reported.
      const durationMs = elapsedMs();
      const durationCheck = checkValueExpectation(durationMs, expect?.durationMs, {
        message: "durationMs mismatch",
        path: "durationMs",
        phase: "duration",
      });
      if (!durationCheck.matched) {
        return outcome(durationMs, false, durationCheck.failure, observation);
      }
      return outcome(durationMs, true, null, observation);
    } catch (error) {
      const durationMs = elapsedMs();
      let reason: string;
      if (ctx.signal.aborted) {
        reason = timeoutMessage();
      } else if (isError(error)) {
        reason = error.message;
      } else {
        reason = String(error);
      }
      return outcome(durationMs, false, errorFailure("connect", "connect", reason), null);
    }
  }

  /**
   * Converts a raw target entry into a resolved TCP target.
   *
   * Missing `maxBannerBytes`, `bannerReadTimeout` and `readBanner` fall back
   * to `DEFAULT_MAX_BANNER_BYTES`, `DEFAULT_BANNER_READ_TIMEOUT` and `false`.
   * Without an `expect` section the target only expects `connected: true`.
   *
   * @param target Raw target configuration; treated as a TCP target.
   * @param context Supplies `defaultIntervalMs` and `defaultTimeoutMs`.
   * @returns The resolved target.
   */
  resolve(target: RawTargetConfig, context: ResolveContext): ResolvedTcpTarget {
    const tcpTarget = target as RawTargetConfig & { tcp: TcpTargetConfig; type: "tcp" };
    const rawTcp = tcpTarget.tcp;

    const maxBannerBytes = parseSize(rawTcp.maxBannerBytes ?? DEFAULT_MAX_BANNER_BYTES);
    const bannerReadTimeout = rawTcp.bannerReadTimeout ?? DEFAULT_BANNER_READ_TIMEOUT;

    const rawExpect = target.expect as RawTcpExpectConfig | undefined;
    let expect: ResolvedTcpExpectConfig;
    if (rawExpect) {
      expect = {
        banner: resolveContentExpectations(rawExpect.banner),
        connected: rawExpect.connected ?? true,
        durationMs: resolveValueExpectation(rawExpect.durationMs),
      };
    } else {
      expect = { connected: true };
    }

    return {
      description: null,
      expect,
      group: target.group ?? "default",
      id: tcpTarget.id,
      intervalMs: context.defaultIntervalMs,
      name: tcpTarget.name ?? null,
      rawExpect,
      tcp: {
        bannerReadTimeout,
        host: rawTcp.host,
        maxBannerBytes,
        port: rawTcp.port,
        readBanner: rawTcp.readBanner ?? false,
      },
      timeoutMs: context.defaultTimeoutMs,
      type: "tcp",
    } satisfies ResolvedTcpTarget;
  }

  /**
   * Produces the string forms of a resolved target.
   *
   * @param t Resolved target.
   * @returns `config`: the TCP settings as JSON; `target`: `host:port`.
   */
  serialize(t: ResolvedTcpTarget): { config: string; target: string } {
    const { bannerReadTimeout, host, maxBannerBytes, port, readBanner } = t.tcp;
    const config = JSON.stringify({ bannerReadTimeout, host, maxBannerBytes, port, readBanner });
    return { config, target: `${host}:${port}` };
  }

  /**
   * Validates TCP configuration by delegating to `validateTcpConfig`.
   *
   * @param input Validation input, passed through unchanged.
   * @returns Whatever `validateTcpConfig` returns.
   */
  validate(input: CheckerValidationInput) {
    return validateTcpConfig(input);
  }
}
/**
 * Concatenates byte chunks, in order, into one buffer.
 *
 * @param chunks Pieces to copy.
 * @param totalBytes Length of the returned buffer; must be at least the
 *   combined length of `chunks`, otherwise `Uint8Array.prototype.set` throws.
 * @returns A new `Uint8Array` of `totalBytes` bytes; bytes past the copied
 *   data stay zero.
 */
function assembleChunks(chunks: Uint8Array[], totalBytes: number): Uint8Array {
  const merged = new Uint8Array(totalBytes);
  // The accumulator is the write position for the next chunk.
  chunks.reduce((writeAt, piece) => {
    merged.set(piece, writeAt);
    return writeAt + piece.byteLength;
  }, 0);
  return merged;
}
/**
 * Closes a socket on a best-effort basis.
 *
 * Anything thrown by `close()` is deliberately discarded so that cleanup can
 * never replace the outcome of the check that is being reported.
 *
 * @param socket Any object exposing `close()`.
 */
function closeSocket(socket: { close(): void }): void {
  try {
    socket.close();
  } catch {
    // Intentionally ignored: closing is best-effort.
  }
}
/**
 * Opens a TCP connection with `Bun.connect` and, when requested, collects the
 * data the peer sends first (the banner).
 *
 * Banner collection ends at the first of: the peer closing/ending/erroring,
 * the byte limit being exceeded, `bannerTimeoutMs` elapsing, or `signal`
 * aborting. It does not end early just because some data arrived.
 *
 * Cancellation: the connect call itself is not cancelled here, so the wait
 * is raced against `signal`. If the signal fires first the function returns
 * immediately, and a socket that connects afterwards is closed as soon as it
 * appears.
 *
 * @param hostname Host to connect to.
 * @param port Port to connect to.
 * @param readBanner Whether to collect incoming data after connecting.
 * @param bannerTimeoutMs Maximum time in milliseconds to wait for the banner.
 * @param maxBannerBytes Maximum number of banner bytes accepted; receiving
 *   more sets `bannerExceeded` and discards the banner.
 * @param signal Abort signal of the surrounding check.
 * @returns `ok: true` with the open socket (the caller must close it), or
 *   `ok: false` with a short error description. The promise never rejects.
 */
async function connectAndMaybeReadBanner(
  hostname: string,
  port: number,
  readBanner: boolean,
  bannerTimeoutMs: number,
  maxBannerBytes: number,
  signal: AbortSignal,
): Promise<ConnectAndBannerResult> {
  const chunks: Uint8Array[] = [];
  let totalBytes = 0;
  let bannerSettled = false;
  let bannerExceeded = false;
  // Replaced synchronously by the Promise executor below.
  let resolveBanner: () => void = () => {};
  const bannerPromise = new Promise<void>((resolve) => {
    resolveBanner = resolve;
  });
  // Ends banner collection exactly once; a no-op when no banner is being read.
  const settleBanner = (): void => {
    if (!readBanner || bannerSettled) return;
    bannerSettled = true;
    resolveBanner();
  };
  const socketHandlers: Record<string, (...args: unknown[]) => void> = {
    close() {
      settleBanner();
    },
    data(_socket: unknown, data: unknown) {
      if (!readBanner || bannerSettled) return;
      const bytes = data as Uint8Array;
      totalBytes += bytes.byteLength;
      if (totalBytes > maxBannerBytes) {
        // Over the limit: stop collecting and drop this chunk.
        bannerExceeded = true;
        settleBanner();
        return;
      }
      chunks.push(bytes);
    },
    drain() {
      // Required by the Bun socket handler contract; the TCP checker does not use drain events.
    },
    end() {
      settleBanner();
    },
    error() {
      settleBanner();
    },
    open() {
      // Required by the Bun socket handler contract; success is signalled by Bun.connect() resolving.
    },
  };

  // Nothing to probe when the check was cancelled before it started.
  if (signal.aborted) {
    return { error: "连接已取消", ok: false };
  }

  try {
    const connectStart = performance.now();
    const pendingSocket = Bun.connect({
      hostname,
      port,
      socket: socketHandlers,
    });
    const socket = await new Promise<Awaited<typeof pendingSocket>>((resolve, reject) => {
      const onAbortWhileConnecting = (): void => {
        // The connect attempt keeps running; make sure a late socket is released.
        pendingSocket.then(closeSocket, () => undefined);
        reject(new Error("aborted while connecting"));
      };
      signal.addEventListener("abort", onAbortWhileConnecting, { once: true });
      pendingSocket.then(
        (connected) => {
          signal.removeEventListener("abort", onAbortWhileConnecting);
          resolve(connected);
        },
        (cause: unknown) => {
          signal.removeEventListener("abort", onAbortWhileConnecting);
          reject(cause);
        },
      );
    });
    const connectTimeMs = Math.round(performance.now() - connectStart);

    if (signal.aborted) {
      closeSocket(socket);
      return { error: "连接已取消", ok: false };
    }
    if (!readBanner) {
      return { bannerExceeded: false, connectTimeMs, ok: true, socket };
    }

    // Stop waiting for the banner on timeout or abort, whichever comes first.
    const timer = setTimeout(settleBanner, bannerTimeoutMs);
    signal.addEventListener("abort", settleBanner, { once: true });
    await bannerPromise;
    clearTimeout(timer);
    signal.removeEventListener("abort", settleBanner);

    if (bannerExceeded) {
      return { bannerExceeded: true, connectTimeMs, ok: true, socket };
    }
    const banner = new TextDecoder().decode(assembleChunks(chunks, totalBytes));
    return { banner, bannerExceeded: false, connectTimeMs, ok: true, socket };
  } catch (error) {
    if (signal.aborted) {
      return { error: "连接超时", ok: false };
    }
    const message = isError(error) ? error.message : String(error);
    return { error: simplifyConnectError(message), ok: false };
  }
}
/**
 * Maps a low-level connect error message to a short, stable description.
 *
 * Matching is case-insensitive and substring based; categories are tried in
 * the order listed and the first hit wins. DNS failures are recognised by
 * `ENOTFOUND` and unreachable networks by `ENETUNREACH` / `ENETDOWN`, in
 * addition to the plain-text phrases.
 *
 * @param message Raw error message.
 * @returns The category label, or `message` unchanged when nothing matches.
 */
function simplifyConnectError(message: string): string {
  const lower = message.toLowerCase();
  const categories: ReadonlyArray<readonly [string, readonly string[]]> = [
    ["connection refused", ["econnrefused", "connection refused"]],
    ["host not found", ["enotfound", "enoent", "not found"]],
    ["connection timed out", ["etimedout", "timed out"]],
    ["connection reset", ["econnreset", "reset"]],
    ["network error", ["enetunreach", "enetdown", "enetwork", "network"]],
  ];
  for (const [label, needles] of categories) {
    if (needles.some((needle) => lower.includes(needle))) return label;
  }
  return message;
}
/**
 * Shortens a banner for display in a one-line detail string.
 *
 * A shortened banner ends with an ellipsis (U+2026) so readers can tell it
 * was cut. The cut never lands between the two halves of a surrogate pair.
 *
 * @param banner Banner text.
 * @param maxLen Maximum number of UTF-16 code units kept from `banner`.
 * @returns `banner` unchanged when it fits, otherwise its prefix plus the
 *   ellipsis marker.
 */
function truncateBanner(banner: string, maxLen = 80): string {
  if (banner.length <= maxLen) return banner;
  let end = maxLen;
  const lastKept = banner.charCodeAt(end - 1);
  // A high surrogate at the cut would be left without its partner; drop it too.
  if (lastKept >= 0xd800 && lastKept <= 0xdbff) end -= 1;
  return `${banner.slice(0, end)}\u2026`;
}
/**
 * Shortens a banner before it is stored in an observation record.
 *
 * No marker is appended. The cut never lands between the two halves of a
 * surrogate pair, so the stored value is always well-formed text.
 *
 * @param banner Banner text.
 * @param maxLen Maximum number of UTF-16 code units to keep.
 * @returns `banner` unchanged when it fits, otherwise a prefix of at most
 *   `maxLen` code units.
 */
function truncateBannerForObservation(banner: string, maxLen = 256): string {
  if (banner.length <= maxLen) return banner;
  let end = maxLen;
  const lastKept = banner.charCodeAt(end - 1);
  // A high surrogate at the cut would be left without its partner; drop it too.
  if (lastKept >= 0xd800 && lastKept <= 0xdbff) end -= 1;
  return banner.slice(0, end);
}