feat: 素材处理管线——自动处理、审核流程、6状态机

This commit is contained in:
2026-06-07 22:50:05 +08:00
parent a389888eb4
commit 90fdb44b20
30 changed files with 1452 additions and 55 deletions

View File

@@ -0,0 +1,151 @@
import type Database from "bun:sqlite";
import type { MaterialType } from "../../shared/api";
import type { Logger } from "../logger";
import { and, asc, eq } from "drizzle-orm";
import { notDeleted, timestamp, wrap } from "../db/connection";
import { materials } from "../db/schema";
import { getTemplate } from "./templates";
const MAX_RETRIES = 3;
const DEFAULT_INTERVAL_MS = 5000;
export interface ProcessableMaterial {
description: string;
id: string;
materialType: MaterialType;
}
export class MaterialProcessor {
private readonly db: Database;
private readonly logger: Logger;
private timer: ReturnType<typeof setInterval> | null = null;
private running = false;
constructor(db: Database, logger: Logger) {
this.db = db;
this.logger = logger.child({ component: "material-processor" });
}
recoverStuckMaterials(): number {
const db = wrap(this.db);
const now = timestamp();
const restored = db
.update(materials)
.set({ status: "pending", updatedAt: now })
.where(and(eq(materials.status, "processing"), notDeleted(materials)))
.returning({ id: materials.id })
.all();
const count = restored.length;
if (count > 0) {
this.logger.info({ count }, "恢复卡住的素材到 pending 状态");
}
return count;
}
start(intervalMs: number = DEFAULT_INTERVAL_MS): void {
const recovered = this.recoverStuckMaterials();
this.logger.info({ intervalMs, recovered }, "素材处理器启动");
this.timer = setInterval(() => {
void this.tick();
}, intervalMs);
}
stop(): void {
if (this.timer !== null) {
clearInterval(this.timer);
this.timer = null;
}
this.running = false;
this.logger.info("素材处理器停止");
}
private async tick(): Promise<void> {
if (this.running) {
this.logger.debug("上一轮处理尚未完成,跳过本次扫描");
return;
}
this.running = true;
try {
await this.processNext();
} catch (error: unknown) {
this.logger.error({ error: error instanceof Error ? error.message : String(error) }, "处理过程中发生未捕获错误");
} finally {
this.running = false;
}
}
async processNext(): Promise<void> {
const db = wrap(this.db);
const row = db
.select()
.from(materials)
.where(and(eq(materials.status, "pending"), notDeleted(materials)))
.orderBy(asc(materials.createdAt))
.limit(1)
.get();
if (!row) {
this.logger.debug("无待处理素材");
return;
}
const processingAt = timestamp();
db.update(materials).set({ status: "processing", updatedAt: processingAt }).where(eq(materials.id, row.id)).run();
const material: ProcessableMaterial = {
description: row.description,
id: row.id,
materialType: row.materialType as MaterialType,
};
let lastError: unknown;
let success = false;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
const result = await this.processOne(material);
const finishedAt = timestamp();
db.update(materials)
.set({ processedContent: result, status: "review", updatedAt: finishedAt })
.where(eq(materials.id, row.id))
.run();
this.logger.info({ attempt, materialId: row.id }, "素材处理成功");
success = true;
break;
} catch (error: unknown) {
lastError = error;
this.logger.warn(
{
attempt,
error: error instanceof Error ? error.message : String(error),
materialId: row.id,
},
`素材处理第 ${attempt} 次失败`,
);
}
}
if (!success) {
const failedAt = timestamp();
db.update(materials).set({ status: "failed", updatedAt: failedAt }).where(eq(materials.id, row.id)).run();
this.logger.warn(
{
error: lastError instanceof Error ? lastError.message : String(lastError),
materialId: row.id,
},
`素材处理 ${MAX_RETRIES} 次均失败,标记为 failed`,
);
}
}
protected processOne(material: ProcessableMaterial): Promise<string> {
const template = getTemplate(material.materialType);
// TODO: 替换为真实 AI Agent 调用
return Promise.resolve(template.outputTemplate.replace("{description}", material.description));
}
}