| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- State Manager - 状态管理模块 (v5.4)
- 管理 state.json 的读写操作:
- - 实体状态管理
- - 进度追踪
- - 关系记录
- v5.1 变更(v5.4 沿用):
- - 集成 SQLStateManager,同步写入 SQLite (index.db)
- - state.json 保留精简数据,大数据自动迁移到 SQLite
- """
- import json
- import logging
- import sys
- from copy import deepcopy
- from pathlib import Path
- from runtime_compat import enable_windows_utf8_stdio
- from typing import Dict, List, Optional, Any
- from dataclasses import dataclass, field, asdict
- from datetime import datetime
- import filelock
- from .config import get_config
- from .observability import safe_log_tool_call
- logger = logging.getLogger(__name__)
- try:
- # 当 scripts 目录在 sys.path 中(常见:从 scripts/ 运行)
- from security_utils import atomic_write_json, read_json_safe
- except ImportError: # pragma: no cover
- # 当以 `python -m scripts.data_modules...` 从仓库根目录运行
- from scripts.security_utils import atomic_write_json, read_json_safe
- @dataclass
- class EntityState:
- """实体状态"""
- id: str
- name: str
- type: str # 角色/地点/物品/势力
- tier: str = "装饰" # 核心/重要/次要/装饰
- aliases: List[str] = field(default_factory=list)
- attributes: Dict[str, Any] = field(default_factory=dict)
- first_appearance: int = 0
- last_appearance: int = 0
- @dataclass
- class Relationship:
- """实体关系"""
- from_entity: str
- to_entity: str
- type: str
- description: str
- chapter: int
- @dataclass
- class StateChange:
- """状态变化记录"""
- entity_id: str
- field: str
- old_value: Any
- new_value: Any
- reason: str
- chapter: int
- timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
- @dataclass
- class _EntityPatch:
- """待写入的实体增量补丁(用于锁内合并)"""
- entity_type: str
- entity_id: str
- replace: bool = False
- base_entity: Optional[Dict[str, Any]] = None # 新建实体时的完整快照(用于填充缺失字段)
- top_updates: Dict[str, Any] = field(default_factory=dict)
- current_updates: Dict[str, Any] = field(default_factory=dict)
- appearance_chapter: Optional[int] = None
- class StateManager:
- """状态管理器(v5.1 entities_v3 格式 + SQLite 同步,v5.4 沿用)"""
- # v5.0 引入的实体类型
- ENTITY_TYPES = ["角色", "地点", "物品", "势力", "招式"]
- def __init__(self, config=None, enable_sqlite_sync: bool = True):
- """
- 初始化状态管理器
- 参数:
- - config: 配置对象
- - enable_sqlite_sync: 是否启用 SQLite 同步 (默认 True)
- """
- self.config = config or get_config()
- self._state: Dict[str, Any] = {}
- # 与 security_utils.atomic_write_json 保持一致:state.json.lock
- self._lock_path = self.config.state_file.with_suffix(self.config.state_file.suffix + ".lock")
- # v5.1 引入: SQLite 同步
- self._enable_sqlite_sync = enable_sqlite_sync
- self._sql_state_manager = None
- if enable_sqlite_sync:
- try:
- from .sql_state_manager import SQLStateManager
- self._sql_state_manager = SQLStateManager(self.config)
- except ImportError:
- pass # SQLStateManager 不可用时静默降级
- # 待写入的增量(锁内重读 + 合并 + 写入)
- self._pending_entity_patches: Dict[tuple[str, str], _EntityPatch] = {}
- self._pending_alias_entries: Dict[str, List[Dict[str, str]]] = {}
- self._pending_state_changes: List[Dict[str, Any]] = []
- self._pending_structured_relationships: List[Dict[str, Any]] = []
- self._pending_disambiguation_warnings: List[Dict[str, Any]] = []
- self._pending_disambiguation_pending: List[Dict[str, Any]] = []
- self._pending_progress_chapter: Optional[int] = None
- self._pending_progress_words_delta: int = 0
- self._pending_chapter_meta: Dict[str, Any] = {}
- # v5.1 引入: 缓存待同步到 SQLite 的数据
- self._pending_sqlite_data: Dict[str, Any] = {
- "entities_appeared": [],
- "entities_new": [],
- "state_changes": [],
- "relationships_new": [],
- "chapter": None
- }
- self._load_state()
- def _now_progress_timestamp(self) -> str:
- return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- def _ensure_state_schema(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """确保 state.json 具备运行所需的关键字段(尽量不破坏既有数据)。"""
- if not isinstance(state, dict):
- state = {}
- state.setdefault("project_info", {})
- state.setdefault("progress", {})
- state.setdefault("protagonist_state", {})
- # relationships: 旧版本可能是 list(实体关系),v5.0 运行态用 dict(人物关系/重要关系)
- relationships = state.get("relationships")
- if isinstance(relationships, list):
- state.setdefault("structured_relationships", [])
- if isinstance(state.get("structured_relationships"), list):
- state["structured_relationships"].extend(relationships)
- state["relationships"] = {}
- elif not isinstance(relationships, dict):
- state["relationships"] = {}
- state.setdefault("world_settings", {"power_system": [], "factions": [], "locations": []})
- state.setdefault("plot_threads", {"active_threads": [], "foreshadowing": []})
- state.setdefault("review_checkpoints", [])
- state.setdefault("chapter_meta", {})
- state.setdefault(
- "strand_tracker",
- {
- "last_quest_chapter": 0,
- "last_fire_chapter": 0,
- "last_constellation_chapter": 0,
- "current_dominant": "quest",
- "chapters_since_switch": 0,
- "history": [],
- },
- )
- entities_v3 = state.get("entities_v3")
- # v5.1 引入: entities_v3, alias_index, state_changes, structured_relationships 已迁移到 index.db
- # 不再在 state.json 中初始化或维护这些字段
- if not isinstance(state.get("disambiguation_warnings"), list):
- state["disambiguation_warnings"] = []
- if not isinstance(state.get("disambiguation_pending"), list):
- state["disambiguation_pending"] = []
- # progress 基础字段
- progress = state["progress"]
- if not isinstance(progress, dict):
- progress = {}
- state["progress"] = progress
- progress.setdefault("current_chapter", 0)
- progress.setdefault("total_words", 0)
- progress.setdefault("last_updated", self._now_progress_timestamp())
- return state
- def _load_state(self):
- """加载状态文件"""
- if self.config.state_file.exists():
- self._state = read_json_safe(self.config.state_file, default={})
- self._state = self._ensure_state_schema(self._state)
- else:
- self._state = self._ensure_state_schema({})
- def save_state(self):
- """
- 保存状态文件(锁内重读 + 合并 + 原子写入)。
- 解决多 Agent 并行下的“读-改-写覆盖”风险:
- - 获取锁
- - 重新读取磁盘最新 state.json
- - 仅合并本实例产生的增量(pending_*)
- - 原子化写入
- """
- # 无增量时不写入,避免无意义覆盖
- has_pending = any(
- [
- self._pending_entity_patches,
- self._pending_alias_entries,
- self._pending_state_changes,
- self._pending_structured_relationships,
- self._pending_disambiguation_warnings,
- self._pending_disambiguation_pending,
- self._pending_chapter_meta,
- self._pending_progress_chapter is not None,
- self._pending_progress_words_delta != 0,
- ]
- )
- if not has_pending:
- return
- self.config.ensure_dirs()
- lock = filelock.FileLock(str(self._lock_path), timeout=10)
- try:
- with lock:
- disk_state = read_json_safe(self.config.state_file, default={})
- disk_state = self._ensure_state_schema(disk_state)
- # progress(合并为 max(chapter) + words_delta 累加)
- if self._pending_progress_chapter is not None or self._pending_progress_words_delta != 0:
- progress = disk_state.get("progress", {})
- if not isinstance(progress, dict):
- progress = {}
- disk_state["progress"] = progress
- try:
- current_chapter = int(progress.get("current_chapter", 0) or 0)
- except (TypeError, ValueError):
- current_chapter = 0
- if self._pending_progress_chapter is not None:
- progress["current_chapter"] = max(current_chapter, int(self._pending_progress_chapter))
- if self._pending_progress_words_delta:
- try:
- total_words = int(progress.get("total_words", 0) or 0)
- except (TypeError, ValueError):
- total_words = 0
- progress["total_words"] = total_words + int(self._pending_progress_words_delta)
- progress["last_updated"] = self._now_progress_timestamp()
- # v5.1 引入: 强制使用 SQLite 模式,移除大数据字段
- # 确保 state.json 中不存在这些膨胀字段
- for field in ["entities_v3", "alias_index", "state_changes", "structured_relationships"]:
- disk_state.pop(field, None)
- # 标记已迁移
- disk_state["_migrated_to_sqlite"] = True
- # disambiguation_warnings(追加去重 + 截断)
- if self._pending_disambiguation_warnings:
- warnings_list = disk_state.get("disambiguation_warnings")
- if not isinstance(warnings_list, list):
- warnings_list = []
- disk_state["disambiguation_warnings"] = warnings_list
- def _warn_key(w: Dict[str, Any]) -> tuple:
- return (
- w.get("chapter"),
- w.get("mention"),
- w.get("chosen_id"),
- w.get("confidence"),
- )
- existing_keys = {_warn_key(w) for w in warnings_list if isinstance(w, dict)}
- for w in self._pending_disambiguation_warnings:
- if not isinstance(w, dict):
- continue
- k = _warn_key(w)
- if k in existing_keys:
- continue
- warnings_list.append(w)
- existing_keys.add(k)
- # 只保留最近 N 条,避免文件无限增长
- max_keep = self.config.max_disambiguation_warnings
- if len(warnings_list) > max_keep:
- disk_state["disambiguation_warnings"] = warnings_list[-max_keep:]
- # disambiguation_pending(追加去重 + 截断)
- if self._pending_disambiguation_pending:
- pending_list = disk_state.get("disambiguation_pending")
- if not isinstance(pending_list, list):
- pending_list = []
- disk_state["disambiguation_pending"] = pending_list
- def _pending_key(w: Dict[str, Any]) -> tuple:
- return (
- w.get("chapter"),
- w.get("mention"),
- w.get("suggested_id"),
- w.get("confidence"),
- )
- existing_keys = {_pending_key(w) for w in pending_list if isinstance(w, dict)}
- for w in self._pending_disambiguation_pending:
- if not isinstance(w, dict):
- continue
- k = _pending_key(w)
- if k in existing_keys:
- continue
- pending_list.append(w)
- existing_keys.add(k)
- max_keep = self.config.max_disambiguation_pending
- if len(pending_list) > max_keep:
- disk_state["disambiguation_pending"] = pending_list[-max_keep:]
- # chapter_meta(新增:按章节号覆盖写入)
- if self._pending_chapter_meta:
- chapter_meta = disk_state.get("chapter_meta")
- if not isinstance(chapter_meta, dict):
- chapter_meta = {}
- disk_state["chapter_meta"] = chapter_meta
- chapter_meta.update(self._pending_chapter_meta)
- # 原子写入(锁已持有,不再二次加锁)
- atomic_write_json(self.config.state_file, disk_state, use_lock=False, backup=True)
- # v5.1 引入: 同步到 SQLite(失败时保留 pending 以便重试)
- sqlite_pending_snapshot = self._snapshot_sqlite_pending()
- sqlite_sync_ok = self._sync_to_sqlite()
- # 同步内存为磁盘最新快照
- self._state = disk_state
- # state.json 侧 pending 已写盘,直接清空
- self._pending_disambiguation_warnings.clear()
- self._pending_disambiguation_pending.clear()
- self._pending_chapter_meta.clear()
- self._pending_progress_chapter = None
- self._pending_progress_words_delta = 0
- # SQLite 侧 pending:成功后清空,失败则恢复快照(避免静默丢数据)
- if sqlite_sync_ok:
- self._pending_entity_patches.clear()
- self._pending_alias_entries.clear()
- self._pending_state_changes.clear()
- self._pending_structured_relationships.clear()
- self._clear_pending_sqlite_data()
- else:
- self._restore_sqlite_pending(sqlite_pending_snapshot)
- except filelock.Timeout:
- raise RuntimeError("无法获取 state.json 文件锁,请稍后重试")
- def _sync_to_sqlite(self) -> bool:
- """同步待处理数据到 SQLite(v5.1 引入,v5.4 沿用)"""
- if not self._sql_state_manager:
- return True
- # 方式1: 通过 process_chapter_result 收集的数据
- sqlite_data = self._pending_sqlite_data
- chapter = sqlite_data.get("chapter")
- # 记录已处理的 (entity_id, chapter) 组合,避免重复写入 appearances
- processed_appearances = set()
- if chapter is not None:
- try:
- self._sql_state_manager.process_chapter_entities(
- chapter=chapter,
- entities_appeared=sqlite_data.get("entities_appeared", []),
- entities_new=sqlite_data.get("entities_new", []),
- state_changes=sqlite_data.get("state_changes", []),
- relationships_new=sqlite_data.get("relationships_new", [])
- )
- # 标记已处理的出场记录
- for entity in sqlite_data.get("entities_appeared", []):
- if entity.get("id"):
- processed_appearances.add((entity.get("id"), chapter))
- for entity in sqlite_data.get("entities_new", []):
- eid = entity.get("suggested_id") or entity.get("id")
- if eid:
- processed_appearances.add((eid, chapter))
- except Exception as exc:
- logger.warning("SQLite sync failed (process_chapter_entities): %s", exc)
- return False
- # 方式2: 使用 add_entity/update_entity 收集的增量数据。
- # 数据缓存在 _pending_entity_patches 等变量中。
- return self._sync_pending_patches_to_sqlite(processed_appearances)
- def _sync_pending_patches_to_sqlite(self, processed_appearances: set = None) -> bool:
- """同步 _pending_entity_patches 等到 SQLite(v5.1 引入,v5.4 沿用)
- Args:
- processed_appearances: 已通过 process_chapter_entities 处理的 (entity_id, chapter) 集合,
- 用于避免重复写入 appearances 表(防止覆盖 mentions)
- """
- if not self._sql_state_manager:
- return True
- if processed_appearances is None:
- processed_appearances = set()
- # 元数据字段(不应写入 current_json)
- METADATA_FIELDS = {"canonical_name", "tier", "desc", "is_protagonist", "is_archived"}
- try:
- from .sql_state_manager import EntityData
- from .index_manager import EntityMeta
- # 同步实体补丁
- for (entity_type, entity_id), patch in self._pending_entity_patches.items():
- if patch.base_entity:
- # 新实体
- entity_data = EntityData(
- id=entity_id,
- type=entity_type,
- name=patch.base_entity.get("canonical_name", entity_id),
- tier=patch.base_entity.get("tier", "装饰"),
- desc=patch.base_entity.get("desc", ""),
- current=patch.base_entity.get("current", {}),
- aliases=[],
- first_appearance=patch.base_entity.get("first_appearance", 0),
- last_appearance=patch.base_entity.get("last_appearance", 0),
- is_protagonist=patch.base_entity.get("is_protagonist", False)
- )
- self._sql_state_manager.upsert_entity(entity_data)
- # 记录首次出场(跳过已处理的,避免覆盖 mentions)
- if patch.appearance_chapter is not None:
- if (entity_id, patch.appearance_chapter) not in processed_appearances:
- self._sql_state_manager._index_manager.record_appearance(
- entity_id=entity_id,
- chapter=patch.appearance_chapter,
- mentions=[entity_data.name],
- confidence=1.0,
- skip_if_exists=True # 关键:不覆盖已有记录
- )
- else:
- # 更新现有实体
- has_metadata_updates = bool(patch.top_updates and
- any(k in METADATA_FIELDS for k in patch.top_updates))
- # 非元数据的 top_updates 应该当作 current 更新
- # 例如:realm, layer, location 等状态字段
- non_metadata_top_updates = {
- k: v for k, v in patch.top_updates.items()
- if k not in METADATA_FIELDS
- } if patch.top_updates else {}
- # 合并 current_updates 和非元数据的 top_updates
- effective_current_updates = {**non_metadata_top_updates}
- if patch.current_updates:
- effective_current_updates.update(patch.current_updates)
- if has_metadata_updates:
- # 有元数据更新:使用 upsert_entity(update_metadata=True)
- existing = self._sql_state_manager.get_entity(entity_id)
- if existing:
- # 合并 current
- current = existing.get("current_json", {})
- if isinstance(current, str):
- import json
- current = json.loads(current) if current else {}
- if effective_current_updates:
- current.update(effective_current_updates)
- new_canonical_name = patch.top_updates.get("canonical_name")
- old_canonical_name = existing.get("canonical_name", "")
- entity_meta = EntityMeta(
- id=entity_id,
- type=existing.get("type", entity_type),
- canonical_name=new_canonical_name or old_canonical_name,
- tier=patch.top_updates.get("tier", existing.get("tier", "装饰")),
- desc=patch.top_updates.get("desc", existing.get("desc", "")),
- current=current,
- first_appearance=existing.get("first_appearance", 0),
- last_appearance=patch.appearance_chapter or existing.get("last_appearance", 0),
- is_protagonist=patch.top_updates.get("is_protagonist", existing.get("is_protagonist", False)),
- is_archived=patch.top_updates.get("is_archived", existing.get("is_archived", False))
- )
- self._sql_state_manager._index_manager.upsert_entity(entity_meta, update_metadata=True)
- # 如果 canonical_name 改名,自动注册新名字为 alias
- if new_canonical_name and new_canonical_name != old_canonical_name:
- self._sql_state_manager.register_alias(
- new_canonical_name, entity_id, existing.get("type", entity_type)
- )
- elif effective_current_updates:
- # 只有 current 更新(包括非元数据的 top_updates)
- self._sql_state_manager.update_entity_current(entity_id, effective_current_updates)
- # 更新 last_appearance 并记录出场
- if patch.appearance_chapter is not None:
- self._sql_state_manager._update_last_appearance(entity_id, patch.appearance_chapter)
- # 补充 appearances 记录
- # 使用 skip_if_exists=True 避免覆盖已有记录的 mentions
- if (entity_id, patch.appearance_chapter) not in processed_appearances:
- self._sql_state_manager._index_manager.record_appearance(
- entity_id=entity_id,
- chapter=patch.appearance_chapter,
- mentions=[],
- confidence=1.0,
- skip_if_exists=True # 关键:不覆盖已有记录
- )
- # 同步别名
- for alias, entries in self._pending_alias_entries.items():
- for entry in entries:
- entity_type = entry.get("type")
- entity_id = entry.get("id")
- if entity_type and entity_id:
- self._sql_state_manager.register_alias(alias, entity_id, entity_type)
- # 同步状态变化
- for change in self._pending_state_changes:
- self._sql_state_manager.record_state_change(
- entity_id=change.get("entity_id", ""),
- field=change.get("field", ""),
- old_value=change.get("old", change.get("old_value", "")),
- new_value=change.get("new", change.get("new_value", "")),
- reason=change.get("reason", ""),
- chapter=change.get("chapter", 0)
- )
- # 同步关系
- for rel in self._pending_structured_relationships:
- self._sql_state_manager.upsert_relationship(
- from_entity=rel.get("from_entity", ""),
- to_entity=rel.get("to_entity", ""),
- type=rel.get("type", "相识"),
- description=rel.get("description", ""),
- chapter=rel.get("chapter", 0)
- )
- return True
- except Exception as e:
- # SQLite 同步失败时记录警告(不中断主流程)
- logger.warning("SQLite sync failed: %s", e)
- return False
- def _snapshot_sqlite_pending(self) -> Dict[str, Any]:
- """抓取 SQLite 侧 pending 快照,用于同步失败回滚内存队列。"""
- return {
- "entity_patches": deepcopy(self._pending_entity_patches),
- "alias_entries": deepcopy(self._pending_alias_entries),
- "state_changes": deepcopy(self._pending_state_changes),
- "structured_relationships": deepcopy(self._pending_structured_relationships),
- "sqlite_data": deepcopy(self._pending_sqlite_data),
- }
- def _restore_sqlite_pending(self, snapshot: Dict[str, Any]) -> None:
- """恢复 SQLite 侧 pending 快照,避免同步失败后数据静默丢失。"""
- self._pending_entity_patches = snapshot.get("entity_patches", {})
- self._pending_alias_entries = snapshot.get("alias_entries", {})
- self._pending_state_changes = snapshot.get("state_changes", [])
- self._pending_structured_relationships = snapshot.get("structured_relationships", [])
- self._pending_sqlite_data = snapshot.get("sqlite_data", {
- "entities_appeared": [],
- "entities_new": [],
- "state_changes": [],
- "relationships_new": [],
- "chapter": None,
- })
- def _clear_pending_sqlite_data(self):
- """清空待同步的 SQLite 数据"""
- self._pending_sqlite_data = {
- "entities_appeared": [],
- "entities_new": [],
- "state_changes": [],
- "relationships_new": [],
- "chapter": None
- }
- # ==================== 进度管理 ====================
- def get_current_chapter(self) -> int:
- """获取当前章节号"""
- return self._state.get("progress", {}).get("current_chapter", 0)
- def update_progress(self, chapter: int, words: int = 0):
- """更新进度"""
- if "progress" not in self._state:
- self._state["progress"] = {}
- self._state["progress"]["current_chapter"] = chapter
- if words > 0:
- total = self._state["progress"].get("total_words", 0)
- self._state["progress"]["total_words"] = total + words
- # 记录增量:锁内合并时用 max(chapter) + words_delta 累加
- if self._pending_progress_chapter is None:
- self._pending_progress_chapter = chapter
- else:
- self._pending_progress_chapter = max(self._pending_progress_chapter, chapter)
- if words > 0:
- self._pending_progress_words_delta += int(words)
- # ==================== 实体管理 (v5.1 SQLite-first) ====================
- def get_entity(self, entity_id: str, entity_type: str = None) -> Optional[Dict]:
- """获取实体(v5.1 引入:优先从 SQLite 读取)"""
- # v5.1 引入: 优先从 SQLite 读取
- if self._sql_state_manager:
- entity = self._sql_state_manager._index_manager.get_entity(entity_id)
- if entity:
- return entity
- # 回退到内存 state (兼容未迁移场景)
- entities_v3 = self._state.get("entities_v3", {})
- if entity_type:
- return entities_v3.get(entity_type, {}).get(entity_id)
- # 遍历所有类型查找
- for type_name, entities in entities_v3.items():
- if entity_id in entities:
- return entities[entity_id]
- return None
- def get_entity_type(self, entity_id: str) -> Optional[str]:
- """获取实体所属类型"""
- # v5.1 引入: 优先从 SQLite 读取
- if self._sql_state_manager:
- entity = self._sql_state_manager._index_manager.get_entity(entity_id)
- if entity:
- return entity.get("type")
- # 回退到内存 state
- for type_name, entities in self._state.get("entities_v3", {}).items():
- if entity_id in entities:
- return type_name
- return None
- def get_all_entities(self) -> Dict[str, Dict]:
- """获取所有实体(扁平化视图)"""
- # v5.1 引入: 优先从 SQLite 读取
- if self._sql_state_manager:
- result = {}
- for entity_type in self.ENTITY_TYPES:
- entities = self._sql_state_manager._index_manager.get_entities_by_type(entity_type)
- for e in entities:
- eid = e.get("id")
- if eid:
- result[eid] = {**e, "type": entity_type}
- if result:
- return result
- # 回退到内存 state
- result = {}
- for type_name, entities in self._state.get("entities_v3", {}).items():
- for eid, e in entities.items():
- result[eid] = {**e, "type": type_name}
- return result
- def get_entities_by_type(self, entity_type: str) -> Dict[str, Dict]:
- """按类型获取实体"""
- # v5.1 引入: 优先从 SQLite 读取
- if self._sql_state_manager:
- entities = self._sql_state_manager._index_manager.get_entities_by_type(entity_type)
- if entities:
- return {e.get("id"): e for e in entities if e.get("id")}
- # 回退到内存 state
- return self._state.get("entities_v3", {}).get(entity_type, {})
- def get_entities_by_tier(self, tier: str) -> Dict[str, Dict]:
- """按层级获取实体"""
- # v5.1 引入: 优先从 SQLite 读取
- if self._sql_state_manager:
- result = {}
- for entity_type in self.ENTITY_TYPES:
- entities = self._sql_state_manager._index_manager.get_entities_by_tier(tier)
- for e in entities:
- eid = e.get("id")
- if eid and e.get("type") == entity_type:
- result[eid] = {**e, "type": entity_type}
- if result:
- return result
- # 回退到内存 state
- result = {}
- for type_name, entities in self._state.get("entities_v3", {}).items():
- for eid, e in entities.items():
- if e.get("tier") == tier:
- result[eid] = {**e, "type": type_name}
- return result
- def add_entity(self, entity: EntityState) -> bool:
- """添加新实体(v5.0 entities_v3 格式,v5.4 沿用)"""
- entity_type = entity.type
- if entity_type not in self.ENTITY_TYPES:
- entity_type = "角色"
- if "entities_v3" not in self._state:
- self._state["entities_v3"] = {t: {} for t in self.ENTITY_TYPES}
- if entity_type not in self._state["entities_v3"]:
- self._state["entities_v3"][entity_type] = {}
- # 检查是否已存在
- if entity.id in self._state["entities_v3"][entity_type]:
- return False
- # 转换为 v3 格式
- v3_entity = {
- "canonical_name": entity.name,
- "tier": entity.tier,
- "desc": "",
- "current": entity.attributes,
- "first_appearance": entity.first_appearance,
- "last_appearance": entity.last_appearance,
- "history": []
- }
- self._state["entities_v3"][entity_type][entity.id] = v3_entity
- # 记录实体补丁(新建:仅填充缺失字段,避免覆盖并发写入)
- patch = self._pending_entity_patches.get((entity_type, entity.id))
- if patch is None:
- patch = _EntityPatch(entity_type=entity_type, entity_id=entity.id)
- self._pending_entity_patches[(entity_type, entity.id)] = patch
- patch.replace = True
- patch.base_entity = v3_entity
- # v5.1 引入: 注册别名到 index.db (通过 SQLStateManager)
- if self._sql_state_manager:
- self._sql_state_manager._index_manager.register_alias(entity.name, entity.id, entity_type)
- for alias in entity.aliases:
- if alias:
- self._sql_state_manager._index_manager.register_alias(alias, entity.id, entity_type)
- return True
- def _register_alias_internal(self, entity_id: str, entity_type: str, alias: str):
- """内部方法:注册别名到 index.db(v5.1 引入)"""
- if not alias:
- return
- # v5.1 引入: 直接写入 SQLite
- if self._sql_state_manager:
- self._sql_state_manager._index_manager.register_alias(alias, entity_id, entity_type)
- def update_entity(self, entity_id: str, updates: Dict[str, Any], entity_type: str = None) -> bool:
- """更新实体属性(v5.0 引入,v5.4 沿用)"""
- # 查找实体
- if entity_type:
- if entity_id not in self._state.get("entities_v3", {}).get(entity_type, {}):
- return False
- entity = self._state["entities_v3"][entity_type][entity_id]
- else:
- entity_type = self.get_entity_type(entity_id)
- if not entity_type:
- return False
- entity = self._state["entities_v3"][entity_type][entity_id]
- for key, value in updates.items():
- if key == "attributes" and isinstance(value, dict):
- # v5.0 引入: attributes 存在 current 字段
- if "current" not in entity:
- entity["current"] = {}
- entity["current"].update(value)
- # 记录补丁(current 增量)
- patch = self._pending_entity_patches.get((entity_type, entity_id))
- if patch is None:
- patch = _EntityPatch(entity_type=entity_type, entity_id=entity_id)
- self._pending_entity_patches[(entity_type, entity_id)] = patch
- patch.current_updates.update(value)
- elif key == "current" and isinstance(value, dict):
- if "current" not in entity:
- entity["current"] = {}
- entity["current"].update(value)
- patch = self._pending_entity_patches.get((entity_type, entity_id))
- if patch is None:
- patch = _EntityPatch(entity_type=entity_type, entity_id=entity_id)
- self._pending_entity_patches[(entity_type, entity_id)] = patch
- patch.current_updates.update(value)
- else:
- entity[key] = value
- patch = self._pending_entity_patches.get((entity_type, entity_id))
- if patch is None:
- patch = _EntityPatch(entity_type=entity_type, entity_id=entity_id)
- self._pending_entity_patches[(entity_type, entity_id)] = patch
- patch.top_updates[key] = value
- return True
- def update_entity_appearance(self, entity_id: str, chapter: int, entity_type: str = None):
- """更新实体出场章节"""
- if not entity_type:
- entity_type = self.get_entity_type(entity_id)
- if not entity_type:
- return
- entities_v3 = self._state.get("entities_v3")
- if not isinstance(entities_v3, dict):
- entities_v3 = {t: {} for t in self.ENTITY_TYPES}
- self._state["entities_v3"] = entities_v3
- entities_v3.setdefault(entity_type, {})
- entity = entities_v3[entity_type].get(entity_id)
- if entity:
- if entity.get("first_appearance", 0) == 0:
- entity["first_appearance"] = chapter
- entity["last_appearance"] = chapter
- # 记录补丁:锁内应用 first=min(non-zero), last=max
- patch = self._pending_entity_patches.get((entity_type, entity_id))
- if patch is None:
- patch = _EntityPatch(entity_type=entity_type, entity_id=entity_id)
- self._pending_entity_patches[(entity_type, entity_id)] = patch
- if patch.appearance_chapter is None:
- patch.appearance_chapter = chapter
- else:
- patch.appearance_chapter = max(int(patch.appearance_chapter), int(chapter))
- # ==================== 状态变化记录 ====================
- def record_state_change(
- self,
- entity_id: str,
- field: str,
- old_value: Any,
- new_value: Any,
- reason: str,
- chapter: int
- ):
- """记录状态变化"""
- if "state_changes" not in self._state:
- self._state["state_changes"] = []
- change = StateChange(
- entity_id=entity_id,
- field=field,
- old_value=old_value,
- new_value=new_value,
- reason=reason,
- chapter=chapter
- )
- change_dict = asdict(change)
- self._state["state_changes"].append(change_dict)
- self._pending_state_changes.append(change_dict)
- # 同时更新实体属性
- self.update_entity(entity_id, {"attributes": {field: new_value}})
- def get_state_changes(self, entity_id: Optional[str] = None) -> List[Dict]:
- """获取状态变化历史"""
- changes = self._state.get("state_changes", [])
- if entity_id:
- changes = [c for c in changes if c.get("entity_id") == entity_id]
- return changes
- # ==================== 关系管理 ====================
- def add_relationship(
- self,
- from_entity: str,
- to_entity: str,
- rel_type: str,
- description: str,
- chapter: int
- ):
- """添加关系"""
- rel = Relationship(
- from_entity=from_entity,
- to_entity=to_entity,
- type=rel_type,
- description=description,
- chapter=chapter
- )
- # v5.0 引入: 实体关系存入 structured_relationships,避免与 relationships(人物关系字典) 冲突
- if "structured_relationships" not in self._state:
- self._state["structured_relationships"] = []
- rel_dict = asdict(rel)
- self._state["structured_relationships"].append(rel_dict)
- self._pending_structured_relationships.append(rel_dict)
- def get_relationships(self, entity_id: Optional[str] = None) -> List[Dict]:
- """获取关系列表"""
- rels = self._state.get("structured_relationships", [])
- if entity_id:
- rels = [
- r for r in rels
- if r.get("from_entity") == entity_id or r.get("to_entity") == entity_id
- ]
- return rels
- # ==================== 批量操作 ====================
- def _record_disambiguation(self, chapter: int, uncertain_items: Any) -> List[str]:
- """
- 记录消歧反馈到 state.json,便于 Writer/Context Agent 感知风险。
- 约定:
- - >= extraction_confidence_medium:写入 disambiguation_warnings(采用但警告)
- - < extraction_confidence_medium:写入 disambiguation_pending(需人工确认)
- """
- if not isinstance(uncertain_items, list) or not uncertain_items:
- return []
- warnings: List[str] = []
- now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- for item in uncertain_items:
- if not isinstance(item, dict):
- continue
- mention = str(item.get("mention", "") or "").strip()
- if not mention:
- continue
- raw_conf = item.get("confidence", 0.0)
- try:
- confidence = float(raw_conf)
- except (TypeError, ValueError):
- confidence = 0.0
- # 候选:支持 [{"type","id"}...] 或 ["id1","id2"] 两种形式
- candidates_raw = item.get("candidates", [])
- candidates: List[Dict[str, str]] = []
- if isinstance(candidates_raw, list):
- for c in candidates_raw:
- if isinstance(c, dict):
- cid = str(c.get("id", "") or "").strip()
- ctype = str(c.get("type", "") or "").strip()
- entry: Dict[str, str] = {}
- if ctype:
- entry["type"] = ctype
- if cid:
- entry["id"] = cid
- if entry:
- candidates.append(entry)
- else:
- cid = str(c).strip()
- if cid:
- candidates.append({"id": cid})
- entity_type = str(item.get("type", "") or "").strip()
- suggested_id = str(item.get("suggested", "") or "").strip()
- adopted_raw = item.get("adopted", None)
- chosen_id = ""
- if isinstance(adopted_raw, str):
- chosen_id = adopted_raw.strip()
- elif adopted_raw is True:
- chosen_id = suggested_id
- else:
- # 兼容字段名:entity_id / chosen_id
- chosen_id = str(item.get("entity_id") or item.get("chosen_id") or "").strip() or suggested_id
- context = str(item.get("context", "") or "").strip()
- note = str(item.get("warning", "") or "").strip()
- record: Dict[str, Any] = {
- "chapter": int(chapter),
- "mention": mention,
- "type": entity_type,
- "suggested_id": suggested_id,
- "chosen_id": chosen_id,
- "confidence": confidence,
- "candidates": candidates,
- "context": context,
- "note": note,
- "created_at": now,
- }
- if confidence >= float(self.config.extraction_confidence_medium):
- self._state.setdefault("disambiguation_warnings", []).append(record)
- self._pending_disambiguation_warnings.append(record)
- chosen_part = f" → {chosen_id}" if chosen_id else ""
- warnings.append(f"消歧警告: {mention}{chosen_part} (confidence: {confidence:.2f})")
- else:
- self._state.setdefault("disambiguation_pending", []).append(record)
- self._pending_disambiguation_pending.append(record)
- warnings.append(f"消歧需人工确认: {mention} (confidence: {confidence:.2f})")
- return warnings
- def process_chapter_result(self, chapter: int, result: Dict) -> List[str]:
- """
- 处理 Data Agent 的章节处理结果(v5.1 引入,v5.4 沿用)
- 输入格式:
- - entities_appeared: 出场实体列表
- - entities_new: 新实体列表
- - state_changes: 状态变化列表
- - relationships_new: 新关系列表
- 返回警告列表
- """
- warnings = []
- # v5.1 引入: 记录章节号用于 SQLite 同步
- self._pending_sqlite_data["chapter"] = chapter
- # 处理出场实体
- for entity in result.get("entities_appeared", []):
- entity_id = entity.get("id")
- entity_type = entity.get("type")
- if entity_id:
- self.update_entity_appearance(entity_id, chapter, entity_type)
- # v5.1 引入: 缓存用于 SQLite 同步
- self._pending_sqlite_data["entities_appeared"].append(entity)
- # 处理新实体
- for entity in result.get("entities_new", []):
- entity_id = entity.get("suggested_id") or entity.get("id")
- if entity_id and entity_id != "NEW":
- new_entity = EntityState(
- id=entity_id,
- name=entity.get("name", ""),
- type=entity.get("type", "角色"),
- tier=entity.get("tier", "装饰"),
- aliases=entity.get("mentions", []),
- first_appearance=chapter,
- last_appearance=chapter
- )
- if not self.add_entity(new_entity):
- warnings.append(f"实体已存在: {entity_id}")
- # v5.1 引入: 缓存用于 SQLite 同步
- self._pending_sqlite_data["entities_new"].append(entity)
- # 处理状态变化
- for change in result.get("state_changes", []):
- self.record_state_change(
- entity_id=change.get("entity_id", ""),
- field=change.get("field", ""),
- old_value=change.get("old"),
- new_value=change.get("new"),
- reason=change.get("reason", ""),
- chapter=chapter
- )
- # v5.1 引入: 缓存用于 SQLite 同步
- self._pending_sqlite_data["state_changes"].append(change)
- # 处理关系
- for rel in result.get("relationships_new", []):
- self.add_relationship(
- from_entity=rel.get("from", ""),
- to_entity=rel.get("to", ""),
- rel_type=rel.get("type", ""),
- description=rel.get("description", ""),
- chapter=chapter
- )
- # v5.1 引入: 缓存用于 SQLite 同步
- self._pending_sqlite_data["relationships_new"].append(rel)
- # 处理消歧不确定项(不影响实体写入,但必须对 Writer 可见)
- warnings.extend(self._record_disambiguation(chapter, result.get("uncertain", [])))
- # 写入 chapter_meta(钩子/模式/结束状态)
- chapter_meta = result.get("chapter_meta")
- if isinstance(chapter_meta, dict):
- meta_key = f"{int(chapter):04d}"
- self._state.setdefault("chapter_meta", {})
- self._state["chapter_meta"][meta_key] = chapter_meta
- self._pending_chapter_meta[meta_key] = chapter_meta
- # 更新进度
- self.update_progress(chapter)
- # 同步主角状态(entities_v3 → protagonist_state)
- self.sync_protagonist_from_entity()
- return warnings
- # ==================== 导出 ====================
- def export_for_context(self) -> Dict:
- """导出用于上下文的精简版状态(v5.0 引入,v5.4 沿用)"""
- # 从 entities_v3 构建精简视图
- entities_flat = {}
- for type_name, entities in self._state.get("entities_v3", {}).items():
- for eid, e in entities.items():
- entities_flat[eid] = {
- "name": e.get("canonical_name", eid),
- "type": type_name,
- "tier": e.get("tier", "装饰"),
- "current": e.get("current", {})
- }
- return {
- "progress": self._state.get("progress", {}),
- "entities": entities_flat,
- # v5.1 引入: alias_index 已迁移到 index.db,这里返回空(兼容性)
- "alias_index": {},
- "recent_changes": [], # v5.1 引入: 从 index.db 查询
- "disambiguation": {
- "warnings": self._state.get("disambiguation_warnings", [])[-self.config.export_disambiguation_slice:],
- "pending": self._state.get("disambiguation_pending", [])[-self.config.export_disambiguation_slice:],
- },
- }
- # ==================== 主角同步 ====================
- def get_protagonist_entity_id(self) -> Optional[str]:
- """获取主角实体 ID(通过 is_protagonist 标记或 SQLite 查询)"""
- # 方式1: 通过 SQLStateManager 查询 (v5.1)
- if self._sql_state_manager:
- protagonist = self._sql_state_manager.get_protagonist()
- if protagonist:
- return protagonist.get("id")
- # 方式2: 通过 protagonist_state.name 查找别名
- protag_name = self._state.get("protagonist_state", {}).get("name")
- if protag_name and self._sql_state_manager:
- entities = self._sql_state_manager._index_manager.get_entities_by_alias(protag_name)
- for entry in entities:
- if entry.get("type") == "角色":
- return entry.get("id")
- return None
- def sync_protagonist_from_entity(self, entity_id: str = None):
- """
- 将主角实体的状态同步到 protagonist_state (v5.1: 从 SQLite 读取)
- 用于确保 consistency-checker 等依赖 protagonist_state 的组件获取最新数据
- """
- if entity_id is None:
- entity_id = self.get_protagonist_entity_id()
- if entity_id is None:
- return
- entity = self.get_entity(entity_id, "角色")
- if not entity:
- return
- current = entity.get("current")
- if not isinstance(current, dict):
- current = entity.get("current_json", {})
- if isinstance(current, str):
- try:
- current = json.loads(current) if current else {}
- except (json.JSONDecodeError, TypeError):
- current = {}
- if not isinstance(current, dict):
- current = {}
- protag = self._state.setdefault("protagonist_state", {})
- # 同步境界
- if "realm" in current:
- power = protag.setdefault("power", {})
- power["realm"] = current["realm"]
- if "layer" in current:
- power["layer"] = current["layer"]
- # 同步位置
- if "location" in current:
- loc = protag.setdefault("location", {})
- loc["current"] = current["location"]
- if "last_chapter" in current:
- loc["last_chapter"] = current["last_chapter"]
- def sync_protagonist_to_entity(self, entity_id: str = None):
- """
- 将 protagonist_state 同步到 entities_v3 中的主角实体
- 用于初始化或手动编辑 protagonist_state 后保持一致性
- """
- if entity_id is None:
- entity_id = self.get_protagonist_entity_id()
- if entity_id is None:
- return
- protag = self._state.get("protagonist_state", {})
- if not protag:
- return
- updates = {}
- # 同步境界
- power = protag.get("power", {})
- if power.get("realm"):
- updates["realm"] = power["realm"]
- if power.get("layer"):
- updates["layer"] = power["layer"]
- # 同步位置
- loc = protag.get("location", {})
- if loc.get("current"):
- updates["location"] = loc["current"]
- if updates:
- self.update_entity(entity_id, updates, "角色")
- # ==================== CLI 接口 ====================
- def main():
- import argparse
- from pydantic import ValidationError
- from .cli_output import print_success, print_error
- from .schemas import validate_data_agent_output, format_validation_error, normalize_data_agent_output
- from .index_manager import IndexManager
- parser = argparse.ArgumentParser(description="State Manager CLI (v5.4)")
- parser.add_argument("--project-root", type=str, help="项目根目录")
- subparsers = parser.add_subparsers(dest="command")
- # 读取进度
- subparsers.add_parser("get-progress")
- # 获取实体
- get_entity_parser = subparsers.add_parser("get-entity")
- get_entity_parser.add_argument("--id", required=True)
- # 列出实体
- list_parser = subparsers.add_parser("list-entities")
- list_parser.add_argument("--type", help="按类型过滤")
- list_parser.add_argument("--tier", help="按层级过滤")
- # 处理章节结果
- process_parser = subparsers.add_parser("process-chapter")
- process_parser.add_argument("--chapter", type=int, required=True, help="章节号")
- process_parser.add_argument("--data", required=True, help="JSON 格式的处理结果")
- args = parser.parse_args()
- # 初始化
- config = None
- if args.project_root:
- from .config import DataModulesConfig
- config = DataModulesConfig.from_project_root(args.project_root)
- manager = StateManager(config)
- logger = IndexManager(config)
- tool_name = f"state_manager:{args.command or 'unknown'}"
- def emit_success(data=None, message: str = "ok"):
- print_success(data, message=message)
- safe_log_tool_call(logger, tool_name=tool_name, success=True)
- def emit_error(code: str, message: str, suggestion: str | None = None):
- print_error(code, message, suggestion=suggestion)
- safe_log_tool_call(
- logger,
- tool_name=tool_name,
- success=False,
- error_code=code,
- error_message=message,
- )
- if args.command == "get-progress":
- emit_success(manager._state.get("progress", {}), message="progress")
- elif args.command == "get-entity":
- entity = manager.get_entity(args.id)
- if entity:
- emit_success(entity, message="entity")
- else:
- emit_error("NOT_FOUND", f"未找到实体: {args.id}")
- elif args.command == "list-entities":
- if args.type:
- entities = manager.get_entities_by_type(args.type)
- elif args.tier:
- entities = manager.get_entities_by_tier(args.tier)
- else:
- entities = manager.get_all_entities()
- payload = [{"id": eid, **e} for eid, e in entities.items()]
- emit_success(payload, message="entities")
- elif args.command == "process-chapter":
- data = json.loads(args.data)
- validated = None
- last_exc = None
- for _ in range(3):
- try:
- validated = validate_data_agent_output(data)
- break
- except ValidationError as exc:
- last_exc = exc
- data = normalize_data_agent_output(data)
- if validated is None:
- err = format_validation_error(last_exc) if last_exc else {
- "code": "SCHEMA_VALIDATION_FAILED",
- "message": "数据结构校验失败",
- "details": {"errors": []},
- "suggestion": "请检查 data-agent 输出字段是否完整且类型正确",
- }
- emit_error(err["code"], err["message"], suggestion=err.get("suggestion"))
- return
- warnings = manager.process_chapter_result(args.chapter, validated.model_dump(by_alias=True))
- manager.save_state()
- emit_success({"chapter": args.chapter, "warnings": warnings}, message="chapter_processed")
- else:
- emit_error("UNKNOWN_COMMAND", "未指定有效命令", suggestion="请查看 --help")
- if __name__ == "__main__":
- if sys.platform == "win32":
- enable_windows_utf8_stdio()
- main()
|