import neo4j from "neo4j-driver";

/**
 * Convert an ISO time string to a second-precision Unix timestamp,
 * used for Qdrant range filters.
 *
 * @param {string | null | undefined} value - Time string accepted by `Date.parse`.
 * @returns {number | null} Whole seconds since the epoch, or `null` when the
 *   value is falsy or cannot be parsed.
 */
const toTimestamp = (value) => {
  if (!value) return null;
  const ms = Date.parse(value);
  if (Number.isNaN(ms)) return null;
  return Math.floor(ms / 1000);
};

/**
 * Normalize an event time to an ISO-8601 string.
 *
 * @param {unknown} value - Candidate time string.
 * @returns {string} The parsed time as ISO-8601; the current time when the
 *   value is not a non-blank string or cannot be parsed.
 */
const normalizeOccurredAt = (value) => {
  if (typeof value !== "string" || !value.trim()) {
    return new Date().toISOString();
  }
  const ms = Date.parse(value);
  if (Number.isNaN(ms)) {
    return new Date().toISOString();
  }
  return new Date(ms).toISOString();
};

/**
 * Flatten a Neo4j record into a plain event object so the controller layer
 * does not have to deal with graph-database Record details.
 *
 * @param {{ get: (key: string) => any }} record - Record exposing the event columns.
 * @returns {object} Event DTO; `topics` and `participants` default to `[]`.
 */
const toEventDto = (record) => ({
  id: record.get("id"),
  type: record.get("type"),
  summary: record.get("summary"),
  occurred_at: record.get("occurred_at"),
  importance: record.get("importance"),
  topics: record.get("topics") ?? [],
  participants: record.get("participants") ?? []
});

/**
 * Build a business error carrying an HTTP status code, so controllers can
 * return a uniform status.
 *
 * @param {number} statusCode - Stored on the error as `statusCode`.
 * @param {string} message - Error message.
 * @returns {Error}
 */
const createHttpError = (statusCode, message) => {
  const error = new Error(message);
  error.statusCode = statusCode;
  return error;
};

/**
 * Relationship types cannot be passed as Cypher parameters, so any type that
 * is interpolated into a query must first match this identifier whitelist
 * (a letter or underscore, followed by letters, digits or underscores).
 */
const RELATION_TYPE_PATTERN = /^[\p{L}_][\p{L}\p{N}_]*$/u;

/**
 * Validate a caller-supplied limit before it reaches Neo4j or Qdrant.
 *
 * @param {unknown} value - Raw value from the request body.
 * @param {number} fallback - Returned when `value` is null or undefined.
 * @param {string} name - Field name used in the error message.
 * @returns {number} The value truncated to an integer.
 * @throws {Error} 400 when the value is not a finite, non-negative number
 *   (numeric strings are accepted).
 */
const toNonNegativeInt = (value, fallback, name) => {
  if (value == null) return fallback;
  const isNumeric =
    typeof value === "number" || (typeof value === "string" && value.trim() !== "");
  const parsed = isNumeric ? Number(value) : Number.NaN;
  if (!Number.isFinite(parsed) || parsed < 0) {
    throw createHttpError(400, `${name} 必须为非负整数`);
  }
  return Math.trunc(parsed);
};

/**
 * Return the value when it is an array, otherwise an empty array.
 *
 * @param {unknown} value
 * @returns {any[]}
 */
const asList = (value) => (Array.isArray(value) ? value : []);

/**
 * Validate the /ingest payload and the vector dimensions.
 *
 * @param {object} body - Request body; `persons`, `events` and `chunks` are
 *   treated as empty when they are not arrays.
 * @param {number} embeddingDim - Required length of every supplied vector.
 * @param {boolean} canAutoEmbed - Whether a chunk may provide `text` instead of `vector`.
 * @returns {{ persons: any[], events: any[], chunks: any[] }}
 * @throws {Error} 400 on the first invalid person, event or chunk.
 */
const validateIngestInput = (body, embeddingDim, canAutoEmbed) => {
  const persons = Array.isArray(body.persons) ? body.persons : [];
  const events = Array.isArray(body.events) ? body.events : [];
  const chunks = Array.isArray(body.chunks) ? body.chunks : [];

  for (const person of persons) {
    if (!person?.id) {
      throw createHttpError(400, "persons[].id 必填");
    }
  }

  for (const event of events) {
    if (!event?.id || !event?.occurred_at) {
      throw createHttpError(400, "events[].id 与 events[].occurred_at 必填");
    }
    if (!Array.isArray(event.participants) || event.participants.length === 0) {
      throw createHttpError(400, "events[].participants 至少包含 1 个人物 id");
    }
  }

  for (const chunk of chunks) {
    if (!chunk?.id) {
      throw createHttpError(400, "chunks[].id 必填");
    }
    const hasVector = Array.isArray(chunk?.vector);
    const hasText = typeof chunk?.text === "string" && chunk.text.trim().length > 0;
    if (!hasVector && !(canAutoEmbed && hasText)) {
      throw createHttpError(400, "chunks[] 需提供 vector,或在配置 embedding 后提供 text");
    }
    if (hasVector && chunk.vector.length !== embeddingDim) {
      throw createHttpError(400, `chunks[].vector 维度必须为 ${embeddingDim}`);
    }
  }

  return { persons, events, chunks };
};

/**
 * GraphRAG core service:
 * 1) writes the graph structure to Neo4j;
 * 2) writes text vectors to Qdrant;
 * 3) retrieves context by combining timeline and vector search.
 */
export class GraphRagService {
  /**
   * @param {{ driver: import("neo4j-driver").Driver, qdrantClient: any, embeddingService?: any, rerankService?: any, llmService?: any, env: Record<string, any> }} deps
   */
  constructor({ driver, qdrantClient, embeddingService, rerankService, llmService, env }) {
    this.driver = driver;
    this.qdrantClient = qdrantClient;
    this.embeddingService = embeddingService;
    this.rerankService = rerankService;
    this.llmService = llmService;
    this.collection = env.QDRANT_COLLECTION;
    // A raw env string such as "1536" would make every strict `length !== dim`
    // comparison fail, so numeric strings are coerced; other values pass through.
    this.embeddingDim =
      typeof env.EMBEDDING_DIM === "string" ? Number(env.EMBEDDING_DIM) : env.EMBEDDING_DIM;
  }

  /**
   * Return the vector for a chunk: the supplied one (after a dimension check)
   * or one produced by the embedding service from the chunk text.
   *
   * @param {{ vector?: number[], text?: string }} chunk
   * @returns {Promise<number[]>}
   * @throws {Error} 400 on a dimension mismatch or when no embedding service is enabled.
   */
  async resolveChunkVector(chunk) {
    if (Array.isArray(chunk?.vector)) {
      if (chunk.vector.length !== this.embeddingDim) {
        throw createHttpError(400, `chunks[].vector 维度必须为 ${this.embeddingDim}`);
      }
      return chunk.vector;
    }
    if (!this.embeddingService?.isEnabled()) {
      throw createHttpError(400, "未检测到可用 embedding 配置");
    }
    return this.embeddingService.embed(chunk.text ?? "");
  }

  /**
   * Return the query vector: `query_vector` when supplied (after a dimension
   * check), otherwise an embedding of the trimmed `query_text`.
   *
   * @param {{ query_vector?: number[], query_text?: string }} body
   * @returns {Promise<number[]>}
   * @throws {Error} 400 on a dimension mismatch, when neither field is usable,
   *   or when no embedding service is enabled.
   */
  async resolveQueryVector(body) {
    const queryVector = body?.query_vector;
    if (Array.isArray(queryVector)) {
      if (queryVector.length !== this.embeddingDim) {
        throw createHttpError(400, `query_vector 维度必须为 ${this.embeddingDim}`);
      }
      return queryVector;
    }
    const queryText = typeof body?.query_text === "string" ? body.query_text.trim() : "";
    if (!queryText) {
      throw createHttpError(400, `query_vector 维度必须为 ${this.embeddingDim},或提供 query_text`);
    }
    if (!this.embeddingService?.isEnabled()) {
      throw createHttpError(400, "未检测到可用 embedding 配置,无法使用 query_text 检索");
    }
    return this.embeddingService.embed(queryText);
  }

  /**
   * Readiness check: runs a trivial Neo4j query and lists Qdrant collections.
   *
   * @returns {Promise<{ ok: true }>} Rejects if either backend call fails.
   */
  async ready() {
    const session = this.driver.session();
    try {
      await session.run("RETURN 1 AS ok");
      await this.qdrantClient.getCollections();
      return { ok: true };
    } finally {
      await session.close();
    }
  }

  /**
   * Fetch graph data for the front-end D3.js visualization.
   * Returns nodes (persons / events / topics) and links (relationship edges),
   * each capped by the LIMIT in its query.
   *
   * @returns {Promise<{ ok: true, nodes: object[], links: object[], total: number }>}
   */
  async getGraphStats() {
    // Each query gets its own session so the five reads can run concurrently.
    const runQuery = async (query) => {
      const session = this.driver.session();
      try {
        return await session.run(query);
      } finally {
        await session.close();
      }
    };

    const [personResult, eventResult, topicResult, personEventResult, eventTopicResult] =
      await Promise.all([
        runQuery(`MATCH (p:Person) RETURN p.id AS id, p.name AS name, 'person' AS type LIMIT 200`),
        runQuery(`MATCH (e:Event) RETURN e.id AS id, e.summary AS name, 'event' AS type, e.occurred_at AS occurred_at LIMIT 200`),
        runQuery(`MATCH (t:Topic) RETURN t.name AS id, t.name AS name, 'topic' AS type LIMIT 100`),
        runQuery(`MATCH (p:Person)-[:PARTICIPATES_IN]->(e:Event) RETURN p.id AS source, e.id AS target, 'PARTICIPATES_IN' AS type LIMIT 500`),
        runQuery(`MATCH (e:Event)-[:ABOUT]->(t:Topic) RETURN e.id AS source, t.name AS target, 'ABOUT' AS type LIMIT 300`)
      ]);

    const nodes = [];
    const idSet = new Set();
    // Nodes without an id, or with an id already seen, are dropped.
    const addNode = (record) => {
      const id = record.get("id");
      if (!id || idSet.has(id)) return;
      idSet.add(id);
      // Only the event query returns an occurred_at column.
      const occurredAt = record.keys.includes("occurred_at") ? record.get("occurred_at") : null;
      nodes.push({
        id,
        name: record.get("name") ?? id,
        type: record.get("type"),
        occurred_at: occurredAt
      });
    };
    personResult.records.forEach(addNode);
    eventResult.records.forEach(addNode);
    topicResult.records.forEach(addNode);

    const toLink = (r) => ({
      source: r.get("source"),
      target: r.get("target"),
      type: r.get("type")
    });
    const links = [
      ...personEventResult.records.map(toLink),
      ...eventTopicResult.records.map(toLink)
    ];

    return { ok: true, nodes, links, total: nodes.length };
  }

  /**
   * Initialize graph constraints/indexes and create the vector collection
   * when it does not exist yet.
   *
   * @returns {Promise<{ ok: true, collection: string }>}
   */
  async bootstrap() {
    const session = this.driver.session();
    try {
      await session.run("CREATE CONSTRAINT person_id IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE");
      await session.run("CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE");
      await session.run("CREATE CONSTRAINT topic_name IF NOT EXISTS FOR (t:Topic) REQUIRE t.name IS UNIQUE");
      await session.run("CREATE INDEX event_time IF NOT EXISTS FOR (e:Event) ON (e.occurred_at)");

      const collections = await this.qdrantClient.getCollections();
      const exists = collections.collections?.some((item) => item.name === this.collection);
      if (!exists) {
        await this.qdrantClient.createCollection(this.collection, {
          vectors: { size: this.embeddingDim, distance: "Cosine" }
        });
      }
      return { ok: true, collection: this.collection };
    } finally {
      await session.close();
    }
  }

  /**
   * Write persons, events and topic relationships to Neo4j in one write
   * transaction, then upsert the vector chunks (if any) into Qdrant.
   *
   * @param {object} body - Payload with optional `persons`, `events`, `chunks` arrays.
   * @returns {Promise<{ ok: true, ingested: { persons: number, events: number, chunks: number } }>}
   * @throws {Error} 400 when validation fails.
   */
  async ingest(body) {
    const { persons, events, chunks } = validateIngestInput(
      body ?? {},
      this.embeddingDim,
      this.embeddingService?.isEnabled() ?? false
    );

    const session = this.driver.session();
    try {
      await session.executeWrite(async (tx) => {
        for (const person of persons) {
          await tx.run(
            `
            MERGE (p:Person {id: $id})
            SET p.name = coalesce($name, p.name),
                p.updated_at = datetime()
            `,
            { id: person.id, name: person.name ?? null }
          );
        }

        for (const event of events) {
          await tx.run(
            `
            MERGE (e:Event {id: $id})
            SET e.type = $type,
                e.summary = $summary,
                e.occurred_at = datetime($occurred_at),
                e.importance = $importance,
                e.updated_at = datetime()
            `,
            {
              id: event.id,
              type: event.type ?? "event",
              summary: event.summary ?? "",
              occurred_at: event.occurred_at,
              importance: event.importance ?? 0.5
            }
          );

          // Participants not listed under `persons` are created on the fly by MERGE.
          for (const personId of event.participants) {
            await tx.run(
              `
              MERGE (p:Person {id: $person_id})
              SET p.updated_at = datetime()
              WITH p
              MATCH (e:Event {id: $event_id})
              MERGE (p)-[:PARTICIPATES_IN]->(e)
              `,
              { person_id: personId, event_id: event.id }
            );
          }

          const topics = Array.isArray(event.topics) ? event.topics : [];
          for (const topicName of topics) {
            await tx.run(
              `
              MERGE (t:Topic {name: $name})
              WITH t
              MATCH (e:Event {id: $event_id})
              MERGE (e)-[:ABOUT]->(t)
              `,
              { name: topicName, event_id: event.id }
            );
          }
        }
      });

      if (chunks.length > 0) {
        const points = await Promise.all(
          chunks.map(async (chunk) => {
            const vector = await this.resolveChunkVector(chunk);
            const payload = chunk.payload ?? {};
            return {
              id: chunk.id,
              vector,
              payload: {
                text: chunk.text ?? payload.text ?? "",
                event_id: payload.event_id ?? null,
                occurred_at: payload.occurred_at ?? null,
                // Numeric copy of occurred_at; queryGraphRag range-filters on it.
                occurred_ts: toTimestamp(payload.occurred_at),
                person_ids: Array.isArray(payload.person_ids) ? payload.person_ids : [],
                source: payload.source ?? "unknown"
              }
            };
          })
        );
        await this.qdrantClient.upsert(this.collection, { points, wait: true });
      }

      return {
        ok: true,
        ingested: { persons: persons.length, events: events.length, chunks: chunks.length }
      };
    } finally {
      await session.close();
    }
  }

  /**
   * Query the chronological chain of events that two persons both took part
   * in, optionally restricted to a time window.
   *
   * @param {{ a_id: string, b_id: string, start?: string, end?: string, limit?: number }} body
   * @returns {Promise<{ ok: true, total: number, timeline: object[] }>}
   * @throws {Error} 400 when `a_id` / `b_id` is missing or `limit` is invalid.
   */
  async queryTimeline(body) {
    const { a_id, b_id, start, end, limit } = body ?? {};
    if (!a_id || !b_id) {
      throw createHttpError(400, "a_id 和 b_id 必填");
    }
    const safeLimit = toNonNegativeInt(limit, 100, "limit");

    const session = this.driver.session();
    try {
      const result = await session.run(
        `
        MATCH (a:Person {id: $a_id})-[:PARTICIPATES_IN]->(e:Event)<-[:PARTICIPATES_IN]-(b:Person {id: $b_id})
        WHERE ($start IS NULL OR e.occurred_at >= datetime($start))
          AND ($end IS NULL OR e.occurred_at <= datetime($end))
        OPTIONAL MATCH (e)-[:ABOUT]->(t:Topic)
        WITH e, collect(DISTINCT t.name) AS topics
        OPTIONAL MATCH (p:Person)-[:PARTICIPATES_IN]->(e)
        WITH e, topics, collect(DISTINCT {id: p.id, name: p.name}) AS participants
        RETURN e.id AS id,
               e.type AS type,
               e.summary AS summary,
               toString(e.occurred_at) AS occurred_at,
               e.importance AS importance,
               topics AS topics,
               participants AS participants
        ORDER BY e.occurred_at ASC
        LIMIT $limit
        `,
        {
          a_id,
          b_id,
          start: start ?? null,
          end: end ?? null,
          // LIMIT needs a Cypher integer; a plain JS number is sent as a float.
          limit: neo4j.int(safeLimit)
        }
      );
      return {
        ok: true,
        total: result.records.length,
        timeline: result.records.map(toEventDto)
      };
    } finally {
      await session.close();
    }
  }

  /**
   * Run vector recall first, then look up the matching events in the graph,
   * and return both as the GraphRAG retrieval result.
   *
   * @param {{ a_id?: string, b_id?: string, start?: string, end?: string, top_k?: number, timeline_limit?: number, query_vector?: number[], query_text?: string }} body
   * @returns {Promise<{ ok: true, retrieved_chunks: object[], timeline_context: object[] }>}
   * @throws {Error} 400 when no usable query vector can be resolved or a limit is invalid.
   */
  async queryGraphRag(body) {
    const {
      a_id = null,
      b_id = null,
      start = null,
      end = null,
      top_k,
      timeline_limit
    } = body ?? {};
    const topK = toNonNegativeInt(top_k, 8, "top_k");
    const timelineLimit = toNonNegativeInt(timeline_limit, 60, "timeline_limit");

    const queryVector = await this.resolveQueryVector(body ?? {});

    const filterMust = [];
    const startTs = toTimestamp(start);
    const endTs = toTimestamp(end);
    if (startTs !== null || endTs !== null) {
      filterMust.push({
        key: "occurred_ts",
        range: {
          ...(startTs !== null ? { gte: startTs } : {}),
          ...(endTs !== null ? { lte: endTs } : {})
        }
      });
    }

    const searchResult = await this.qdrantClient.search(this.collection, {
      vector: queryVector,
      limit: topK,
      with_payload: true,
      ...(filterMust.length > 0 ? { filter: { must: filterMust } } : {})
    });

    let chunks = searchResult.map((item) => ({
      id: item.id,
      score: item.score,
      text: item.payload?.text ?? "",
      payload: item.payload ?? {}
    }));

    // Reranking needs the raw query text, so it is skipped for vector-only queries.
    if (body?.query_text && this.rerankService?.isEnabled()) {
      chunks = await this.rerankService.rerank(body.query_text, chunks);
    }

    const eventIds = Array.from(
      new Set(
        chunks
          .map((item) => item.payload?.event_id)
          .filter((id) => typeof id === "string" && id.length > 0)
      )
    );

    const session = this.driver.session();
    try {
      // When no chunk carries an event_id, the id condition is disabled and
      // events are selected by the remaining filters only.
      const result = await session.run(
        `
        MATCH (e:Event)
        WHERE (size($event_ids) = 0 OR e.id IN $event_ids)
          AND ($start IS NULL OR e.occurred_at >= datetime($start))
          AND ($end IS NULL OR e.occurred_at <= datetime($end))
          AND ($a_id IS NULL OR EXISTS { MATCH (:Person {id: $a_id})-[:PARTICIPATES_IN]->(e) })
          AND ($b_id IS NULL OR EXISTS { MATCH (:Person {id: $b_id})-[:PARTICIPATES_IN]->(e) })
        OPTIONAL MATCH (e)-[:ABOUT]->(t:Topic)
        WITH e, collect(DISTINCT t.name) AS topics
        OPTIONAL MATCH (p:Person)-[:PARTICIPATES_IN]->(e)
        WITH e, topics, collect(DISTINCT {id: p.id, name: p.name}) AS participants
        RETURN e.id AS id,
               e.type AS type,
               e.summary AS summary,
               toString(e.occurred_at) AS occurred_at,
               e.importance AS importance,
               topics AS topics,
               participants AS participants
        ORDER BY e.occurred_at DESC
        LIMIT $timeline_limit
        `,
        {
          event_ids: eventIds,
          start,
          end,
          a_id,
          b_id,
          timeline_limit: neo4j.int(timelineLimit)
        }
      );
      return {
        ok: true,
        retrieved_chunks: chunks,
        timeline_context: result.records.map(toEventDto)
      };
    } finally {
      await session.close();
    }
  }

  /**
   * Have the LLM extract persons, topics, events and relations from `text`,
   * then REPLACE the entire graph with the extracted data.
   *
   * WARNING: this deletes every node in the database. The delete and the
   * rebuild run in a single write transaction, so a failure part-way through
   * rolls back and leaves the previous graph intact.
   *
   * @param {string} text - Text passed to `llmService.analyzeText`.
   * @returns {Promise<{ ok: true, message: string, analysis: object, stats: object }>}
   * @throws {Error} 400 when the LLM service is not enabled; 500 when the LLM
   *   result has none of `persons`, `events`, `topics`.
   */
  async analyzeAndIngest(text) {
    if (!this.llmService?.isEnabled()) {
      throw createHttpError(400, "LLM 服务未配置,请检查 LLM_BASE_URL/LLM_API_KEY/LLM_MODEL_NAME");
    }

    const analysis = await this.llmService.analyzeText(text);
    console.log("[DEBUG] LLM analysis result:", JSON.stringify(analysis));

    if (!analysis || (!analysis.persons && !analysis.events && !analysis.topics)) {
      throw createHttpError(500, `LLM 返回数据异常: ${JSON.stringify(analysis)}`);
    }

    const persons = asList(analysis.persons);
    const topics = asList(analysis.topics);
    const events = asList(analysis.events);
    const relations = asList(analysis.relations);

    const session = this.driver.session();
    console.log("[DEBUG] Got session, driver:", !!this.driver);

    try {
      await session.executeWrite(async (tx) => {
        // The driver may re-invoke this function when it retries a failed
        // transaction, so all lookup state is rebuilt inside it.
        const deleteResult = await tx.run("MATCH (n) DETACH DELETE n", {});
        console.log("[DEBUG] Delete result:", deleteResult?.summary?.counters);

        // String(id) -> id as stored. Map (not a plain object) because the keys
        // are LLM-provided and could collide with Object.prototype members.
        const personIds = new Map();
        for (const person of persons) {
          console.log("[DEBUG] Creating person:", person);
          const result = await tx.run(
            `CREATE (p:Person {id: $id, name: $name, description: $description}) RETURN p.id AS id`,
            { id: person.id, name: person.name, description: person.description || "" }
          );
          console.log("[DEBUG] Person create result:", result?.records?.length);
          if (result?.records?.length > 0) {
            personIds.set(String(person.id), person.id);
          }
        }

        const topicNames = new Map();
        for (const topic of topics) {
          // MERGE rather than CREATE: a repeated topic name would otherwise
          // violate the unique constraint declared in bootstrap().
          await tx.run(`MERGE (t:Topic {name: $name})`, { name: topic.name });
          topicNames.set(String(topic.id), topic.name);
        }

        for (const event of events) {
          const normalizedOccurredAt = normalizeOccurredAt(event.occurred_at);
          await tx.run(
            `CREATE (e:Event {
              id: $id,
              type: $type,
              summary: $summary,
              occurred_at: datetime($occurred_at),
              importance: $importance
            })`,
            {
              id: event.id,
              type: event.type || "general",
              summary: event.summary || "",
              occurred_at: normalizedOccurredAt,
              importance: neo4j.int(event.importance || 5)
            }
          );

          for (const pid of asList(event.participants)) {
            await tx.run(
              `MATCH (p:Person {id: $pid}), (e:Event {id: $eid}) MERGE (p)-[:PARTICIPATES_IN]->(e)`,
              { pid, eid: event.id }
            );
          }

          for (const tid of asList(event.topics)) {
            const topicName = topicNames.get(String(tid));
            if (topicName) {
              await tx.run(
                `MATCH (e:Event {id: $eid}), (t:Topic {name: $tname}) MERGE (e)-[:ABOUT]->(t)`,
                { eid: event.id, tname: topicName }
              );
            }
          }
        }

        for (const rel of relations) {
          const sourceId = personIds.get(String(rel?.source));
          const targetId = personIds.get(String(rel?.target));
          if (sourceId === undefined || targetId === undefined) continue;

          // The relationship type has to be interpolated into the query text,
          // and it comes from LLM output: anything outside the identifier
          // whitelist is skipped instead of being executed as Cypher.
          if (typeof rel.type !== "string" || !RELATION_TYPE_PATTERN.test(rel.type)) {
            console.warn("[WARN] Skipping relation with unsafe type:", rel.type);
            continue;
          }

          // Match by id, not name: names are not unique, ids are.
          await tx.run(
            `MATCH (s:Person {id: $sid}), (t:Person {id: $tid}) MERGE (s)-[r:${rel.type}]->(t)`,
            { sid: sourceId, tid: targetId }
          );
        }
      });

      return {
        ok: true,
        message: "分析并导入成功",
        analysis,
        stats: {
          persons: persons.length,
          events: events.length,
          topics: topics.length,
          relations: relations.length
        }
      };
    } finally {
      await session.close();
    }
  }
}