Files
Alfred/src/server/processing/processor.ts
lanyuanxiaoyao 12edf0b545 feat: 实现阶段二实体体系——AI预处理真实化+实体CRUD+审核归一化
- 新增 entities 数据表(含迁移)、Entity 类型、DAO 层完整 CRUD
- AI 预处理管道接入真实模型(generateText),输出结构化 JSON(摘要+规范化内容+候选实体)
- 模板接口重构为 {systemPrompt, buildUserPrompt, parseOutput},general/meeting 模板真实化
- 新增 5 个实体路由端点 + 实体管理前端页面(列表/详情/编辑弹窗)
- 审核面板增强:展示 AI 预处理结构化结果+候选实体归一化面板(合并/新建/选择/放弃)
- 素材通过时根据用户确认的候选实体写入 entities 表
- 工作台菜单新增"实体"入口
- 新增 entities DAO 测试(16)、processor 测试(11)、路由测试(8),服务端 367 测试全部通过
- TypeScript 0 错误
2026-06-08 18:49:30 +08:00

199 lines
6.0 KiB
TypeScript

import type Database from "bun:sqlite";
import { generateText } from "ai";
import type { MaterialType } from "../../shared/api";
import type { Logger } from "../logger";
import { and, asc, eq } from "drizzle-orm";
import { buildProviderRegistry } from "../ai/registry";
import { notDeleted, timestamp, wrap } from "../db/connection";
import { listEntityNames } from "../db/entities";
import { getModelWithProvider, listModels } from "../db/models";
import { materials } from "../db/schema";
import { getSettings } from "../db/settings";
import { getTemplate } from "./templates";
/** Number of generation attempts made for one material before it is marked `failed`. */
const MAX_RETRIES = 3;
/** Polling period used by `MaterialProcessor.start` when no interval is given (5 s, in milliseconds). */
const DEFAULT_INTERVAL_MS = 5 * 1000;
/**
 * The subset of a `materials` row that the processor needs for AI preprocessing.
 * `MaterialProcessor.processNext` builds it from the selected database row.
 */
export interface ProcessableMaterial {
  /** Text handed to the template's `buildUserPrompt` as the content to preprocess. */
  description: string;
  /** Id of the material row; identifies the row whose status/result is updated. */
  id: string;
  /** Chooses the prompt template via `getTemplate`. */
  materialType: MaterialType;
  /** Project the material belongs to; its existing entity names are fed into the prompt. */
  projectId: string;
}
/**
 * Background worker that polls the `materials` table and runs AI preprocessing on
 * `pending` rows, one row per tick.
 *
 * Status flow implemented here: `pending` → `processing` → `review` when generation
 * succeeds, or → `failed` after {@link MAX_RETRIES} unsuccessful generation attempts.
 * Rows left stuck in `processing` are reset to `pending` when the processor starts.
 */
export class MaterialProcessor {
  private readonly db: Database;
  private readonly logger: Logger;
  /** Polling interval handle; `null` whenever the processor is not started. */
  private timer: ReturnType<typeof setInterval> | null = null;
  /**
   * True while a tick is in flight. Only `tick` sets and clears it, so a tick that is
   * still awaiting the model always blocks later ticks from overlapping it.
   */
  private running = false;

  constructor(db: Database, logger: Logger) {
    this.db = db;
    this.logger = logger.child({ component: "material-processor" });
  }

  /**
   * Resets every non-deleted material left in `processing` back to `pending` so it
   * is picked up again by a later tick.
   *
   * @returns The number of rows that were reset.
   */
  recoverStuckMaterials(): number {
    const db = wrap(this.db);
    const now = timestamp();
    const restored = db
      .update(materials)
      .set({ status: "pending", updatedAt: now })
      .where(and(eq(materials.status, "processing"), notDeleted(materials)))
      .returning({ id: materials.id })
      .all();
    const count = restored.length;
    if (count > 0) {
      this.logger.info({ count }, "恢复卡住的素材到 pending 状态");
    }
    return count;
  }

  /**
   * Recovers stuck materials, then polls for pending materials every `intervalMs` ms.
   *
   * Calling `start` again while already started is ignored (with a warning). Without
   * this guard a second interval would leak, because `stop` only clears the handle it
   * currently holds. Recovery would also reset a material that is mid-processing.
   *
   * @param intervalMs Polling period in milliseconds; defaults to {@link DEFAULT_INTERVAL_MS}.
   */
  start(intervalMs: number = DEFAULT_INTERVAL_MS): void {
    if (this.timer !== null) {
      this.logger.warn({ intervalMs }, "素材处理器已在运行,忽略重复启动");
      return;
    }
    const recovered = this.recoverStuckMaterials();
    this.logger.info({ intervalMs, recovered }, "素材处理器启动");
    this.timer = setInterval(() => {
      void this.tick();
    }, intervalMs);
  }

  /**
   * Stops polling. A tick that is already in flight is not cancelled; it finishes on
   * its own and then clears `running` in its `finally` block.
   */
  stop(): void {
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
    // `running` is intentionally NOT reset here. Clearing it while a tick is still
    // awaiting the model would let a restarted processor run a second, concurrent tick.
    this.logger.info("素材处理器停止");
  }

  /** One polling step: skips if the previous step is still running, otherwise processes one material. */
  private async tick(): Promise<void> {
    if (this.running) {
      this.logger.debug("上一轮处理尚未完成,跳过本次扫描");
      return;
    }
    this.running = true;
    try {
      await this.processNext();
    } catch (error: unknown) {
      this.logger.error({ error: MaterialProcessor.describeError(error) }, "处理过程中发生未捕获错误");
    } finally {
      this.running = false;
    }
  }

  /**
   * Picks the oldest non-deleted `pending` material, marks it `processing`, runs AI
   * preprocessing with retries, and stores the outcome (`review` + result, or `failed`).
   *
   * Database errors while storing the outcome propagate to the caller. They are not
   * retried as generation failures, so a DB problem never triggers another model call.
   */
  async processNext(): Promise<void> {
    const db = wrap(this.db);
    const row = db
      .select()
      .from(materials)
      .where(and(eq(materials.status, "pending"), notDeleted(materials)))
      .orderBy(asc(materials.createdAt))
      .limit(1)
      .get();
    if (!row) {
      this.logger.debug("无待处理素材");
      return;
    }
    // Claim the row. There is no `await` between the select above and this update.
    db.update(materials).set({ status: "processing", updatedAt: timestamp() }).where(eq(materials.id, row.id)).run();
    const material: ProcessableMaterial = {
      description: row.description,
      id: row.id,
      materialType: row.materialType as MaterialType,
      projectId: row.projectId,
    };

    const outcome = await this.generateWithRetry(material);

    if (outcome.ok) {
      db.update(materials)
        .set({ processedContent: outcome.content, status: "review", updatedAt: timestamp() })
        .where(eq(materials.id, row.id))
        .run();
      this.logger.info({ attempt: outcome.attempt, materialId: row.id }, "素材处理成功");
      return;
    }

    db.update(materials).set({ status: "failed", updatedAt: timestamp() }).where(eq(materials.id, row.id)).run();
    this.logger.warn(
      {
        error: MaterialProcessor.describeError(outcome.error),
        materialId: row.id,
      },
      `素材处理 ${MAX_RETRIES} 次均失败,标记为 failed`,
    );
  }

  /**
   * Calls {@link processOne} up to {@link MAX_RETRIES} times and logs each failed attempt.
   * Only generation is retried here; persisting the result is the caller's job.
   *
   * @returns The serialized result and the 1-based attempt that produced it, or the
   *          error from the last failed attempt.
   */
  private async generateWithRetry(
    material: ProcessableMaterial,
  ): Promise<{ attempt: number; content: string; ok: true } | { error: unknown; ok: false }> {
    let lastError: unknown;
    for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
      try {
        const content = await this.processOne(material);
        return { attempt, content, ok: true };
      } catch (error: unknown) {
        lastError = error;
        this.logger.warn(
          {
            attempt,
            error: MaterialProcessor.describeError(error),
            materialId: material.id,
          },
          `素材处理第 ${attempt} 次失败`,
        );
      }
    }
    return { error: lastError, ok: false };
  }

  /**
   * Runs AI preprocessing for one material and returns the template's parsed output
   * serialized as JSON.
   *
   * It resolves the default text model, builds the prompt from the material-type
   * template plus the project's existing entity names, and calls `generateText`. The
   * reply is parsed with the template's `parseOutput`. The method is `protected`,
   * presumably so subclasses (e.g. tests) can replace the model call.
   *
   * @throws Error when no text model is available. Errors from the model call or from
   *         `parseOutput` also propagate (and are retried by the caller).
   */
  protected async processOne(material: ProcessableMaterial): Promise<string> {
    const modelInfo = getDefaultTextModel(this.db);
    if (!modelInfo) {
      throw new Error("没有可用的文本模型,请在设置中配置默认模型或添加至少一个模型");
    }
    const registry = buildProviderRegistry(this.db);
    const model = registry.languageModel(`${modelInfo.providerId}:${modelInfo.externalId}`);
    const existingEntities = listEntityNames(this.db, material.projectId);
    const template = getTemplate(material.materialType);
    const userPrompt = template.buildUserPrompt(material.description, existingEntities);
    const result = await generateText({
      model,
      prompt: userPrompt,
      system: template.systemPrompt,
    });
    const processingResult = template.parseOutput(result.text);
    return JSON.stringify(processingResult);
  }

  /** Turns an arbitrary thrown value into a loggable message. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
/**
 * Chooses the text model used for AI preprocessing.
 *
 * Resolution order:
 * 1. The model id stored in `settings.defaultModels.text`, when it is set and resolves.
 * 2. Otherwise, the first entry returned by `listModels` (page 1, page size 1).
 *
 * Exceptions raised while reading settings or resolving the configured model are
 * deliberately ignored so that step 2 is still attempted. Exceptions raised during
 * step 2 propagate.
 *
 * @returns The model's external id and its provider id, or `null` when nothing resolves.
 */
function getDefaultTextModel(db: Database): { externalId: string; providerId: string } | null {
  type ModelRef = { externalId: string; providerId: string };

  const resolveModel = (modelId: Parameters<typeof getModelWithProvider>[1]): ModelRef | null => {
    const lookup = getModelWithProvider(db, modelId);
    if ("error" in lookup) return null;
    return { externalId: lookup.model.externalId, providerId: lookup.provider.id };
  };

  let configured: ModelRef | null = null;
  try {
    const configuredId = getSettings(db).defaultModels?.text;
    if (configuredId) {
      configured = resolveModel(configuredId);
    }
  } catch {
    // Settings missing or unparsable: fall through to the first listed model.
  }
  if (configured) return configured;

  const [firstListed] = listModels(db, { page: 1, pageSize: 1 }).items;
  return firstListed ? resolveModel(firstListed.id) : null;
}