import type Database from "bun:sqlite";
import { describe, expect, test } from "bun:test";
import { and, eq } from "drizzle-orm";
import { notDeleted, wrap } from "../../../src/server/db/connection";
import { createMaterial } from "../../../src/server/db/materials";
import { createProject } from "../../../src/server/db/projects";
import { materials } from "../../../src/server/db/schema";
import { createNoopLogger } from "../../../src/server/logger";
import type { ProcessableMaterial } from "../../../src/server/processing/processor";
import { MaterialProcessor } from "../../../src/server/processing/processor";
import { createMigratedTestDatabase } from "../../helpers";

/** Logger shared by every helper and processor instance in this file. */
const LOG = createNoopLogger();

/** Status values these tests write directly into the `materials` table. */
type MaterialStatus =
  | "approved"
  | "discarded"
  | "failed"
  | "pending"
  | "processing"
  | "review";

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/**
 * Runs `callback` against a freshly migrated test database.
 *
 * The handle is closed and then cleaned up whether or not the callback throws,
 * so a failing assertion does not leave the database open when `cleanup()`
 * runs.
 */
function withProcessorDb(callback: (db: Database) => void): void {
  const handle = createMigratedTestDatabase("processor-test");
  try {
    callback(handle.db);
  } finally {
    try {
      handle.close();
    } finally {
      // Cleanup must still run even if closing the handle throws.
      handle.cleanup();
    }
  }
}

/**
 * Async counterpart of {@link withProcessorDb}: awaits `callback` before the
 * handle is closed and cleaned up.
 */
async function withProcessorDbAsync(
  callback: (db: Database) => Promise<void>,
): Promise<void> {
  const handle = createMigratedTestDatabase("processor-test-async");
  try {
    await callback(handle.db);
  } finally {
    try {
      handle.close();
    } finally {
      // Cleanup must still run even if closing the handle throws.
      handle.cleanup();
    }
  }
}

/**
 * Creates a project and returns its id.
 *
 * @throws Error when `createProject` reports an error result.
 */
function setupProject(db: Database, name = "测试项目"): string {
  const result = createProject(db, { name }, LOG);
  if ("error" in result) {
    throw new Error(result.error);
  }
  return result.project.id;
}

/**
 * Creates a material under `projectId` and returns its id.
 *
 * `associatedDate` and `description` fall back to fixed defaults;
 * `materialType` is passed through as given (possibly `undefined`).
 *
 * @throws Error when `createMaterial` reports an error result.
 */
function setupMaterial(
  db: Database,
  projectId: string,
  overrides: Partial<{
    associatedDate: string;
    description: string;
    materialType: "general" | "meeting";
  }> = {},
): string {
  const result = createMaterial(
    db,
    projectId,
    {
      associatedDate: overrides.associatedDate ?? "2024-01-15",
      description: overrides.description ?? "测试素材",
      materialType: overrides.materialType,
    },
    LOG,
  );
  if ("error" in result) {
    throw new Error(result.error);
  }
  return result.material.id;
}

/** Fetches the row for `materialId`, filtered through `notDeleted(materials)`. */
function getMaterialRow(db: Database, materialId: string) {
  return wrap(db)
    .select()
    .from(materials)
    .where(and(eq(materials.id, materialId), notDeleted(materials)))
    .get();
}

/**
 * Writes `status` directly to the material row and sets `updated_at` to the
 * current time, using raw SQL rather than the processor.
 */
function setMaterialStatus(
  db: Database,
  materialId: string,
  status: MaterialStatus,
): void {
  db.prepare("UPDATE materials SET status = ?, updated_at = ? WHERE id = ?").run(
    status,
    new Date().toISOString(),
    materialId,
  );
}

/**
 * Processor whose `processOne` throws for the first `failUntilAttempt` calls
 * and delegates to the real implementation afterwards.
 */
class FailingProcessor extends MaterialProcessor {
  /** Number of `processOne` invocations so far. */
  public attempts = 0;

  /** Calls numbered up to and including this value throw. */
  public failUntilAttempt = Number.POSITIVE_INFINITY;

  // The return type is left to inference so it follows the base class's
  // `processOne` signature, which is not visible from this file.
  protected override async processOne(material: ProcessableMaterial) {
    this.attempts += 1;
    if (this.attempts <= this.failUntilAttempt) {
      throw new Error(`mock failure ${this.attempts}`);
    }
    return super.processOne(material);
  }
}

describe("素材处理器", () => {
  // Materials left in "processing" are put back to "pending" and counted.
  test("recoverStuckMaterials 将 processing 状态恢复为 pending", () => {
    withProcessorDb((db) => {
      const projectId = setupProject(db);
      const id1 = setupMaterial(db, projectId);
      const id2 = setupMaterial(db, projectId);
      setMaterialStatus(db, id1, "processing");
      setMaterialStatus(db, id2, "processing");

      const processor = new MaterialProcessor(db, LOG);
      const recovered = processor.recoverStuckMaterials();

      expect(recovered).toBe(2);
      expect(getMaterialRow(db, id1)?.status).toBe("pending");
      expect(getMaterialRow(db, id2)?.status).toBe("pending");
    });
  });

  // With nothing in "processing", recovery reports zero.
  test("recoverStuckMaterials 无 processing 素材时返回 0", () => {
    withProcessorDb((db) => {
      const projectId = setupProject(db);
      setupMaterial(db, projectId);

      const processor = new MaterialProcessor(db, LOG);
      const recovered = processor.recoverStuckMaterials();

      expect(recovered).toBe(0);
    });
  });

  // A pending material ends up in "review" with its description as content.
  test("processNext 将 pending 素材处理为 review 并写入 processedContent", async () => {
    await withProcessorDbAsync(async (db) => {
      const projectId = setupProject(db);
      const id = setupMaterial(db, projectId, { description: "测试内容" });

      const processor = new MaterialProcessor(db, LOG);
      await processor.processNext();

      const row = getMaterialRow(db, id);
      expect(row?.status).toBe("review");
      expect(row?.processedContent).toBe("测试内容");
    });
  });

  // Same expectations for a material created with materialType "meeting".
  test("processNext 根据 materialType 选择模板", async () => {
    await withProcessorDbAsync(async (db) => {
      const projectId = setupProject(db);
      const id = setupMaterial(db, projectId, {
        description: "会议内容",
        materialType: "meeting",
      });

      const processor = new MaterialProcessor(db, LOG);
      await processor.processNext();

      const row = getMaterialRow(db, id);
      expect(row?.status).toBe("review");
      expect(row?.processedContent).toBe("会议内容");
    });
  });

  // Two failures followed by a success: three attempts, final status "review".
  test("processNext 重试机制:前 2 次失败,第 3 次成功", async () => {
    await withProcessorDbAsync(async (db) => {
      const projectId = setupProject(db);
      const id = setupMaterial(db, projectId);

      const processor = new FailingProcessor(db, LOG);
      processor.failUntilAttempt = 2;
      await processor.processNext();

      expect(processor.attempts).toBe(3);
      const row = getMaterialRow(db, id);
      expect(row?.status).toBe("review");
      expect(row?.processedContent).not.toBeNull();
    });
  });

  // Every attempt fails: three attempts, status "failed", no content written.
  test("processNext 3 次都失败后标记为 failed", async () => {
    await withProcessorDbAsync(async (db) => {
      const projectId = setupProject(db);
      const id = setupMaterial(db, projectId);

      const processor = new FailingProcessor(db, LOG);
      processor.failUntilAttempt = Number.POSITIVE_INFINITY;
      await processor.processNext();

      expect(processor.attempts).toBe(3);
      const row = getMaterialRow(db, id);
      expect(row?.status).toBe("failed");
      expect(row?.processedContent).toBeNull();
    });
  });

  // With no materials at all, processNext must resolve without throwing.
  test("空队列时不报错", async () => {
    await withProcessorDbAsync(async (db) => {
      setupProject(db);

      const processor = new MaterialProcessor(db, LOG);
      await processor.processNext();
    });
  });

  // One processNext call handles only the material created first.
  test("FIFO 顺序:先创建的先处理", async () => {
    await withProcessorDbAsync(async (db) => {
      const projectId = setupProject(db);
      const id1 = setupMaterial(db, projectId, { description: "第一个" });
      // Pause between the two inserts; presumably this gives them distinct
      // creation timestamps, but the ordering column is not visible here.
      await sleep(20);
      const id2 = setupMaterial(db, projectId, { description: "第二个" });

      const processor = new MaterialProcessor(db, LOG);
      await processor.processNext();

      expect(getMaterialRow(db, id1)?.status).toBe("review");
      expect(getMaterialRow(db, id2)?.status).toBe("pending");
    });
  });

  // start() immediately followed by stop() must not throw.
  test("start 启动后能正常 stop", () => {
    withProcessorDb((db) => {
      const processor = new MaterialProcessor(db, LOG);
      processor.start(100);
      processor.stop();
    });
  });

  // After start(50), a pending material has been processed 300 ms later.
  test("start 后定时器会推进 pending 素材到 review", async () => {
    await withProcessorDbAsync(async (db) => {
      const projectId = setupProject(db);
      const id = setupMaterial(db, projectId, { description: "定时扫描" });

      const processor = new MaterialProcessor(db, LOG);
      processor.start(50);
      await sleep(300);
      // Stop before asserting so the processor is always stopped, even when
      // an expectation below fails.
      processor.stop();

      const row = getMaterialRow(db, id);
      expect(row?.status).toBe("review");
      expect(row?.processedContent).toBe("定时扫描");
    });
  });
});