feat(graphrag): 新增多轮检索流式问答与用户管理界面

- 新增多轮 GraphRAG 检索功能,支持流式进度输出(SSE)
- 新增用户管理界面,可查看所有用户图谱统计并快速导航
- 新增多 Agent 任务拆解与执行服务,支持复杂任务协作处理
- 改进 embedding 和 rerank 服务的容错机制,支持备用模型和端点
- 更新前端样式遵循 Acmetone 设计规范,优化视觉一致性
- 新增流式分析接口,支持并行处理和专家评审选项
This commit is contained in:
KOSHM-Pig
2026-03-24 12:04:28 +08:00
parent adabd63769
commit 1f087599c5
16 changed files with 4955 additions and 452 deletions

View File

@@ -17,18 +17,97 @@ const sendServiceResult = async (reply, action) => {
/**
* GraphRAG 控制器:负责请求转发与响应封装。
*/
export const createGraphRagController = (service) => ({
export const createGraphRagController = (service, multiAgentService) => ({
health: async (_request, reply) => reply.send({ ok: true }),
ready: async (_request, reply) => sendServiceResult(reply, () => service.ready()),
bootstrap: async (_request, reply) => sendServiceResult(reply, () => service.bootstrap()),
listUsers: async (request, reply) => sendServiceResult(reply, () => service.listUsers(request.query.limit || 200)),
getGraphStats: async (request, reply) => sendServiceResult(reply, () => service.getGraphStats(request.query.userId || 'default')),
ingest: async (request, reply) => sendServiceResult(reply, () => service.ingest(request.body)),
queryTimeline: async (request, reply) =>
sendServiceResult(reply, () => service.queryTimeline(request.body)),
queryGraphRag: async (request, reply) =>
sendServiceResult(reply, () => service.queryGraphRag(request.body)),
queryGraphRagMultiRound: async (request, reply) =>
sendServiceResult(reply, () => service.queryGraphRagMultiRound(request.body)),
queryGraphRagMultiRoundStream: async (request, reply) => {
reply.hijack();
const raw = reply.raw;
const requestOrigin = request.headers.origin;
raw.setHeader('Access-Control-Allow-Origin', requestOrigin || '*');
raw.setHeader('Vary', 'Origin');
raw.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization');
raw.setHeader('Access-Control-Allow-Methods', 'POST, OPTIONS');
raw.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
raw.setHeader('Cache-Control', 'no-cache, no-transform');
raw.setHeader('Connection', 'keep-alive');
raw.setHeader('X-Accel-Buffering', 'no');
raw.flushHeaders?.();
const push = (event, data) => {
raw.write(`event: ${event}\n`);
raw.write(`data: ${JSON.stringify(data)}\n\n`);
};
try {
const result = await service.queryGraphRagMultiRound(request.body, {
onProgress: (event) => push('progress', event)
});
push('done', result);
} catch (error) {
push('error', {
ok: false,
statusCode: Number(error?.statusCode) || 500,
message: error?.message || 'internal error'
});
} finally {
raw.end();
}
},
analyzeAndIngest: async (request, reply) =>
sendServiceResult(reply, () => service.incrementalUpdate(request.body.text, request.body.userId || 'default')),
reindexUserVectors: async (request, reply) =>
sendServiceResult(reply, () => service.reindexUserVectors({
userId: request.body.userId || 'default',
limit: request.body.limit
})),
analyzeAndIngestStream: async (request, reply) => {
reply.hijack();
const raw = reply.raw;
const requestOrigin = request.headers.origin;
raw.setHeader('Access-Control-Allow-Origin', requestOrigin || '*');
raw.setHeader('Vary', 'Origin');
raw.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization');
raw.setHeader('Access-Control-Allow-Methods', 'POST, OPTIONS');
raw.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
raw.setHeader('Cache-Control', 'no-cache, no-transform');
raw.setHeader('Connection', 'keep-alive');
raw.setHeader('X-Accel-Buffering', 'no');
raw.flushHeaders?.();
const push = (event, data) => {
raw.write(`event: ${event}\n`);
raw.write(`data: ${JSON.stringify(data)}\n\n`);
};
try {
push('progress', { stage: 'start' });
const result = await service.incrementalUpdate(
request.body.text,
request.body.userId || 'default',
{
parallelism: request.body.parallelism,
expertReview: request.body.expertReview,
onProgress: (event) => push('progress', event)
}
);
push('done', result);
} catch (error) {
push('error', {
ok: false,
statusCode: Number(error?.statusCode) || 500,
message: error?.message || 'internal error'
});
} finally {
raw.end();
}
},
queryHistory: async (request, reply) =>
sendServiceResult(reply, () => service.queryRelationshipHistory(
request.body.userId || 'default',
@@ -36,5 +115,9 @@ export const createGraphRagController = (service) => ({
request.body.limit || 20
)),
getAdvice: async (request, reply) =>
sendServiceResult(reply, () => service.getRelationshipAdvice(request.body.userId || 'default'))
sendServiceResult(reply, () => service.getRelationshipAdvice(request.body.userId || 'default')),
decomposeMultiAgentTask: async (request, reply) =>
sendServiceResult(reply, () => multiAgentService.decomposeTask(request.body)),
executeMultiAgentTask: async (request, reply) =>
sendServiceResult(reply, () => multiAgentService.executeTaskWorkflow(request.body))
});

View File

@@ -48,6 +48,24 @@ export const registerGraphRagRoutes = async (app, controller) => {
* description: 初始化成功
*/
app.post("/bootstrap", controller.bootstrap);
/**
* @openapi
* /users:
* get:
* tags:
* - GraphRAG
* summary: 获取用户管理列表(含各用户图谱统计)
* parameters:
* - in: query
* name: limit
* schema:
* type: integer
* required: false
* responses:
* 200:
* description: 用户列表
*/
app.get("/users", controller.listUsers);
/**
* @openapi
* /graph/stats:
@@ -244,7 +262,97 @@ export const registerGraphRagRoutes = async (app, controller) => {
* description: 参数错误
*/
app.post("/query/graphrag", controller.queryGraphRag);
/**
* @openapi
* /query/graphrag/multi:
* post:
* tags:
* - GraphRAG
* summary: 多轮 GraphRAG 检索 + 关系整理 + 终审判别
* requestBody:
* required: true
* content:
* application/json:
* schema:
* type: object
* properties:
* userId:
* type: string
* query_text:
* type: string
* top_k:
* type: integer
* timeline_limit:
* type: integer
* max_rounds:
* type: integer
* responses:
* 200:
* description: 查询成功
* 400:
* description: 参数错误
*/
app.post("/query/graphrag/multi", controller.queryGraphRagMultiRound);
/**
* @openapi
* /query/graphrag/multi/stream:
* post:
* tags:
* - GraphRAG
* summary: 多轮 GraphRAG 流式过程输出SSE
* requestBody:
* required: true
* content:
* application/json:
* schema:
* type: object
* properties:
* userId:
* type: string
* query_text:
* type: string
* top_k:
* type: integer
* timeline_limit:
* type: integer
* max_rounds:
* type: integer
* responses:
* 200:
* description: 流式输出 progress/done/error 事件
*/
app.post("/query/graphrag/multi/stream", controller.queryGraphRagMultiRoundStream);
app.post("/analyze", controller.analyzeAndIngest);
app.post("/vectors/reindex", controller.reindexUserVectors);
/**
* @openapi
* /analyze/stream:
* post:
* tags:
* - GraphRAG
* summary: 流式分析并增量入图SSE
* requestBody:
* required: true
* content:
* application/json:
* schema:
* type: object
* properties:
* text:
* type: string
* userId:
* type: string
* parallelism:
* type: integer
* expertReview:
* type: boolean
* responses:
* 200:
* description: 流式输出 progress/done/error 事件
* 400:
* description: 参数错误
*/
app.post("/analyze/stream", controller.analyzeAndIngestStream);
/**
* @openapi
@@ -294,4 +402,70 @@ export const registerGraphRagRoutes = async (app, controller) => {
* description: 建议生成成功
*/
app.post("/query/advice", controller.getAdvice);
/**
* @openapi
* /multi-agent/decompose:
* post:
* tags:
* - GraphRAG
* summary: 使用多 Agent 方式拆解复杂任务
* requestBody:
* required: true
* content:
* application/json:
* schema:
* type: object
* required:
* - task
* properties:
* task:
* type: string
* context:
* type: string
* constraints:
* type: array
* items:
* type: string
* max_agents:
* type: integer
* responses:
* 200:
* description: 拆解成功
* 400:
* description: 参数错误
*/
app.post("/multi-agent/decompose", controller.decomposeMultiAgentTask);
/**
* @openapi
* /multi-agent/execute:
* post:
* tags:
* - GraphRAG
* summary: 执行多 Agent 协作流程(自动拆解并串行执行)
* requestBody:
* required: true
* content:
* application/json:
* schema:
* type: object
* properties:
* task:
* type: string
* context:
* type: string
* constraints:
* type: array
* items:
* type: string
* max_agents:
* type: integer
* plan:
* type: object
* responses:
* 200:
* description: 执行成功
* 400:
* description: 参数错误
*/
app.post("/multi-agent/execute", controller.executeMultiAgentTask);
};

View File

@@ -5,7 +5,7 @@ import swaggerUi from "@fastify/swagger-ui";
import { env } from "./config/env.js";
import { createClients } from "./config/clients.js";
import { createSwaggerSpec } from "./config/swagger.js";
import { EmbeddingService, RerankService, GraphRagService, LLMService } from "./services/index.js";
import { EmbeddingService, RerankService, GraphRagService, LLMService, MultiAgentService } from "./services/index.js";
import { createGraphRagController } from "./controllers/index.js";
import { registerRoutes } from "./routes/index.js";
@@ -42,7 +42,11 @@ export const createServer = async () => {
llmService,
env
});
const controller = createGraphRagController(service);
const multiAgentService = new MultiAgentService({
llmService,
logger: app.log
});
const controller = createGraphRagController(service, multiAgentService);
await registerRoutes(app, controller, env);

View File

@@ -10,6 +10,10 @@ export class EmbeddingService {
this.apiKey = env.EMBEDDING_API_KEY ?? "";
this.model = env.EMBEDDING_MODEL ?? "";
this.dimension = env.EMBEDDING_DIM;
this.fallbackModels = ["text-embedding-v3", "text-embedding-v2"];
this.endpoint = this.baseUrl.endsWith("/v1")
? `${this.baseUrl}/embeddings`
: `${this.baseUrl}/v1/embeddings`;
}
isEnabled() {
@@ -26,28 +30,41 @@ export class EmbeddingService {
throw createHttpError(400, "embedding 输入文本不能为空");
}
const response = await fetch(`${this.baseUrl}/v1/embeddings`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${this.apiKey}`
},
body: JSON.stringify({
model: this.model,
input: cleaned
})
});
const candidates = [this.model, ...this.fallbackModels]
.map((item) => String(item || "").trim())
.filter((item, index, arr) => item && arr.indexOf(item) === index);
let lastErrorText = "";
for (const model of candidates) {
const response = await fetch(this.endpoint, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${this.apiKey}`
},
body: JSON.stringify({
model,
input: cleaned
})
});
if (!response.ok) {
const errorText = await response.text();
throw createHttpError(response.status, `embedding 请求失败: ${errorText}`);
}
if (!response.ok) {
const errorText = await response.text();
lastErrorText = errorText || lastErrorText;
const maybeModelIssue = (response.status === 400 || response.status === 404) && /model_not_supported|unsupported model/i.test(errorText || "");
if (maybeModelIssue) continue;
throw createHttpError(response.status, `embedding 请求失败: ${errorText}`);
}
const data = await response.json();
const vector = data?.data?.[0]?.embedding;
if (!Array.isArray(vector) || vector.length !== this.dimension) {
throw createHttpError(400, `embedding 维度异常,期望 ${this.dimension}`);
const data = await response.json();
const vector = data?.data?.[0]?.embedding;
if (!Array.isArray(vector) || vector.length !== this.dimension) {
throw createHttpError(400, `embedding 维度异常,期望 ${this.dimension}`);
}
if (model !== this.model) {
this.model = model;
}
return vector;
}
return vector;
throw createHttpError(400, `embedding 请求失败: ${lastErrorText}`);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -2,3 +2,4 @@ export { EmbeddingService } from "./embedding.service.js";
export { RerankService } from "./rerank.service.js";
export { GraphRagService } from "./graphrag.service.js";
export { LLMService } from "./llm.service.js";
export { MultiAgentService } from "./multiagent.service.js";

View File

@@ -4,6 +4,41 @@ const createHttpError = (statusCode, message) => {
return error;
};
/**
 * Rough token estimate for a piece of text: half of its string length, rounded up.
 * Falsy input (empty string, null, undefined, 0) counts as zero tokens.
 * @param {*} text - Value to measure; non-strings are stringified first.
 * @returns {number} Estimated token count.
 */
const estimateTokens = (text) => (text ? Math.ceil(String(text).length / 2) : 0);
/**
 * Estimate the prompt size of a chat message list.
 * Each message costs the estimate of its `content` plus a flat overhead of 6.
 * @param {Array<{content?: string}>} [messages=[]] - Chat messages.
 * @returns {number} Estimated total tokens.
 */
const estimateMessageTokens = (messages = []) => {
  const costOf = (message) => estimateTokens(message?.content || "") + 6;
  return messages.reduce((runningTotal, message) => runningTotal + costOf(message), 0);
};
/**
 * Extract a JSON object from raw model output.
 *
 * Candidates are tried in order: the whole trimmed text, the body of the first
 * fenced code block, and finally the span from the first "{" to the last "}".
 * Only plain objects are accepted; values such as `null`, numbers, strings or
 * arrays are rejected so callers can safely read fields from the result.
 *
 * @param {string} content - Raw text that should contain a JSON object.
 * @returns {object} The first candidate that parses to a plain object.
 * @throws {Error} "empty content" for empty or non-string input;
 *   "json not found" (with the last parse failure as `cause`) otherwise.
 */
const parseJsonObject = (content) => {
  if (!content || typeof content !== "string") {
    throw new Error("empty content");
  }
  const direct = content.trim();
  const fenced = direct.match(/```json\s*([\s\S]*?)```/i) || direct.match(/```\s*([\s\S]*?)```/i);
  const start = direct.indexOf("{");
  const end = direct.lastIndexOf("}");
  const candidates = [
    direct,
    fenced?.[1]?.trim(),
    start >= 0 && end > start ? direct.slice(start, end + 1) : ""
  ];

  let lastError;
  for (const candidate of candidates) {
    if (!candidate) continue;
    try {
      const value = JSON.parse(candidate);
      // JSON.parse happily returns null, primitives and arrays; none of those is an object payload.
      if (value !== null && typeof value === "object" && !Array.isArray(value)) {
        return value;
      }
    } catch (error) {
      lastError = error;
    }
  }
  throw new Error("json not found", { cause: lastError });
};
/**
* 文本预处理
*/
@@ -44,12 +79,179 @@ const splitTextIntoChunks = (text, chunkSize = 500, overlap = 50) => {
}
chunks.push(text.slice(start, chunkEnd).trim());
start = chunkEnd - overlap;
if (chunkEnd >= text.length) {
break;
}
const nextStart = chunkEnd - overlap;
start = nextStart > start ? nextStart : chunkEnd;
}
return chunks;
};
/**
 * Guarantee that an analysis result carries the five array fields
 * persons / organizations / events / topics / relations.
 *
 * A plain object is normalized in place and returned (same reference), with any
 * missing or non-array field replaced by an empty array. Anything else
 * (null, undefined, primitives, arrays) yields a fresh empty analysis instead
 * of throwing or decorating a non-object value.
 *
 * @param {object} [parsed={}] - Candidate analysis result.
 * @returns {object} Object that has all five array fields.
 */
const ensureAnalysisShape = (parsed = {}) => {
  const isPlainObject = parsed !== null && typeof parsed === "object" && !Array.isArray(parsed);
  const analysis = isPlainObject ? parsed : {};
  for (const field of ["persons", "organizations", "events", "topics", "relations"]) {
    if (!Array.isArray(analysis[field])) analysis[field] = [];
  }
  return analysis;
};
/**
 * Collapse items that share a key into one entry.
 * The first item seen for a key is stored as-is; later items with the same key
 * are shallow-merged over it (later fields win). Items with a falsy key are dropped.
 * @param {Iterable<object>|null|undefined} items - Items to merge.
 * @param {(item: object) => *} keyBuilder - Produces the grouping key for an item.
 * @returns {object[]} Merged items in first-seen key order.
 */
const mergeByKey = (items, keyBuilder) => {
  const merged = new Map();
  for (const entry of items || []) {
    const key = keyBuilder(entry);
    if (key) {
      const combined = merged.has(key) ? { ...(merged.get(key) || {}), ...entry } : entry;
      merged.set(key, combined);
    }
  }
  return Array.from(merged.values());
};
/**
 * Merge several per-chunk analyses into one.
 * Persons, organizations and topics are merged by `id` (falling back to `name`);
 * events by `id` or a type|summary|occurred_at composite; relations by a
 * source|target|type|summary composite.
 * @param {Array<object|null>} [analyses=[]] - Analyses to combine.
 * @returns {{persons: object[], organizations: object[], events: object[], topics: object[], relations: object[]}}
 */
const mergeAnalyses = (analyses = []) => {
  const shaped = analyses.map((analysis) => ensureAnalysisShape(analysis || {}));
  const collect = (field) => shaped.flatMap((analysis) => analysis[field] || []);

  const idOrName = (item) => item?.id || item?.name;
  const eventKey = (item) =>
    item?.id || `${item?.type || ""}|${item?.summary || ""}|${item?.occurred_at || ""}`;
  const relationKey = (item) =>
    `${item?.source || ""}|${item?.target || ""}|${item?.type || ""}|${item?.summary || ""}`;

  return {
    persons: mergeByKey(collect("persons"), idOrName),
    organizations: mergeByKey(collect("organizations"), idOrName),
    events: mergeByKey(collect("events"), eventKey),
    topics: mergeByKey(collect("topics"), idOrName),
    relations: mergeByKey(collect("relations"), relationKey)
  };
};
/**
 * Run an async worker over a list with a bounded number of concurrent workers.
 * Results are stored at the index of their input, so output order matches input order.
 * @param {Array} items - Inputs; a non-array is treated as an empty list.
 * @param {number} concurrency - Desired worker count (at least 1, at most the list length).
 * @param {(item: *, index: number) => Promise<*>} worker - Processes one item.
 * @returns {Promise<Array>} Worker results aligned with `items`.
 */
const runWithConcurrency = async (items, concurrency, worker) => {
  const queue = Array.isArray(items) ? items : [];
  const total = queue.length;
  const output = new Array(total);
  const requested = Math.max(Number(concurrency) || 1, 1);
  const poolSize = Math.min(requested, Math.max(total, 1));

  // Shared cursor: each runner claims the next unclaimed index until the list is exhausted.
  let nextIndex = 0;
  const drain = async () => {
    for (let claimed = nextIndex++; claimed < total; claimed = nextIndex++) {
      output[claimed] = await worker(queue[claimed], claimed);
    }
  };

  await Promise.all(Array.from({ length: poolSize }, () => drain()));
  return output;
};
/**
 * Normalize a value for case-insensitive matching: stringify, lowercase, trim.
 * Falsy values become the empty string.
 * @param {*} value - Value to normalize.
 * @returns {string} Normalized text.
 */
const normalizeTextForMatch = (value) => {
  const text = String(value || "");
  return text.toLowerCase().trim();
};
/**
 * Shorten a summary for prompt use.
 * The text is split on separator characters, the first two distinct fragments
 * (compared case-insensitively) are kept and concatenated, and the result is
 * cut to `maxLen - 1` characters when it is longer than `maxLen`.
 * If no fragment survives, the trimmed original text is used instead.
 * @param {*} value - Summary text.
 * @param {number} [maxLen=80] - Length threshold.
 * @returns {string} Compacted summary, or "" for empty input.
 */
const compactSummaryText = (value, maxLen = 80) => {
  const source = String(value || "").trim();
  if (source === "") return "";

  // normalized fragment -> original fragment, in first-seen order
  const kept = new Map();
  for (const fragment of source.split(/[|;。]/)) {
    const piece = fragment.trim();
    if (!piece) continue;
    const normalized = normalizeTextForMatch(piece);
    if (!normalized || kept.has(normalized)) continue;
    kept.set(normalized, piece);
    if (kept.size >= 2) break;
  }

  const joined = kept.size > 0 ? [...kept.values()].join("") : source;
  const compact = joined.trim();
  if (compact.length <= maxLen) return compact;
  return compact.slice(0, maxLen - 1);
};
/**
 * Pull up to 24 distinct match tokens out of a text.
 * Tokens are runs of two or more CJK characters (U+4E00..U+9FA5) followed by
 * Latin words of three or more characters; all are normalized before deduplication.
 * @param {*} text - Source text.
 * @returns {string[]} Distinct normalized tokens, CJK matches first.
 */
const extractMatchTokens = (text) => {
  const source = String(text || "");
  const patterns = [/[\u4e00-\u9fa5]{2,}/g, /[a-zA-Z][a-zA-Z0-9_-]{2,}/g];
  const candidates = patterns.flatMap((pattern) => source.match(pattern) || []);

  const tokens = new Set();
  // `some` stops iterating as soon as the cap is reached.
  candidates.some((candidate) => {
    const normalized = normalizeTextForMatch(candidate);
    if (normalized && normalized.length >= 2) tokens.add(normalized);
    return tokens.size >= 24;
  });
  return Array.from(tokens);
};
/**
 * Build the "known entities" prompt section for an analysis request.
 * Known persons and organizations are scored against the input text, deduplicated
 * by normalized name, and the top 12 persons / 8 organizations are listed.
 * @param {*} text - Input text the entities are matched against.
 * @param {{persons?: object[], organizations?: object[]}} [existingEntities={}] - Known entities.
 * @returns {string} Prompt section, or "" when there are no known persons or organizations.
 */
const buildExistingContext = (text, existingEntities = {}) => {
  const knownPersons = Array.isArray(existingEntities?.persons) ? existingEntities.persons : [];
  const knownOrgs = Array.isArray(existingEntities?.organizations) ? existingEntities.organizations : [];
  if (knownPersons.length + knownOrgs.length === 0) return "";

  const inputText = String(text || "");
  const inputNorm = normalizeTextForMatch(inputText);
  const inputTokens = extractMatchTokens(inputText);

  // Number of input tokens found in the summary, capped at 3.
  const countSummaryHits = (summary) => {
    if (!summary) return 0;
    const summaryNorm = normalizeTextForMatch(summary);
    let hits = 0;
    for (const token of inputTokens) {
      if (summaryNorm.includes(token)) hits += 1;
      if (hits >= 3) break;
    }
    return hits;
  };

  // Score each entity, keep the best-scoring one per normalized name, sort best first.
  const rank = (list, type) => {
    const bestByName = new Map();
    for (const item of list || []) {
      const name = String(item?.name || "").trim();
      if (!name) continue;
      const nameNorm = normalizeTextForMatch(name);
      const slot = `${type}:${nameNorm}`;
      const summary = compactSummaryText(item?.summary || "");
      const id = String(item?.id || "").trim();
      // Core entities: ids "user"/"partner" or ids ending in "__user"/"__partner".
      const coreHit = id === "user" || id === "partner" || /__user$|__partner$/i.test(id);
      const nameHit = nameNorm && inputNorm.includes(nameNorm);
      const score = (coreHit ? 100 : 0) + (nameHit ? 60 : 0) + countSummaryHits(summary) * 5;
      const incumbent = bestByName.get(slot);
      if (!incumbent || score > incumbent.score) {
        bestByName.set(slot, { id, name, summary, score, nameHit, coreHit });
      }
    }
    const byScoreThenShorterName = (a, b) => b.score - a.score || a.name.length - b.name.length;
    return [...bestByName.values()].sort(byScoreThenShorterName);
  };

  const personTop = rank(knownPersons, "person").slice(0, 12);
  const orgTop = rank(knownOrgs, "organization").slice(0, 8);
  const isMatched = (entity) => entity.nameHit || entity.coreHit;
  const matchedPersons = personTop.filter(isMatched).length;
  const matchedOrgs = orgTop.filter(isMatched).length;

  return `
## 已有实体列表(命中优先 + 精简)
**若文本提到下列人物/组织,请复用 ID不要新建同名实体。**
输入命中:人物 ${matchedPersons}/${personTop.length},组织 ${matchedOrgs}/${orgTop.length}
已有的人物:
${personTop.map((p) => `- ID: "${p.id}", 名字:"${p.name}", 摘要:${p.summary || "无"}`).join('\n')}
已有的组织:
${orgTop.map((o) => `- ID: "${o.id}", 名字:"${o.name}", 摘要:${o.summary || "无"}`).join('\n')}
代词解析:
- "我" 优先映射 user
- "女朋友/男朋友/对象/她/他" 优先映射 partner若上下文一致
- 出现同名实体时必须复用以上 ID`;
};
export class LLMService {
constructor(env) {
this.baseUrl = (env.LLM_BASE_URL ?? "").replace(/\/+$/, "");
@@ -66,6 +268,9 @@ export class LLMService {
throw createHttpError(400, "LLM 服务未配置,请提供 LLM_BASE_URL/LLM_API_KEY/LLM_MODEL_NAME");
}
const promptTokensEstimated = estimateMessageTokens(messages);
console.log(`[TOKEN] model=${this.model} prompt_tokens_estimated=${promptTokensEstimated} max_tokens=4096`);
const response = await fetch(`${this.baseUrl}/chat/completions`, {
method: "POST",
headers: {
@@ -86,38 +291,17 @@ export class LLMService {
}
const data = await response.json();
const usage = data?.usage || {};
const promptTokens = usage.prompt_tokens;
const completionTokens = usage.completion_tokens;
const totalTokens = usage.total_tokens;
console.log(
`[TOKEN] model=${this.model} prompt_tokens_actual=${promptTokens ?? "n/a"} completion_tokens=${completionTokens ?? "n/a"} total_tokens=${totalTokens ?? "n/a"}`
);
return data;
}
/**
* 分析文本并提取详细的实体和关系MiroFish 风格)
* @param {string} text - 用户输入的文本
* @param {object} existingEntities - 现有实体列表(用于识别是否已存在)
*/
async analyzeText(text, existingEntities = {}) {
if (!text?.trim()) {
throw createHttpError(400, "分析文本不能为空");
}
const existingContext = existingEntities.persons?.length > 0 || existingEntities.organizations?.length > 0
? `
## 已有实体列表(极其重要!)
**如果文本中提到的人/组织已存在于下方列表中,必须复用相同的 ID不要创建新实体**
已有的人物:
${(existingEntities.persons || []).map(p => `- ID: "${p.id}", 名字:"${p.name}", 描述:${p.summary}`).join('\n')}
已有的组织:
${(existingEntities.organizations || []).map(o => `- ID: "${o.id}", 名字:"${o.name}", 描述:${o.summary}`).join('\n')}
**代词解析指南**:
- "我女朋友" = 已有实体中的"女朋友"(如果有)
- "她" = 根据上下文推断指代哪个女性角色
- "他" = 根据上下文推断指代哪个男性角色
- "丽丽" = 如果已有实体中有"丽丽",复用 ID`
: '';
async _analyzeTextSinglePass(text, existingContext) {
const systemPrompt = `你是一个恋爱关系知识图谱构建专家。从用户输入的文本中提取实体和关系,用于后续的恋爱决策建议。
## 核心原则
@@ -125,6 +309,7 @@ ${(existingEntities.organizations || []).map(o => `- ID: "${o.id}", 名字:"${
2. **其他人物**:朋友、闺蜜、同事等只需要记录基本信息(名字、与用户的关系)
3. **事件细节**:记录争吵、约会、礼物、重要对话等影响关系的事件
4. **情感线索**:提取情绪变化、态度、期望等软性信息
5. **学校组织**:出现大学/学院/学校名称时,必须抽取为 organizations 节点
## 输出格式(严格 JSON
{
@@ -133,7 +318,15 @@ ${(existingEntities.organizations || []).map(o => `- ID: "${o.id}", 名字:"${
"id": "p1",
"name": "人物名称",
"summary": "人物描述",
"role": "用户 | 恋爱对象 | 朋友 | 家人 | 同事 | 其他"
"role": "用户 | 恋爱对象 | 朋友 | 家人 | 同事 | 其他",
"gender": "male | female | unknown"
}
],
"organizations": [
{
"id": "o1",
"name": "组织名称(公司/学校/大学)",
"summary": "组织描述"
}
],
"events": [
@@ -175,6 +368,10 @@ ${(existingEntities.organizations || []).map(o => `- ID: "${o.id}", 名字:"${
4. **实体去重**:如果文本中提到的人已存在于"已有实体"列表中,**复用相同的 ID**
5. **时间标准化**occurred_at 使用 ISO 格式
6. **情感标注**emotional_tone 标注事件的情感倾向positive/neutral/negative
7. 数量控制persons 最多 8 个organizations 最多 8 个events 最多 8 个topics 最多 8 个relations 最多 16 个
8. 长度控制persons.summary 不超过 100 字events.summary 不超过 120 字relations.summary 不超过 80 字
9. 人物性别person.gender 只允许 male/female/unknown无法确认时填 unknown
10. 若文本出现“某人毕业于/就读于/来自某大学”,需创建该大学 organization并补充关系如 STUDIED_AT/ALUMNUS_OF
只返回 JSON不要有其他文字。`;
@@ -182,10 +379,9 @@ ${(existingEntities.organizations || []).map(o => `- ID: "${o.id}", 名字:"${
{ role: "system", content: systemPrompt },
{ role: "user", content: `${existingContext}\n\n## 待分析文本\n${text}` }
];
console.log("[DEBUG] LLM request messages:", JSON.stringify(messages));
const result = await this.chat(messages, 0.3);
const content = result?.choices?.[0]?.message?.content;
console.log("[DEBUG] LLM raw response:", content);
if (!content) {
@@ -194,18 +390,270 @@ ${(existingEntities.organizations || []).map(o => `- ID: "${o.id}", 名字:"${
let parsed;
try {
const jsonMatch = content.match(/\{[\s\S]*\}/);
parsed = jsonMatch ? JSON.parse(jsonMatch[0]) : JSON.parse(content);
parsed = parseJsonObject(content);
} catch (e) {
throw createHttpError(500, `LLM 返回格式错误:${content.substring(0, 200)}`);
const repaired = await this.chat([
{ role: "system", content: "你是 JSON 修复器。你会把不完整或格式错误的 JSON 修复为合法 JSON。只输出 JSON不要输出其他内容。" },
{
role: "user",
content: `请把下面内容修复为合法 JSON必须包含 persons/organizations/events/topics/relations 五个数组字段,缺失就补空数组:\n\n${content}`
}
], 0.1);
const repairedContent = repaired?.choices?.[0]?.message?.content;
try {
parsed = parseJsonObject(repairedContent);
} catch {
throw createHttpError(500, `LLM 返回格式错误:${content.substring(0, 200)}`);
}
}
return ensureAnalysisShape(parsed);
}
async _expertReviewMergedAnalysis(merged, existingContext) {
const systemPrompt = `你是知识图谱质检专家。你会对已合并的抽取结果进行去重、补全与标准化,只返回 JSON。
输出格式:
{
"persons": [],
"organizations": [],
"events": [],
"topics": [],
"relations": []
}
规则:
1) 保持与输入语义一致,不凭空捏造
2) persons 最多 12organizations 最多 12events 最多 12topics 最多 10relations 最多 24
3) summary 保持简洁
4) 必须返回五个数组字段`;
const reviewed = await this.chat([
{ role: "system", content: systemPrompt },
{
role: "user",
content: `${existingContext}\n\n请审查并标准化下面的合并结果:\n${JSON.stringify(ensureAnalysisShape(merged), null, 2)}`
}
], 0.2);
const reviewedContent = reviewed?.choices?.[0]?.message?.content;
return ensureAnalysisShape(parseJsonObject(reviewedContent || ""));
}
/**
* 分析文本并提取详细的实体和关系MiroFish 风格)
* @param {string} text - 用户输入的文本
* @param {object} existingEntities - 现有实体列表(用于识别是否已存在)
*/
async analyzeText(text, existingEntities = {}, options = {}) {
if (!text?.trim()) {
throw createHttpError(400, "分析文本不能为空");
}
return parsed;
const normalizedText = preprocessText(text);
const existingContext = buildExistingContext(normalizedText, existingEntities);
const onProgress = typeof options?.onProgress === "function" ? options.onProgress : () => {};
const parallelism = Math.min(Math.max(Number(options?.parallelism || 3), 1), 6);
const expertReviewEnabled = options?.expertReview !== false;
const estimatedInputTokens = estimateTokens(normalizedText) + estimateTokens(existingContext);
const shouldChunk = normalizedText.length > 1600 || estimatedInputTokens > 2200;
if (!shouldChunk) {
onProgress({ stage: "single_pass_start", estimated_tokens: estimatedInputTokens });
return await this._analyzeTextSinglePass(normalizedText, existingContext);
}
const chunks = splitTextIntoChunks(normalizedText, 1100, 120).filter(Boolean);
console.log(`[TOKEN] long_text_detected=1 chunks=${chunks.length} estimated_input_tokens=${estimatedInputTokens}`);
onProgress({
stage: "chunking",
chunk_count: chunks.length,
estimated_tokens: estimatedInputTokens,
parallelism
});
const analyses = await runWithConcurrency(chunks, parallelism, async (chunk, index) => {
const chunkTokens = estimateTokens(chunk);
onProgress({
stage: "chunk_start",
chunk_index: index + 1,
chunk_count: chunks.length,
chunk_tokens_estimated: chunkTokens
});
console.log(`[TOKEN] chunk_index=${index + 1}/${chunks.length} chunk_tokens_estimated=${chunkTokens}`);
try {
const chunkAnalysis = await this._analyzeTextSinglePass(chunk, existingContext);
onProgress({
stage: "chunk_done",
chunk_index: index + 1,
chunk_count: chunks.length,
persons: chunkAnalysis.persons.length,
organizations: chunkAnalysis.organizations.length,
events: chunkAnalysis.events.length,
topics: chunkAnalysis.topics.length,
relations: chunkAnalysis.relations.length
});
return chunkAnalysis;
} catch (error) {
onProgress({
stage: "chunk_failed",
chunk_index: index + 1,
chunk_count: chunks.length,
error: error?.message || "chunk_analyze_failed"
});
return { persons: [], events: [], topics: [], relations: [] };
}
});
const merged = mergeAnalyses(analyses);
onProgress({
stage: "merge_done",
persons: merged.persons.length,
organizations: merged.organizations.length,
events: merged.events.length,
topics: merged.topics.length,
relations: merged.relations.length
});
if (!expertReviewEnabled || chunks.length < 2) {
return merged;
}
onProgress({ stage: "expert_review_start" });
try {
const reviewed = await this._expertReviewMergedAnalysis(merged, existingContext);
onProgress({
stage: "expert_review_done",
persons: reviewed.persons.length,
organizations: reviewed.organizations.length,
events: reviewed.events.length,
topics: reviewed.topics.length,
relations: reviewed.relations.length
});
return reviewed;
} catch (error) {
onProgress({
stage: "expert_review_failed",
error: error?.message || "expert_review_failed"
});
return merged;
}
}
async adjudicateEventCorrections(text, recentEvents = [], options = {}) {
const rawText = typeof text === "string" ? text.trim() : "";
if (!rawText) return { decisions: [] };
const maxEvents = Math.min(Math.max(Number(options?.maxEvents || 12), 1), 20);
const minConfidence = Math.min(Math.max(Number(options?.minConfidence ?? 0.72), 0), 1);
const compactEvents = (Array.isArray(recentEvents) ? recentEvents : [])
.slice(0, maxEvents)
.map((e) => ({
id: e.id,
type: e.type || "general",
summary: String(e.summary || "").slice(0, 120),
occurred_at: e.occurred_at || null,
importance: e.importance ?? 5
}))
.filter((e) => e.id);
if (compactEvents.length === 0) return { decisions: [] };
const systemPrompt = `你是“事件纠错裁决器”,按低成本多角色流程工作:抽取员->校验员->裁决员。
只返回 JSON
{
"decisions": [
{
"event_id": "事件ID",
"action": "keep | invalidate | update",
"confidence": 0.0,
"reason": "一句话",
"new_summary": "仅 action=update 时可填",
"new_type": "仅 action=update 时可填",
"new_importance": 1-10
}
]
}
规则:
1) 仅在用户明确表达“误会、说错、澄清、撤回、并非、不是”等纠错语义时,才给 invalidate/update。
2) 不要猜测;证据不足就 keep。
3) event_id 必须来自输入列表。
4) confidence < ${minConfidence} 的决策不要输出。`;
const result = await this.chat([
{ role: "system", content: systemPrompt },
{
role: "user",
content: `用户新输入:\n${rawText}\n\n候选历史事件:\n${JSON.stringify(compactEvents, null, 2)}`
}
], 0.1);
const content = result?.choices?.[0]?.message?.content || "";
let parsed;
try {
parsed = parseJsonObject(content);
} catch {
parsed = { decisions: [] };
}
const validActions = new Set(["keep", "invalidate", "update"]);
const allowedIds = new Set(compactEvents.map((e) => e.id));
const decisions = (Array.isArray(parsed?.decisions) ? parsed.decisions : [])
.filter((d) => d && allowedIds.has(d.event_id) && validActions.has(String(d.action || "").trim()))
.map((d) => ({
event_id: d.event_id,
action: String(d.action).trim(),
confidence: Math.min(Math.max(Number(d.confidence ?? 0), 0), 1),
reason: String(d.reason || "").slice(0, 120),
new_summary: d.new_summary ? String(d.new_summary).slice(0, 160) : "",
new_type: d.new_type ? String(d.new_type).slice(0, 40) : "",
new_importance: Number.isFinite(Number(d.new_importance)) ? Math.min(10, Math.max(1, Number(d.new_importance))) : null
}))
.filter((d) => d.confidence >= minConfidence && d.action !== "keep");
return { decisions };
}
async detectImplicitCorrectionIntent(text, recentEvents = [], options = {}) {
const rawText = typeof text === "string" ? text.trim() : "";
if (!rawText) return { trigger: false, confidence: 0, reason: "empty_text" };
const maxEvents = Math.min(Math.max(Number(options?.maxEvents || 6), 1), 12);
const minConfidence = Math.min(Math.max(Number(options?.minConfidence ?? 0.78), 0), 1);
const compactEvents = (Array.isArray(recentEvents) ? recentEvents : [])
.slice(0, maxEvents)
.map((e) => ({
id: e.id,
type: e.type || "general",
summary: String(e.summary || "").slice(0, 80)
}))
.filter((e) => e.id);
if (compactEvents.length === 0) return { trigger: false, confidence: 0, reason: "no_events" };
const result = await this.chat([
{
role: "system",
content: `你是“纠错触发判定器”。判断用户新输入是否在表达“对历史事件的反转/否定/修正”。只返回 JSON
{"trigger":true|false,"confidence":0.0,"reason":"一句话"}
规则:
1) 仅当明确出现与历史事件相反的事实时 trigger=true
2) 普通新增信息、情绪表达、泛化观点都 trigger=false
3) 不要猜测,保守判断`
},
{
role: "user",
content: `用户新输入:\n${rawText}\n\n最近事件:\n${JSON.stringify(compactEvents, null, 2)}`
}
], 0.1);
const content = result?.choices?.[0]?.message?.content || "";
let parsed;
try {
parsed = parseJsonObject(content);
} catch {
parsed = { trigger: false, confidence: 0, reason: "parse_failed" };
}
const confidence = Math.min(Math.max(Number(parsed?.confidence ?? 0), 0), 1);
const trigger = Boolean(parsed?.trigger) && confidence >= minConfidence;
return {
trigger,
confidence,
reason: String(parsed?.reason || "").slice(0, 120)
};
}
isEmptyAnalysis(data) {
return !data
|| (!Array.isArray(data.persons) || data.persons.length === 0)
&& (!Array.isArray(data.organizations) || data.organizations.length === 0)
&& (!Array.isArray(data.events) || data.events.length === 0)
&& (!Array.isArray(data.topics) || data.topics.length === 0)
&& (!Array.isArray(data.relations) || data.relations.length === 0);

View File

@@ -0,0 +1,362 @@
/**
 * Create an Error that carries an HTTP status code.
 * @param {number} statusCode - Status stored on the error as `statusCode`.
 * @param {string} message - Error message.
 * @returns {Error} Error with a `statusCode` property.
 */
const createHttpError = (statusCode, message) =>
  Object.assign(new Error(message), { statusCode });
/**
 * Extract a JSON object from raw model output.
 *
 * Candidates are tried in order: the whole trimmed text, the body of the first
 * fenced code block, and finally the span from the first "{" to the last "}".
 * Only plain objects are accepted; `null`, primitives and arrays are rejected
 * so callers can read plan fields from the result without extra checks.
 *
 * @param {string} content - Raw text that should contain a JSON object.
 * @returns {object} The first candidate that parses to a plain object.
 * @throws {Error} "empty content" for empty or non-string input;
 *   "json not found" (with the last parse failure as `cause`) otherwise.
 */
const parseJsonObject = (content) => {
  if (!content || typeof content !== "string") {
    throw new Error("empty content");
  }
  const direct = content.trim();
  const fenced = direct.match(/```json\s*([\s\S]*?)```/i) || direct.match(/```\s*([\s\S]*?)```/i);
  const start = direct.indexOf("{");
  const end = direct.lastIndexOf("}");
  const candidates = [
    direct,
    fenced?.[1]?.trim(),
    start >= 0 && end > start ? direct.slice(start, end + 1) : ""
  ];

  let lastError;
  for (const candidate of candidates) {
    if (!candidate) continue;
    try {
      const value = JSON.parse(candidate);
      // Reject null, primitives and arrays: they parse fine but are not an object payload.
      if (value !== null && typeof value === "object" && !Array.isArray(value)) {
        return value;
      }
    } catch (error) {
      lastError = error;
    }
  }
  throw new Error("json not found", { cause: lastError });
};
/**
 * Multi-agent orchestration service.
 *
 * Offers two entry points:
 *   - `decomposeTask`: asks the LLM to split a task into cooperating agents.
 *   - `executeTaskWorkflow`: runs each agent of a plan in order through the LLM.
 *
 * When the LLM service is absent or reports itself disabled, both entry points
 * return deterministic placeholder ("fallback") results instead of failing.
 */
export class MultiAgentService {
  /**
   * @param {object} deps
   * @param {object} [deps.llmService] - used through `isEnabled()` and
   *   `chat(messages, 0.2)`; the reply is read at `choices[0].message.content`.
   * @param {object} [deps.logger] - optional logger; `info` and `warn` are called when present.
   */
  constructor({ llmService, logger }) {
    this.llmService = llmService;
    this.logger = logger;
  }

  /**
   * Turn a caller-supplied agent limit into an integer in [2, 8].
   *
   * Falsy input defaults to 4. Non-numeric input (which becomes NaN) also
   * defaults to 4, because NaN would otherwise survive the clamp and make
   * `slice(0, NaN)` silently drop every agent.
   *
   * @param {*} value - requested maximum number of agents.
   * @returns {number} integer between 2 and 8 inclusive.
   */
  _resolveMaxAgents(value) {
    const requested = Number(value || 4);
    const clamped = Math.min(Math.max(Number.isNaN(requested) ? 4 : requested, 2), 8);
    // Truncate so the value shown in prompts/logs matches what slice() keeps.
    return Math.trunc(clamped);
  }

  /**
   * Build the static plan used when the LLM is unavailable or its output is unusable.
   *
   * @param {string} task - task text echoed into the plan.
   * @param {string} context - context text; falsy values become "".
   * @param {*} maxAgents - requested agent limit; only four base agents exist.
   * @returns {object} plan object with `mode: "fallback"`.
   */
  _fallbackPlan(task, context, maxAgents) {
    const n = this._resolveMaxAgents(maxAgents);
    const baseAgents = [
      {
        id: "planner",
        name: "任务规划Agent",
        role: "planner",
        goal: "澄清目标、边界与验收标准",
        input: "原始任务、约束、上下文",
        output: "结构化目标与执行计划"
      },
      {
        id: "analyst",
        name: "证据分析Agent",
        role: "analyst",
        goal: "识别事实、风险与依赖",
        input: "规划结果与上下文信息",
        output: "关键证据、风险与优先级"
      },
      {
        id: "executor",
        name: "执行Agent",
        role: "executor",
        goal: "将计划落地为可执行动作",
        input: "分析结论与执行约束",
        output: "执行步骤与回滚策略"
      },
      {
        id: "reviewer",
        name: "评审Agent",
        role: "reviewer",
        goal: "检查质量、合规与可交付性",
        input: "执行结果与验收标准",
        output: "评审结论与修正建议"
      }
    ];
    const agents = baseAgents.slice(0, n);
    // Linear chain: each agent hands off to the one that follows it.
    const workflow = agents.slice(1).map((agent, index) => ({
      from: agents[index].id,
      to: agent.id,
      handoff: "传递结构化结果"
    }));
    return {
      ok: true,
      mode: "fallback",
      plan_title: "多Agent任务拆解",
      task,
      context: context || "",
      agents,
      workflow,
      milestones: [
        "完成目标澄清",
        "完成风险评估",
        "完成执行方案",
        "完成质量评审"
      ],
      risks: [
        "上下文不足导致拆解偏差",
        "依赖未显式声明导致计划延误",
        "执行与验收标准不一致"
      ]
    };
  }

  /**
   * Coerce an arbitrary plan-like value into the canonical plan shape.
   *
   * Non-object agent entries are discarded before the limit is applied, so a
   * malformed entry cannot later crash on `agent.id` / `agent.name`. Workflow
   * edges are kept only when both endpoints reference a retained agent id.
   *
   * @param {*} plan - plan supplied by a caller or parsed from LLM output.
   * @param {number} maxAgents - maximum number of agents to keep.
   * @returns {object} normalized plan; `mode` defaults to "llm".
   */
  _normalizePlan(plan, maxAgents) {
    const agents = (Array.isArray(plan?.agents) ? plan.agents : [])
      .filter((a) => a !== null && typeof a === "object")
      .slice(0, maxAgents);
    const agentIds = new Set(agents.map((a) => a.id).filter(Boolean));
    const workflow = (Array.isArray(plan?.workflow) ? plan.workflow : [])
      .filter((w) => agentIds.has(w?.from) && agentIds.has(w?.to));
    return {
      ok: true,
      mode: plan?.mode || "llm",
      plan_title: plan?.plan_title || "多Agent任务拆解",
      task: plan?.task || "",
      context: plan?.context || "",
      agents,
      workflow,
      milestones: Array.isArray(plan?.milestones) ? plan.milestones : [],
      risks: Array.isArray(plan?.risks) ? plan.risks : []
    };
  }

  /**
   * Execute a plan agent by agent, feeding each agent the outputs of the previous ones.
   *
   * Uses `payload.plan` when it is an object; otherwise decomposes `payload.task`
   * first. A failing step (LLM error or unparsable reply) is recorded as a
   * skipped step and does not abort the run.
   *
   * @param {object} [payload]
   * @param {string} [payload.task]
   * @param {string} [payload.context]
   * @param {Array} [payload.constraints]
   * @param {*} [payload.max_agents] - clamped to an integer in [2, 8], default 4.
   * @param {object} [payload.plan] - pre-built plan to execute.
   * @returns {Promise<object>} `{ ok, mode, task, plan, steps, final_summary }`.
   * @throws {Error} with `statusCode` 400 when neither task nor plan is given,
   *   or when the plan has fewer than two usable agents.
   */
  async executeTaskWorkflow(payload = {}) {
    const task = String(payload.task || "").trim();
    const context = String(payload.context || "").trim();
    const constraints = Array.isArray(payload.constraints) ? payload.constraints : [];
    const maxAgents = this._resolveMaxAgents(payload.max_agents);
    const inputPlan = payload.plan && typeof payload.plan === "object" ? payload.plan : null;
    if (!task && !inputPlan) {
      throw createHttpError(400, "task 不能为空");
    }
    this.logger?.info?.({
      event: "multi_agent_execute_start",
      has_input_plan: Boolean(inputPlan),
      task_length: task.length,
      context_length: context.length,
      constraints_count: constraints.length,
      max_agents: maxAgents
    });
    const plan = inputPlan
      ? this._normalizePlan({
        ...inputPlan,
        task: inputPlan.task || task,
        context: inputPlan.context || context
      }, maxAgents)
      : await this.decomposeTask({ task, context, constraints, max_agents: maxAgents });
    if (!Array.isArray(plan.agents) || plan.agents.length < 2) {
      throw createHttpError(400, "plan.agents 至少需要 2 个");
    }
    if (!this.llmService?.isEnabled?.()) {
      // No LLM available: emit one placeholder step per agent.
      const steps = plan.agents.map((agent, index) => ({
        index: index + 1,
        agent_id: agent.id,
        agent_name: agent.name,
        role: agent.role,
        result: `${agent.name}已完成占位执行请在启用LLM后获得真实执行结果`,
        deliverables: [agent.output || "结构化输出"],
        risks: [],
        handoff_to_next: index < plan.agents.length - 1 ? plan.agents[index + 1].id : null
      }));
      this.logger?.warn?.({
        event: "multi_agent_execute_fallback",
        reason: "llm_not_enabled",
        step_count: steps.length
      });
      return {
        ok: true,
        mode: "fallback",
        task: plan.task,
        plan,
        steps,
        final_summary: "已按回退模式完成流程编排,当前结果为占位执行"
      };
    }
    const steps = [];
    // Counted explicitly so a model reply that happens to equal the failure
    // text is not mistaken for a failed step.
    let failedStepCount = 0;
    for (let i = 0; i < plan.agents.length; i += 1) {
      const agent = plan.agents[i];
      const previousOutputs = steps.map((s) => ({
        agent_id: s.agent_id,
        result: s.result,
        deliverables: s.deliverables
      }));
      const handoffFromWorkflow = plan.workflow.find((w) => w.from === agent.id)?.handoff || "";
      const systemPrompt = `你是${agent.name},角色是${agent.role}。你需要对任务执行本角色工作并返回JSON不要输出任何额外文字。
输出格式:
{
"result":"本角色结论",
"deliverables":["交付物1","交付物2"],
"risks":["风险1","风险2"],
"next_handoff":"给下个Agent的交接信息"
}`;
      const userPrompt = JSON.stringify({
        task: plan.task,
        context: plan.context,
        constraints,
        agent,
        handoff_from_workflow: handoffFromWorkflow,
        previous_outputs: previousOutputs
      });
      let stepResult;
      try {
        const response = await this.llmService.chat([
          { role: "system", content: systemPrompt },
          { role: "user", content: userPrompt }
        ], 0.2);
        const content = response?.choices?.[0]?.message?.content || "";
        const parsed = parseJsonObject(content);
        stepResult = {
          index: i + 1,
          agent_id: agent.id,
          agent_name: agent.name,
          role: agent.role,
          result: parsed.result || "",
          deliverables: Array.isArray(parsed.deliverables) ? parsed.deliverables : [],
          risks: Array.isArray(parsed.risks) ? parsed.risks : [],
          next_handoff: parsed.next_handoff || ""
        };
      } catch (error) {
        // Best effort: record the failure and continue with the next agent.
        failedStepCount += 1;
        this.logger?.warn?.({
          event: "multi_agent_execute_step_failed",
          agent_id: agent.id,
          message: error?.message || "unknown_error"
        });
        stepResult = {
          index: i + 1,
          agent_id: agent.id,
          agent_name: agent.name,
          role: agent.role,
          result: "本步骤执行失败,已跳过",
          deliverables: [],
          risks: ["LLM返回异常或解析失败"],
          next_handoff: ""
        };
      }
      steps.push(stepResult);
    }
    const finalSummary = steps.map((s) => `[${s.agent_name}] ${s.result}`).join(" | ");
    this.logger?.info?.({
      event: "multi_agent_execute_success",
      step_count: steps.length,
      failed_step_count: failedStepCount
    });
    return {
      ok: true,
      mode: "llm",
      task: plan.task,
      plan,
      steps,
      final_summary: finalSummary
    };
  }

  /**
   * Decompose a task into a multi-agent plan.
   *
   * Returns the static fallback plan when the LLM is disabled, when the call
   * or the parsing of its reply fails, or when the reply yields fewer than two
   * usable agents.
   *
   * @param {object} [payload]
   * @param {string} [payload.task] - task text; `payload.query` is accepted as an alias.
   * @param {string} [payload.context]
   * @param {Array} [payload.constraints]
   * @param {*} [payload.max_agents] - clamped to an integer in [2, 8], default 4.
   * @returns {Promise<object>} plan with `mode` "llm" or "fallback".
   * @throws {Error} with `statusCode` 400 when the task is empty.
   */
  async decomposeTask(payload = {}) {
    const task = String(payload.task || payload.query || "").trim();
    const context = String(payload.context || "").trim();
    const constraints = Array.isArray(payload.constraints) ? payload.constraints : [];
    const maxAgents = this._resolveMaxAgents(payload.max_agents);
    if (!task) {
      throw createHttpError(400, "task 不能为空");
    }
    this.logger?.info?.({
      event: "multi_agent_decompose_start",
      task_length: task.length,
      context_length: context.length,
      constraints_count: constraints.length,
      max_agents: maxAgents
    });
    if (!this.llmService?.isEnabled?.()) {
      const fallback = this._fallbackPlan(task, context, maxAgents);
      this.logger?.warn?.({
        event: "multi_agent_decompose_fallback",
        reason: "llm_not_enabled",
        agent_count: fallback.agents.length
      });
      return fallback;
    }
    const systemPrompt = `你是多Agent编排专家。请把任务拆解为多个可协作智能体仅返回JSON。
输出格式:
{
"plan_title":"字符串",
"agents":[
{"id":"a1","name":"名称","role":"planner|analyst|executor|reviewer|researcher","goal":"目标","input":"输入","output":"输出"}
],
"workflow":[
{"from":"a1","to":"a2","handoff":"交接内容"}
],
"milestones":["里程碑1","里程碑2"],
"risks":["风险1","风险2"]
}
规则:
1) agents 数量为 2-${maxAgents}
2) id 唯一workflow 中引用必须有效
3) 结果要可执行、可验收
4) 不要输出任何解释文字`;
    const userPrompt = JSON.stringify({
      task,
      context,
      constraints
    });
    let parsed;
    try {
      const response = await this.llmService.chat([
        { role: "system", content: systemPrompt },
        { role: "user", content: userPrompt }
      ], 0.2);
      const content = response?.choices?.[0]?.message?.content || "";
      parsed = parseJsonObject(content);
    } catch (error) {
      this.logger?.warn?.({
        event: "multi_agent_decompose_parse_failed",
        message: error?.message || "unknown_error"
      });
      return this._fallbackPlan(task, context, maxAgents);
    }
    // Same normalization as caller-supplied plans. Spreading tolerates a
    // null/primitive parse result, and task/context/mode are always taken from
    // this request, never from the model output.
    const result = this._normalizePlan({ ...parsed, mode: "llm", task, context }, maxAgents);
    if (result.agents.length < 2) {
      const fallback = this._fallbackPlan(task, context, maxAgents);
      this.logger?.warn?.({
        event: "multi_agent_decompose_invalid_output",
        reason: "agent_count_lt_2"
      });
      return fallback;
    }
    this.logger?.info?.({
      event: "multi_agent_decompose_success",
      mode: result.mode,
      agent_count: result.agents.length,
      workflow_count: result.workflow.length,
      milestone_count: result.milestones.length,
      risk_count: result.risks.length
    });
    return result;
  }
}

View File

@@ -9,10 +9,30 @@ export class RerankService {
this.baseUrl = (env.RERANK_BASE_URL ?? "").replace(/\/+$/, "");
this.apiKey = env.RERANK_API_KEY ?? "";
this.model = env.RERANK_MODEL ?? "";
this.disabledReason = "";
const candidates = [];
if (this.baseUrl.endsWith("/v1")) {
candidates.push(`${this.baseUrl}/rerank`);
} else if (this.baseUrl) {
candidates.push(`${this.baseUrl}/v1/rerank`, `${this.baseUrl}/rerank`);
}
if (this.baseUrl.includes("dashscope.aliyuncs.com")) {
candidates.push("https://dashscope.aliyuncs.com/api/v1/services/rerank/text-rerank/text-rerank");
}
this.endpointCandidates = [...new Set(candidates)];
}
isEnabled() {
return Boolean(this.baseUrl && this.apiKey && this.model);
return Boolean(this.baseUrl && this.apiKey && this.model) && !this.disabledReason;
}
getRuntimeInfo() {
return {
configured: Boolean(this.baseUrl && this.apiKey && this.model),
enabled: this.isEnabled(),
model: this.model || null,
disabled_reason: this.disabledReason || null
};
}
/**
@@ -38,47 +58,63 @@ export class RerankService {
const texts = chunks.map(c => c.text);
try {
// 假设使用类似 OpenAI 或通用的 rerank 接口格式
// 实际使用时需根据具体第三方模型的 API 调整参数和路径
const response = await fetch(`${this.baseUrl}/v1/rerank`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${this.apiKey}`
},
body: JSON.stringify({
model: this.model,
query: cleanedQuery,
texts: texts
})
});
if (!response.ok) {
const errorText = await response.text();
let response = null;
let errorText = "";
let usedEndpoint = "";
for (const endpoint of this.endpointCandidates) {
usedEndpoint = endpoint;
const isDashScopeTextRerank = endpoint.includes("/services/rerank/text-rerank/text-rerank");
const body = isDashScopeTextRerank
? {
model: this.model,
input: {
query: cleanedQuery,
documents: texts
},
parameters: {
return_documents: false,
top_n: texts.length
}
}
: {
model: this.model,
query: cleanedQuery,
texts
};
response = await fetch(endpoint, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${this.apiKey}`
},
body: JSON.stringify(body)
});
if (response.ok) break;
errorText = await response.text();
if (response.status === 404) continue;
throw createHttpError(response.status, `rerank 请求失败: ${errorText}`);
}
if (!response?.ok) {
this.disabledReason = `endpoint_not_supported:${usedEndpoint || "unknown"}`;
return chunks;
}
const data = await response.json();
// data.results 格式通常为: [{ index: 0, relevance_score: 0.9 }, ...]
const results = data?.results;
const results = Array.isArray(data?.results) ? data.results : data?.output?.results;
if (!Array.isArray(results)) {
throw createHttpError(500, "rerank 返回格式异常");
}
// 根据重排结果重新排序 chunks
const rerankedChunks = results
this.disabledReason = "";
return results
.sort((a, b) => b.relevance_score - a.relevance_score)
.map(r => ({
...chunks[r.index],
relevance_score: r.relevance_score
}));
return rerankedChunks;
} catch (error) {
// 重排失败时,为了不阻断流程,可以选择直接返回原结果并记录日志,或者抛出错误
console.error("Rerank error:", error);
return chunks;
} catch {
return chunks;
}
}
}