| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- IndexEntityMixin extracted from IndexManager.
- """
- from __future__ import annotations
- import json
- import logging
- import re
- import sqlite3
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- logger = logging.getLogger(__name__)
- class IndexEntityMixin:
def upsert_entity(self, entity: EntityMeta, update_metadata: bool = False) -> bool:
    """Insert a new entity, or merge into the existing row with the same id.

    - New entity: a row is inserted from the fields of ``entity``.
    - Existing entity: ``entity.current`` is merged over the stored
      ``current_json`` (incoming keys win); ``last_appearance`` and
      ``updated_at`` are refreshed.
    - ``update_metadata=True``: an existing row additionally gets its
      canonical_name / tier / desc / is_protagonist / is_archived
      overwritten.

    A missing ``entity.current`` and a stored ``current_json`` that is not a
    JSON object (for example a stored ``null``) are both treated as an empty
    mapping, so the merge never fails on them.

    Args:
        entity: Entity metadata to persist.
        update_metadata: Also overwrite descriptive columns on an existing row.

    Returns:
        True when a new row was inserted, False when an existing row was
        updated.
    """
    # None (or any other falsy value) means "nothing to merge".
    incoming = entity.current or {}

    with self._get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT id, current_json FROM entities WHERE id = ?", (entity.id,)
        )
        existing = cursor.fetchone()

        if not existing:
            # New entity: plain insert.
            cursor.execute(
                """
                INSERT INTO entities
                (id, type, canonical_name, tier, desc, current_json,
                first_appearance, last_appearance, is_protagonist, is_archived)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    entity.id,
                    entity.type,
                    entity.canonical_name,
                    entity.tier,
                    entity.desc,
                    json.dumps(incoming, ensure_ascii=False),
                    entity.first_appearance,
                    entity.last_appearance,
                    1 if entity.is_protagonist else 0,
                    1 if entity.is_archived else 0,
                ),
            )
            conn.commit()
            return True

        # Existing entity: recover the stored state, tolerating bad data.
        old_current = {}
        if existing["current_json"]:
            try:
                parsed = json.loads(existing["current_json"])
            except json.JSONDecodeError as exc:
                logger.warning(
                    "failed to parse JSON in entities.current_json: %s",
                    exc,
                )
            else:
                # Only a JSON object can be merged key-by-key; anything else
                # (null, list, scalar) is discarded.
                if isinstance(parsed, dict):
                    old_current = parsed

        # Incoming values override stored ones.
        merged_json = json.dumps({**old_current, **incoming}, ensure_ascii=False)

        if update_metadata:
            # Full update, descriptive metadata included.
            cursor.execute(
                """
                UPDATE entities SET
                canonical_name = ?,
                tier = ?,
                desc = ?,
                current_json = ?,
                last_appearance = ?,
                is_protagonist = ?,
                is_archived = ?,
                updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (
                    entity.canonical_name,
                    entity.tier,
                    entity.desc,
                    merged_json,
                    entity.last_appearance,
                    1 if entity.is_protagonist else 0,
                    1 if entity.is_archived else 0,
                    entity.id,
                ),
            )
        else:
            # Only the mutable state and last appearance change.
            cursor.execute(
                """
                UPDATE entities SET
                current_json = ?,
                last_appearance = ?,
                updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (merged_json, entity.last_appearance, entity.id),
            )
        conn.commit()
        return False
def get_entity(self, entity_id: str) -> Optional[Dict]:
    """Fetch a single entity by id.

    Returns:
        The matching row converted by ``self._row_to_dict`` (asked to parse
        the ``current_json`` column), or None when no row has that id.
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM entities WHERE id = ?", (entity_id,))
        record = cur.fetchone()
        if not record:
            return None
        return self._row_to_dict(record, parse_json=["current_json"])
def get_entities_by_type(
    self, entity_type: str, include_archived: bool = False
) -> List[Dict]:
    """List entities of one type, most recently seen first.

    Args:
        entity_type: Value matched against the ``type`` column.
        include_archived: When False (the default), rows with
            ``is_archived = 1`` are filtered out.

    Returns:
        Rows converted by ``self._row_to_dict`` (asked to parse
        ``current_json``), ordered by ``last_appearance`` descending.
    """
    if include_archived:
        query = """
            SELECT * FROM entities WHERE type = ?
            ORDER BY last_appearance DESC
        """
    else:
        query = """
            SELECT * FROM entities WHERE type = ? AND is_archived = 0
            ORDER BY last_appearance DESC
        """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, (entity_type,))
        entities = []
        for record in cur.fetchall():
            entities.append(self._row_to_dict(record, parse_json=["current_json"]))
        return entities
def get_entities_by_tier(self, tier: str) -> List[Dict]:
    """List non-archived entities of one importance tier.

    The tier is matched verbatim against the ``tier`` column (the values in
    use are 核心 / 重要 / 次要 / 装饰). Results are ordered by
    ``last_appearance`` descending and converted by ``self._row_to_dict``.
    """
    query = """
        SELECT * FROM entities WHERE tier = ? AND is_archived = 0
        ORDER BY last_appearance DESC
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, (tier,))
        records = cur.fetchall()
        return [
            self._row_to_dict(record, parse_json=["current_json"])
            for record in records
        ]
def get_core_entities(self) -> List[Dict]:
    """Return every non-archived core entity (meant for full context loading).

    An entity qualifies when its tier is 核心 or 重要, or when it is flagged
    as the protagonist. Protagonists sort first, then by tier, then by
    ``last_appearance`` descending.
    """
    query = """
        SELECT * FROM entities
        WHERE (tier IN ('核心', '重要') OR is_protagonist = 1) AND is_archived = 0
        ORDER BY is_protagonist DESC, tier, last_appearance DESC
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query)
        core = []
        for record in cur.fetchall():
            core.append(self._row_to_dict(record, parse_json=["current_json"]))
        return core
def get_protagonist(self) -> Optional[Dict]:
    """Return the protagonist entity, or None when no row is flagged as one.

    Only the first row with ``is_protagonist = 1`` is returned (the query
    uses ``LIMIT 1``).
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM entities WHERE is_protagonist = 1 LIMIT 1")
        record = cur.fetchone()
        if not record:
            return None
        return self._row_to_dict(record, parse_json=["current_json"])
def update_entity_current(self, entity_id: str, updates: Dict) -> bool:
    """Incrementally update an entity's ``current`` state.

    The stored ``current_json`` is loaded, ``updates`` is applied on top of
    it, and the result is written back together with a fresh ``updated_at``.
    No other column is touched.

    Example: ``update_entity_current("xiaoyan", {"realm": "斗师"})``

    Stored JSON that cannot be parsed, or that is not a JSON object (for
    example a stored ``null``), is replaced by the updates alone instead of
    raising.

    Args:
        entity_id: Id of the entity to update.
        updates: Keys to set or overwrite in the entity's current state.

    Returns:
        True when the entity exists and was updated, False when no row has
        that id.
    """
    with self._get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT current_json FROM entities WHERE id = ?", (entity_id,)
        )
        row = cursor.fetchone()
        if not row:
            return False

        current = {}
        if row["current_json"]:
            try:
                parsed = json.loads(row["current_json"])
            except json.JSONDecodeError as exc:
                logger.warning(
                    "failed to parse JSON in update_entity_current current_json: %s",
                    exc,
                )
            else:
                # Only a JSON object supports a key-wise update; null, lists
                # and scalars are discarded.
                if isinstance(parsed, dict):
                    current = parsed

        current.update(updates)
        cursor.execute(
            """
            UPDATE entities SET
            current_json = ?,
            updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (json.dumps(current, ensure_ascii=False), entity_id),
        )
        conn.commit()
        return True
def archive_entity(self, entity_id: str) -> bool:
    """Mark an entity as archived without deleting its row.

    Returns:
        True when a row with that id was updated, False otherwise.
    """
    statement = """
        UPDATE entities SET is_archived = 1, updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(statement, (entity_id,))
        conn.commit()
        touched = cur.rowcount
        return touched > 0
- # ==================== v5.1 别名操作 ====================
def register_alias(self, alias: str, entity_id: str, entity_type: str) -> bool:
    """Register an alias for an entity (one alias may map to many entities).

    The same alias can point at several entities, e.g. "天云宗" as both a
    location and a faction. The insert uses ``INSERT OR IGNORE``, so an
    already-present mapping is left untouched.

    Returns:
        True when a new mapping row was inserted; False when nothing was
        inserted or the database raised an integrity error.
    """
    statement = """
        INSERT OR IGNORE INTO aliases (alias, entity_id, entity_type)
        VALUES (?, ?, ?)
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        try:
            cur.execute(statement, (alias, entity_id, entity_type))
            conn.commit()
        except sqlite3.IntegrityError:
            return False
        return cur.rowcount > 0
def get_entities_by_alias(self, alias: str) -> List[Dict]:
    """Look up every entity registered under an alias.

    One alias can map to several entities (possibly of different types), so
    a list is returned. Each result carries the entity columns plus an
    ``alias_type`` column taken from the alias row, converted by
    ``self._row_to_dict``.
    """
    query = """
        SELECT e.*, a.entity_type as alias_type
        FROM entities e
        JOIN aliases a ON e.id = a.entity_id
        WHERE a.alias = ?
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, (alias,))
        matches = []
        for record in cur.fetchall():
            matches.append(self._row_to_dict(record, parse_json=["current_json"]))
        return matches
def get_entity_aliases(self, entity_id: str) -> List[str]:
    """Return every alias string registered for the given entity id."""
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT alias FROM aliases WHERE entity_id = ?", (entity_id,))
        names = []
        for record in cur.fetchall():
            names.append(record["alias"])
        return names
def remove_alias(self, alias: str, entity_id: str) -> bool:
    """Delete the mapping between an alias and an entity.

    Returns:
        True when at least one row was deleted, False otherwise.
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            "DELETE FROM aliases WHERE alias = ? AND entity_id = ?",
            (alias, entity_id),
        )
        conn.commit()
        deleted = cur.rowcount
        return deleted > 0
- # ==================== v5.1 状态变化操作 ====================
def record_state_change(self, change: StateChangeMeta) -> int:
    """Append one state-change record.

    The entity id, field, old/new values, reason and chapter of ``change``
    are inserted into ``state_changes``.

    Returns:
        The row id of the inserted record (``cursor.lastrowid``).
    """
    statement = """
        INSERT INTO state_changes
        (entity_id, field, old_value, new_value, reason, chapter)
        VALUES (?, ?, ?, ?, ?, ?)
    """
    values = (
        change.entity_id,
        change.field,
        change.old_value,
        change.new_value,
        change.reason,
        change.chapter,
    )
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(statement, values)
        conn.commit()
        return cur.lastrowid
def get_entity_state_changes(self, entity_id: str, limit: int = 20) -> List[Dict]:
    """Return an entity's state-change history, newest first.

    Rows are ordered by chapter descending, then id descending, and capped
    at ``limit`` entries. Each row is returned as a plain dict.
    """
    query = """
        SELECT * FROM state_changes
        WHERE entity_id = ?
        ORDER BY chapter DESC, id DESC
        LIMIT ?
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, (entity_id, limit))
        return list(map(dict, cur.fetchall()))
def get_recent_state_changes(self, limit: int = 50) -> List[Dict]:
    """Return the most recent state changes across all entities.

    Rows are ordered by chapter descending, then id descending, and capped
    at ``limit`` entries. Each row is returned as a plain dict.
    """
    query = """
        SELECT * FROM state_changes
        ORDER BY chapter DESC, id DESC
        LIMIT ?
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, (limit,))
        return list(map(dict, cur.fetchall()))
def get_chapter_state_changes(self, chapter: int) -> List[Dict]:
    """Return every state change recorded for one chapter, in id order."""
    query = """
        SELECT * FROM state_changes
        WHERE chapter = ?
        ORDER BY id
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, (chapter,))
        return list(map(dict, cur.fetchall()))
- # ==================== v5.1 关系操作 ====================
def upsert_relationship(self, rel: RelationshipMeta) -> bool:
    """Insert a relationship, or refresh the existing one.

    A relationship is identified by ``(from_entity, to_entity, type)``. When
    such a row already exists only its ``description`` and ``chapter`` are
    overwritten; otherwise a new row is inserted.

    Returns:
        True when a new relationship was inserted, False when an existing
        one was updated.
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        # Look for an existing row with the same identity triple.
        cur.execute(
            """
            SELECT id FROM relationships
            WHERE from_entity = ? AND to_entity = ? AND type = ?
            """,
            (rel.from_entity, rel.to_entity, rel.type),
        )
        found = cur.fetchone()

        if not found:
            cur.execute(
                """
                INSERT INTO relationships
                (from_entity, to_entity, type, description, chapter)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    rel.from_entity,
                    rel.to_entity,
                    rel.type,
                    rel.description,
                    rel.chapter,
                ),
            )
            conn.commit()
            return True

        cur.execute(
            """
            UPDATE relationships SET
            description = ?,
            chapter = ?
            WHERE id = ?
            """,
            (rel.description, rel.chapter, found["id"]),
        )
        conn.commit()
        return False
def get_entity_relationships(
    self, entity_id: str, direction: str = "both"
) -> List[Dict]:
    """Return the relationships an entity takes part in, newest chapter first.

    Args:
        entity_id: Entity whose relationships are wanted.
        direction: ``"from"`` for outgoing rows, ``"to"`` for incoming rows;
            any other value (including the default ``"both"``) matches
            either side.

    Returns:
        Matching ``relationships`` rows as plain dicts, ordered by chapter
        descending.
    """
    if direction == "from":
        query = """
            SELECT * FROM relationships WHERE from_entity = ?
            ORDER BY chapter DESC
        """
        args = (entity_id,)
    elif direction == "to":
        query = """
            SELECT * FROM relationships WHERE to_entity = ?
            ORDER BY chapter DESC
        """
        args = (entity_id,)
    else:
        # "both": the entity may sit on either end of the relationship.
        query = """
            SELECT * FROM relationships
            WHERE from_entity = ? OR to_entity = ?
            ORDER BY chapter DESC
        """
        args = (entity_id, entity_id)

    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, args)
        return list(map(dict, cur.fetchall()))
def get_relationship_between(self, entity1: str, entity2: str) -> List[Dict]:
    """Return every relationship linking two entities, in either direction.

    Rows are returned as plain dicts ordered by chapter descending.
    """
    query = """
        SELECT * FROM relationships
        WHERE (from_entity = ? AND to_entity = ?)
        OR (from_entity = ? AND to_entity = ?)
        ORDER BY chapter DESC
    """
    # Forward pair first, then the reversed pair.
    args = (entity1, entity2, entity2, entity1)
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, args)
        return list(map(dict, cur.fetchall()))
def get_recent_relationships(self, limit: int = 30) -> List[Dict]:
    """Return the most recently recorded relationships.

    Rows are ordered by chapter descending, then id descending, and capped
    at ``limit`` entries. Each row is returned as a plain dict.
    """
    query = """
        SELECT * FROM relationships
        ORDER BY chapter DESC, id DESC
        LIMIT ?
    """
    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(query, (limit,))
        return list(map(dict, cur.fetchall()))
- # ==================== v5.5 关系事件与图谱 ====================
def _infer_relationship_polarity(self, rel_type: str) -> int:
    """Infer polarity from a relationship type: -1 hostile, 0 neutral, 1 friendly.

    The type text is scanned for keywords. Hostile keywords are checked
    first, so a type containing both kinds is classified as hostile.
    """
    text = str(rel_type or "")
    for keyword in ("敌", "仇", "恨", "对立", "冲突", "背叛", "追杀"):
        if keyword in text:
            return -1
    for keyword in ("盟友", "友好", "师徒", "同伴", "亲", "爱", "合作"):
        if keyword in text:
            return 1
    return 0
def record_relationship_event(self, event: RelationshipEventMeta) -> int:
    """Validate, normalize and persist one relationship event.

    Normalization applied to the attributes read from ``event``:

    - ``from_entity`` / ``to_entity`` / ``type``: stripped strings; the
      event is rejected when any of them is empty.
    - ``action``: lower-cased; anything other than create / update / decay /
      remove becomes ``"update"``.
    - ``chapter``: must convert to a positive integer, otherwise the event
      is rejected.
    - ``scene_index``: integer, 0 when missing or invalid.
    - ``polarity``: inferred from the type when None; otherwise converted to
      int (0 when invalid) and clamped to [-1, 1].
    - ``strength`` / ``confidence``: floats clamped to [0.0, 1.0]; defaults
      (0.5 and 1.0) apply only when the value is missing or invalid. An
      explicit 0 is kept as 0.0 rather than replaced by the default.

    Returns:
        The id of the inserted row, or 0 when the event was rejected.
    """

    def _text(name: str, default: str = "") -> str:
        """Read an attribute as a stripped string, falling back on falsy values."""
        return str(getattr(event, name, default) or default).strip()

    def _unit_interval(name: str, default: float) -> float:
        """Read an attribute as a float clamped to [0, 1].

        The default is used only for a missing/None or unparsable value;
        ``value or default`` would wrongly discard a legitimate 0.
        """
        raw = getattr(event, name, None)
        if raw is None:
            return default
        try:
            value = float(raw)
        except (TypeError, ValueError):
            return default
        return max(0.0, min(1.0, value))

    from_entity = _text("from_entity")
    to_entity = _text("to_entity")
    rel_type = _text("type")
    if not from_entity or not to_entity or not rel_type:
        return 0

    action = _text("action", "update").lower()
    if action not in {"create", "update", "decay", "remove"}:
        action = "update"

    try:
        chapter = int(getattr(event, "chapter", 0) or 0)
    except (TypeError, ValueError):
        return 0
    if chapter <= 0:
        return 0

    try:
        scene_index = int(getattr(event, "scene_index", 0) or 0)
    except (TypeError, ValueError):
        scene_index = 0

    raw_polarity = getattr(event, "polarity", None)
    if raw_polarity is None:
        polarity = self._infer_relationship_polarity(rel_type)
    else:
        try:
            polarity = int(raw_polarity)
        except (TypeError, ValueError):
            polarity = 0
    polarity = max(-1, min(1, polarity))

    strength = _unit_interval("strength", 0.5)
    confidence = _unit_interval("confidence", 1.0)
    description = _text("description")
    evidence = _text("evidence")

    with self._get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO relationship_events
            (from_entity, to_entity, type, action, polarity, strength, description, chapter, scene_index, evidence, confidence)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                from_entity,
                to_entity,
                rel_type,
                action,
                polarity,
                strength,
                description,
                chapter,
                scene_index,
                evidence,
                confidence,
            ),
        )
        conn.commit()
        # lastrowid may be None on some drivers; normalize to an int.
        return int(cursor.lastrowid or 0)
def get_relationship_events(
    self,
    entity_id: str,
    direction: str = "both",
    from_chapter: Optional[int] = None,
    to_chapter: Optional[int] = None,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Query relationship events involving one entity, newest first.

    Args:
        entity_id: Entity to look up.
        direction: ``"from"`` / ``"to"`` (case-insensitive) restrict the
            match to one side; any other value matches either side.
        from_chapter: Inclusive lower chapter bound, or None for no bound.
        to_chapter: Inclusive upper chapter bound, or None for no bound.
        limit: Maximum number of rows returned.

    Returns:
        Event rows as plain dicts, ordered by chapter then id, both
        descending.
    """
    side = str(direction or "both").lower()
    if side == "from":
        conditions: List[str] = ["from_entity = ?"]
        args: List[Any] = [entity_id]
    elif side == "to":
        conditions = ["to_entity = ?"]
        args = [entity_id]
    else:
        conditions = ["(from_entity = ? OR to_entity = ?)"]
        args = [entity_id, entity_id]

    # Optional inclusive chapter window.
    for bound, condition in ((from_chapter, "chapter >= ?"), (to_chapter, "chapter <= ?")):
        if bound is not None:
            conditions.append(condition)
            args.append(int(bound))

    where_sql = " AND ".join(conditions)

    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            f"""
            SELECT * FROM relationship_events
            WHERE {where_sql}
            ORDER BY chapter DESC, id DESC
            LIMIT ?
            """,
            (*args, int(limit)),
        )
        return list(map(dict, cur.fetchall()))
def get_relationship_timeline(
    self,
    entity1: str,
    entity2: str,
    from_chapter: Optional[int] = None,
    to_chapter: Optional[int] = None,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Return the chronological event timeline between two entities.

    Events in either direction between the pair are included. The optional
    chapter bounds are inclusive. Rows come back as plain dicts ordered by
    chapter then id, both ascending, capped at ``limit``.
    """
    conditions = [
        "((from_entity = ? AND to_entity = ?) OR (from_entity = ? AND to_entity = ?))"
    ]
    args: List[Any] = [entity1, entity2, entity2, entity1]

    # Optional inclusive chapter window.
    for bound, condition in ((from_chapter, "chapter >= ?"), (to_chapter, "chapter <= ?")):
        if bound is not None:
            conditions.append(condition)
            args.append(int(bound))

    where_sql = " AND ".join(conditions)

    with self._get_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            f"""
            SELECT * FROM relationship_events
            WHERE {where_sql}
            ORDER BY chapter ASC, id ASC
            LIMIT ?
            """,
            (*args, int(limit)),
        )
        return list(map(dict, cur.fetchall()))
def _load_effective_relationship_edges(
    self,
    chapter: Optional[int] = None,
    relation_types: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Load the relationship edges that are in effect at a chapter cross-section.

    - ``chapter is None``: every row of ``relationships`` is returned as a
      "snapshot" edge (polarity inferred from the type, strength 0.5,
      confidence 1.0).
    - ``chapter`` given: for each ``(from, to, type)`` only the latest event
      at or before that chapter counts, and a ``remove`` event hides the
      edge. Relationships that have no event at all are filled in from the
      ``relationships`` snapshot table (legacy data).

    A stored strength/confidence of 0 is returned as 0.0; the 0.5 / 1.0
    defaults apply only to NULL or empty values.

    Args:
        chapter: Inclusive chapter cut-off, or None for the current snapshot.
        relation_types: Optional whitelist of relationship types; blank
            entries are ignored.

    Returns:
        Edge dicts with the keys from, to, type, description, chapter,
        action, polarity, strength, evidence and confidence.
    """
    relation_types = [str(t) for t in (relation_types or []) if str(t).strip()]

    def _with_type_filter(clauses: List[str], params: List[Any]):
        """Append the optional ``type IN (...)`` filter to a clause/param pair."""
        if relation_types:
            placeholders = ",".join("?" for _ in relation_types)
            clauses.append(f"type IN ({placeholders})")
            params.extend(relation_types)
        return clauses, params

    def _stored_float(value: Any, default: float) -> float:
        """Convert a stored number, using the default only for NULL/empty."""
        if value is None or value == "":
            return default
        return float(value)

    def _snapshot_edge(row: Any) -> Dict[str, Any]:
        """Build an edge from a ``relationships`` row, which has no event data."""
        rel_type = str(row["type"])
        return {
            "from": str(row["from_entity"]),
            "to": str(row["to_entity"]),
            "type": rel_type,
            "description": str(row["description"] or ""),
            "chapter": int(row["chapter"] or 0),
            "action": "snapshot",
            "polarity": self._infer_relationship_polarity(rel_type),
            "strength": 0.5,
            "evidence": "",
            "confidence": 1.0,
        }

    with self._get_conn() as conn:
        cursor = conn.cursor()

        if chapter is None:
            clauses, params = _with_type_filter([], [])
            where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else ""
            cursor.execute(
                f"""
                SELECT from_entity, to_entity, type, description, chapter
                FROM relationships
                {where_sql}
                ORDER BY chapter DESC, id DESC
                """,
                tuple(params),
            )
            return [_snapshot_edge(row) for row in cursor.fetchall()]

        # The same chapter/type filter applies to both tables.
        clauses, params = _with_type_filter(["chapter <= ?"], [int(chapter)])
        where_sql = " AND ".join(clauses)

        cursor.execute(
            f"""
            SELECT *
            FROM relationship_events
            WHERE {where_sql}
            ORDER BY chapter DESC, id DESC
            """,
            tuple(params),
        )
        event_rows = cursor.fetchall()

        # Legacy compatibility: the event stream may be incomplete, so the
        # relationships snapshot is also read to fill the gaps.
        cursor.execute(
            f"""
            SELECT from_entity, to_entity, type, description, chapter
            FROM relationships
            WHERE {where_sql}
            ORDER BY chapter DESC, id DESC
            """,
            tuple(params),
        )
        snapshot_rows = cursor.fetchall()

        # Cross-section: rows arrive newest first, so the first event seen
        # for a key is its latest one; a "remove" means no longer in effect.
        effective: List[Dict[str, Any]] = []
        seen: set = set()
        for row in event_rows:
            key = (
                str(row["from_entity"]),
                str(row["to_entity"]),
                str(row["type"]),
            )
            if key in seen:
                continue
            seen.add(key)
            action = str(row["action"] or "update")
            if action == "remove":
                continue
            effective.append(
                {
                    "from": key[0],
                    "to": key[1],
                    "type": key[2],
                    "description": str(row["description"] or ""),
                    "chapter": int(row["chapter"] or 0),
                    "action": action,
                    "polarity": int(row["polarity"] or 0),
                    "strength": _stored_float(row["strength"], 0.5),
                    "evidence": str(row["evidence"] or ""),
                    "confidence": _stored_float(row["confidence"], 1.0),
                }
            )

        # Fill from the snapshot only for keys the event stream never
        # mentioned; events (including removals) take precedence.
        for row in snapshot_rows:
            key = (
                str(row["from_entity"]),
                str(row["to_entity"]),
                str(row["type"]),
            )
            if key in seen:
                continue
            effective.append(_snapshot_edge(row))

        return effective
def build_relationship_subgraph(
    self,
    center_entity: str,
    depth: int = 2,
    chapter: Optional[int] = None,
    top_edges: int = 50,
    relation_types: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build the relationship subgraph around a center entity.

    Starting from ``center_entity``, the effective edges (as returned by
    ``self._load_effective_relationship_edges``) are expanded one hop per
    round for up to ``depth`` rounds. Expansion stops early once
    ``top_edges`` edges have been collected or no new nodes were reached.

    Args:
        center_entity: Id of the entity the graph is centered on; it is
            stripped, and an empty value yields a graph with no nodes.
        depth: Number of expansion rounds; values below 1 are raised to 1.
        chapter: Chapter cross-section passed through to the edge loader.
        top_edges: Maximum number of edges kept; values below 1 are raised
            to 1.
        relation_types: Optional type whitelist passed to the edge loader.

    Returns:
        A dict with the keys center, depth, chapter, nodes, edges and
        generated_at (local time, ISO format with second precision).
    """
    center_entity = str(center_entity or "").strip()
    depth = max(1, int(depth or 1))
    top_edges = max(1, int(top_edges or 1))
    edges_all = self._load_effective_relationship_edges(
        chapter=chapter,
        relation_types=relation_types,
    )
    # Newest edges first, so they win when the top_edges cap is reached.
    edges_all.sort(key=lambda x: int(x.get("chapter", 0)), reverse=True)
    selected_edges: List[Dict[str, Any]] = []
    selected_keys: set[tuple[str, str, str]] = set()
    visited_nodes: set[str] = {center_entity} if center_entity else set()
    frontier: set[str] = {center_entity} if center_entity else set()
    for _ in range(depth):
        if not frontier:
            break
        next_frontier: set[str] = set()
        for edge in edges_all:
            from_entity = str(edge.get("from") or "")
            to_entity = str(edge.get("to") or "")
            # Only edges touching the current frontier extend the graph.
            if from_entity not in frontier and to_entity not in frontier:
                continue
            key = (from_entity, to_entity, str(edge.get("type") or ""))
            if key in selected_keys:
                continue
            selected_keys.add(key)
            selected_edges.append(edge)
            # Newly reached endpoints form the frontier of the next round.
            if from_entity and from_entity not in visited_nodes:
                visited_nodes.add(from_entity)
                next_frontier.add(from_entity)
            if to_entity and to_entity not in visited_nodes:
                visited_nodes.add(to_entity)
                next_frontier.add(to_entity)
            if len(selected_edges) >= top_edges:
                break
        frontier = next_frontier
        if len(selected_edges) >= top_edges:
            break
    if center_entity and center_entity not in visited_nodes:
        visited_nodes.add(center_entity)
    # Look up the details of every visited node.
    entity_map: Dict[str, Dict[str, Any]] = {}
    if visited_nodes:
        with self._get_conn() as conn:
            cursor = conn.cursor()
            placeholders = ",".join("?" for _ in visited_nodes)
            cursor.execute(
                f"""
                SELECT id, canonical_name, type, tier, last_appearance
                FROM entities
                WHERE id IN ({placeholders})
                """,
                tuple(visited_nodes),
            )
            for row in cursor.fetchall():
                entity_map[str(row["id"])] = {
                    "id": str(row["id"]),
                    "name": str(row["canonical_name"] or row["id"]),
                    "type": str(row["type"] or "未知"),
                    "tier": str(row["tier"] or "装饰"),
                    "last_appearance": int(row["last_appearance"] or 0),
                }
    nodes: List[Dict[str, Any]] = []
    # Order: center first, then most recently seen, then by id.
    for entity_id in sorted(
        visited_nodes,
        key=lambda eid: (
            0 if eid == center_entity else 1,
            -(entity_map.get(eid, {}).get("last_appearance", 0)),
            eid,
        ),
    ):
        if entity_id in entity_map:
            nodes.append(entity_map[entity_id])
        else:
            # The id appears in an edge but has no row in entities: emit a
            # placeholder node so the edge still has both endpoints.
            nodes.append(
                {
                    "id": entity_id,
                    "name": entity_id or "未知",
                    "type": "未知",
                    "tier": "装饰",
                    "last_appearance": 0,
                }
            )
    return {
        "center": center_entity,
        "depth": depth,
        "chapter": chapter,
        "nodes": nodes,
        "edges": selected_edges[:top_edges],
        "generated_at": datetime.now().isoformat(timespec="seconds"),
    }
def _sanitize_mermaid_node_id(self, raw_id: str) -> str:
    """Turn an arbitrary id into an identifier made of ``[0-9a-zA-Z_]``.

    Every other character is replaced by ``_``. A falsy ``raw_id`` yields
    ``"node"``, and a result starting with a digit is prefixed with ``n_``.
    """
    text = str(raw_id or "node")
    cleaned = re.sub(r"[^0-9a-zA-Z_]", "_", text) or "node"
    return f"n_{cleaned}" if cleaned[0].isdigit() else cleaned
def render_relationship_subgraph_mermaid(self, graph: Dict[str, Any]) -> str:
    """Render a relationship subgraph as a fenced Mermaid ``graph LR`` block.

    Args:
        graph: Dict with ``nodes`` (each having ``id`` and optionally
            ``name``) and ``edges`` (each having ``from``, ``to`` and
            optionally ``type``, ``chapter``, ``polarity``).

    Returns:
        The Mermaid source, fence lines included. A graph without nodes
        renders a single placeholder node. Edges whose endpoints are not
        among the nodes are skipped. Edges with negative polarity use a
        dotted connector.

    Mermaid node ids come from ``self._sanitize_mermaid_node_id``. Distinct
    entity ids that sanitize to the same identifier (for example two
    non-ASCII ids of equal length) get a numeric suffix, so they stay
    separate nodes instead of being merged into one.
    """
    lines = ["```mermaid", "graph LR"]
    nodes = graph.get("nodes") or []
    edges = graph.get("edges") or []
    if not nodes:
        lines.append(" EMPTY[暂无关系数据]")
        lines.append("```")
        return "\n".join(lines)

    node_alias: Dict[str, str] = {}
    used_aliases: set = set()
    for node in nodes:
        entity_id = str(node.get("id") or "")
        if not entity_id:
            continue
        alias = node_alias.get(entity_id)
        if alias is None:
            base = self._sanitize_mermaid_node_id(entity_id)
            alias = base
            suffix = 2
            # Disambiguate ids that collapse to the same sanitized form.
            while alias in used_aliases:
                alias = f"{base}_{suffix}"
                suffix += 1
            used_aliases.add(alias)
            node_alias[entity_id] = alias
        # Double quotes would terminate the quoted Mermaid label.
        label = str(node.get("name") or entity_id).replace('"', "'")
        lines.append(f' {alias}["{label}"]')

    for edge in edges:
        from_entity = str(edge.get("from") or "")
        to_entity = str(edge.get("to") or "")
        if from_entity not in node_alias or to_entity not in node_alias:
            continue
        edge_type = str(edge.get("type") or "关联")
        chapter = edge.get("chapter")
        chapter_suffix = f"@{chapter}" if chapter not in (None, "") else ""
        label = f"{edge_type}{chapter_suffix}".replace('"', "'")
        try:
            polarity = int(edge.get("polarity", 0) or 0)
        except (TypeError, ValueError):
            polarity = 0
        # Hostile relationships are drawn dotted.
        connector = "-.->" if polarity < 0 else "-->"
        lines.append(
            f" {node_alias[from_entity]} {connector}|{label}| {node_alias[to_entity]}"
        )

    lines.append("```")
    return "\n".join(lines)
- # ==================== v5.3 Override Contract 操作 ====================
def update_entity_field(self, entity_id: str, field: str, value: Any) -> bool:
    """Set a single key in an entity's ``current_json`` (compatibility helper).

    Delegates to ``self.update_entity_current`` with a one-key update and
    returns its result.
    """
    single_update = {field: value}
    return self.update_entity_current(entity_id, single_update)
|