248 lines
7.4 KiB
TypeScript
248 lines
7.4 KiB
TypeScript
import type Database from "bun:sqlite";
|
|
|
|
import { describe, expect, test } from "bun:test";
|
|
|
|
import { and, eq } from "drizzle-orm";
|
|
|
|
import { notDeleted, wrap } from "../../../src/server/db/connection";
|
|
import { createMaterial } from "../../../src/server/db/materials";
|
|
import { createProject } from "../../../src/server/db/projects";
|
|
import { materials } from "../../../src/server/db/schema";
|
|
import { createNoopLogger } from "../../../src/server/logger";
|
|
import type { ProcessableMaterial } from "../../../src/server/processing/processor";
|
|
import { MaterialProcessor } from "../../../src/server/processing/processor";
|
|
|
|
import { createMigratedTestDatabase } from "../../helpers";
|
|
|
|
const LOG = createNoopLogger();
|
|
|
|
function withProcessorDb(callback: (db: Database) => void): void {
|
|
const handle = createMigratedTestDatabase("processor-test");
|
|
try {
|
|
callback(handle.db);
|
|
handle.close();
|
|
} finally {
|
|
handle.cleanup();
|
|
}
|
|
}
|
|
|
|
function setupProject(db: Database, name = "测试项目"): string {
|
|
const result = createProject(db, { name }, LOG);
|
|
if ("error" in result) throw new Error(result.error);
|
|
return result.project.id;
|
|
}
|
|
|
|
function setupMaterial(
|
|
db: Database,
|
|
projectId: string,
|
|
overrides: Partial<{
|
|
associatedDate: string;
|
|
description: string;
|
|
materialType: "general" | "meeting";
|
|
}> = {},
|
|
): string {
|
|
const result = createMaterial(
|
|
db,
|
|
projectId,
|
|
{
|
|
associatedDate: overrides.associatedDate ?? "2024-01-15",
|
|
description: overrides.description ?? "测试素材",
|
|
materialType: overrides.materialType,
|
|
},
|
|
LOG,
|
|
);
|
|
if ("error" in result) throw new Error(result.error);
|
|
return result.material.id;
|
|
}
|
|
|
|
function getMaterialRow(db: Database, materialId: string) {
|
|
return wrap(db)
|
|
.select()
|
|
.from(materials)
|
|
.where(and(eq(materials.id, materialId), notDeleted(materials)))
|
|
.get();
|
|
}
|
|
|
|
function setMaterialStatus(
|
|
db: Database,
|
|
materialId: string,
|
|
status: "approved" | "discarded" | "failed" | "pending" | "processing" | "review",
|
|
): void {
|
|
db.prepare("UPDATE materials SET status = ?, updated_at = ? WHERE id = ?").run(
|
|
status,
|
|
new Date().toISOString(),
|
|
materialId,
|
|
);
|
|
}
|
|
|
|
class FailingProcessor extends MaterialProcessor {
|
|
public attempts = 0;
|
|
public failUntilAttempt = Number.POSITIVE_INFINITY;
|
|
|
|
protected override async processOne(material: ProcessableMaterial): Promise<string> {
|
|
this.attempts += 1;
|
|
if (this.attempts <= this.failUntilAttempt) {
|
|
throw new Error(`mock failure ${this.attempts}`);
|
|
}
|
|
return super.processOne(material);
|
|
}
|
|
}
|
|
|
|
describe("素材处理器", () => {
|
|
test("recoverStuckMaterials 将 processing 状态恢复为 pending", () => {
|
|
withProcessorDb((db) => {
|
|
const projectId = setupProject(db);
|
|
const id1 = setupMaterial(db, projectId);
|
|
const id2 = setupMaterial(db, projectId);
|
|
|
|
setMaterialStatus(db, id1, "processing");
|
|
setMaterialStatus(db, id2, "processing");
|
|
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
const recovered = processor.recoverStuckMaterials();
|
|
|
|
expect(recovered).toBe(2);
|
|
expect(getMaterialRow(db, id1)?.status).toBe("pending");
|
|
expect(getMaterialRow(db, id2)?.status).toBe("pending");
|
|
});
|
|
});
|
|
|
|
test("recoverStuckMaterials 无 processing 素材时返回 0", () => {
|
|
withProcessorDb((db) => {
|
|
const projectId = setupProject(db);
|
|
setupMaterial(db, projectId);
|
|
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
const recovered = processor.recoverStuckMaterials();
|
|
|
|
expect(recovered).toBe(0);
|
|
});
|
|
});
|
|
|
|
test("processNext 将 pending 素材处理为 review 并写入 processedContent", async () => {
|
|
await withProcessorDbAsync(async (db) => {
|
|
const projectId = setupProject(db);
|
|
const id = setupMaterial(db, projectId, { description: "测试内容" });
|
|
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
await processor.processNext();
|
|
|
|
const row = getMaterialRow(db, id);
|
|
expect(row?.status).toBe("review");
|
|
expect(row?.processedContent).toBe("测试内容");
|
|
});
|
|
});
|
|
|
|
test("processNext 根据 materialType 选择模板", async () => {
|
|
await withProcessorDbAsync(async (db) => {
|
|
const projectId = setupProject(db);
|
|
const id = setupMaterial(db, projectId, {
|
|
description: "会议内容",
|
|
materialType: "meeting",
|
|
});
|
|
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
await processor.processNext();
|
|
|
|
const row = getMaterialRow(db, id);
|
|
expect(row?.status).toBe("review");
|
|
expect(row?.processedContent).toBe("会议内容");
|
|
});
|
|
});
|
|
|
|
test("processNext 重试机制:前 2 次失败,第 3 次成功", async () => {
|
|
await withProcessorDbAsync(async (db) => {
|
|
const projectId = setupProject(db);
|
|
const id = setupMaterial(db, projectId);
|
|
|
|
const processor = new FailingProcessor(db, LOG);
|
|
processor.failUntilAttempt = 2;
|
|
|
|
await processor.processNext();
|
|
|
|
expect(processor.attempts).toBe(3);
|
|
const row = getMaterialRow(db, id);
|
|
expect(row?.status).toBe("review");
|
|
expect(row?.processedContent).not.toBeNull();
|
|
});
|
|
});
|
|
|
|
test("processNext 3 次都失败后标记为 failed", async () => {
|
|
await withProcessorDbAsync(async (db) => {
|
|
const projectId = setupProject(db);
|
|
const id = setupMaterial(db, projectId);
|
|
|
|
const processor = new FailingProcessor(db, LOG);
|
|
processor.failUntilAttempt = Number.POSITIVE_INFINITY;
|
|
|
|
await processor.processNext();
|
|
|
|
expect(processor.attempts).toBe(3);
|
|
const row = getMaterialRow(db, id);
|
|
expect(row?.status).toBe("failed");
|
|
expect(row?.processedContent).toBeNull();
|
|
});
|
|
});
|
|
|
|
test("空队列时不报错", async () => {
|
|
await withProcessorDbAsync(async (db) => {
|
|
setupProject(db);
|
|
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
await processor.processNext();
|
|
});
|
|
});
|
|
|
|
test("FIFO 顺序:先创建的先处理", async () => {
|
|
await withProcessorDbAsync(async (db) => {
|
|
const projectId = setupProject(db);
|
|
|
|
const id1 = setupMaterial(db, projectId, { description: "第一个" });
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const id2 = setupMaterial(db, projectId, { description: "第二个" });
|
|
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
await processor.processNext();
|
|
|
|
expect(getMaterialRow(db, id1)?.status).toBe("review");
|
|
expect(getMaterialRow(db, id2)?.status).toBe("pending");
|
|
});
|
|
});
|
|
|
|
test("start 启动后能正常 stop", () => {
|
|
withProcessorDb((db) => {
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
processor.start(100);
|
|
processor.stop();
|
|
});
|
|
});
|
|
|
|
test("start 后定时器会推进 pending 素材到 review", async () => {
|
|
await withProcessorDbAsync(async (db) => {
|
|
const projectId = setupProject(db);
|
|
const id = setupMaterial(db, projectId, { description: "定时扫描" });
|
|
|
|
const processor = new MaterialProcessor(db, LOG);
|
|
processor.start(50);
|
|
|
|
await new Promise((r) => setTimeout(r, 300));
|
|
|
|
processor.stop();
|
|
|
|
const row = getMaterialRow(db, id);
|
|
expect(row?.status).toBe("review");
|
|
expect(row?.processedContent).toBe("定时扫描");
|
|
});
|
|
});
|
|
});
|
|
|
|
async function withProcessorDbAsync(callback: (db: Database) => Promise<void>): Promise<void> {
|
|
const handle = createMigratedTestDatabase("processor-test-async");
|
|
try {
|
|
await callback(handle.db);
|
|
handle.close();
|
|
} finally {
|
|
handle.cleanup();
|
|
}
|
|
}
|