#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ContextManager - assemble context packs with weighted priorities.
"""
from __future__ import annotations

import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

from .config import get_config
from .index_manager import IndexManager
from .context_ranker import ContextRanker
from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch
  15. class ContextManager:
  16. DEFAULT_TEMPLATE = "plot"
  17. TEMPLATE_WEIGHTS = {
  18. "plot": {"core": 0.40, "scene": 0.35, "global": 0.25},
  19. "battle": {"core": 0.35, "scene": 0.45, "global": 0.20},
  20. "emotion": {"core": 0.45, "scene": 0.35, "global": 0.20},
  21. "transition": {"core": 0.50, "scene": 0.25, "global": 0.25},
  22. }
  23. EXTRA_SECTIONS = {"story_skeleton", "memory", "preferences", "alerts"}
  24. SECTION_ORDER = ["core", "scene", "global", "story_skeleton", "memory", "preferences", "alerts"]
  25. SUMMARY_SECTION_RE = re.compile(r"##\s*剧情摘要\s*\r?\n(.*?)(?=\r?\n##|\Z)", re.DOTALL)
  26. def __init__(self, config=None, snapshot_manager: Optional[SnapshotManager] = None):
  27. self.config = config or get_config()
  28. self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
  29. self.index_manager = IndexManager(self.config)
  30. self.context_ranker = ContextRanker(self.config)
  31. def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool:
  32. """判断快照是否可用于当前模板。"""
  33. if not isinstance(cached, dict):
  34. return False
  35. meta = cached.get("meta")
  36. if not isinstance(meta, dict):
  37. # 兼容旧快照:未记录 template 时仅允许默认模板复用
  38. return template == self.DEFAULT_TEMPLATE
  39. cached_template = meta.get("template")
  40. if not isinstance(cached_template, str):
  41. return template == self.DEFAULT_TEMPLATE
  42. return cached_template == template
  43. def build_context(
  44. self,
  45. chapter: int,
  46. template: str | None = None,
  47. use_snapshot: bool = True,
  48. save_snapshot: bool = True,
  49. max_chars: Optional[int] = None,
  50. ) -> Dict[str, Any]:
  51. template = template or self.DEFAULT_TEMPLATE
  52. if template not in self.TEMPLATE_WEIGHTS:
  53. template = self.DEFAULT_TEMPLATE
  54. if use_snapshot:
  55. try:
  56. cached = self.snapshot_manager.load_snapshot(chapter)
  57. if cached and self._is_snapshot_compatible(cached, template):
  58. return cached.get("payload", cached)
  59. except SnapshotVersionMismatch:
  60. # Snapshot incompatible; rebuild below.
  61. pass
  62. pack = self._build_pack(chapter)
  63. if getattr(self.config, "context_ranker_enabled", True):
  64. pack = self.context_ranker.rank_pack(pack, chapter)
  65. assembled = self.assemble_context(pack, template=template, max_chars=max_chars)
  66. if save_snapshot:
  67. meta = {"template": template}
  68. self.snapshot_manager.save_snapshot(chapter, assembled, meta=meta)
  69. return assembled
  70. def assemble_context(
  71. self,
  72. pack: Dict[str, Any],
  73. template: str = DEFAULT_TEMPLATE,
  74. max_chars: Optional[int] = None,
  75. ) -> Dict[str, Any]:
  76. weights = self.TEMPLATE_WEIGHTS.get(template, self.TEMPLATE_WEIGHTS[self.DEFAULT_TEMPLATE])
  77. max_chars = max_chars or 8000
  78. extra_budget = int(self.config.context_extra_section_budget or 0)
  79. sections = {}
  80. for section_name in self.SECTION_ORDER:
  81. if section_name in pack:
  82. sections[section_name] = pack[section_name]
  83. assembled: Dict[str, Any] = {"meta": pack.get("meta", {}), "sections": {}}
  84. for name, content in sections.items():
  85. weight = weights.get(name, 0.0)
  86. if weight > 0:
  87. budget = int(max_chars * weight)
  88. elif name in self.EXTRA_SECTIONS and extra_budget > 0:
  89. budget = extra_budget
  90. else:
  91. budget = None
  92. text = json.dumps(content, ensure_ascii=False)
  93. if budget is not None and len(text) > budget:
  94. text = text[:budget]
  95. assembled["sections"][name] = {"content": content, "text": text, "budget": budget}
  96. assembled["template"] = template
  97. assembled["weights"] = weights
  98. return assembled
  99. def filter_invalid_items(self, items: List[Dict[str, Any]], source_type: str, id_key: str) -> List[Dict[str, Any]]:
  100. confirmed = self.index_manager.get_invalid_ids(source_type, status="confirmed")
  101. pending = self.index_manager.get_invalid_ids(source_type, status="pending")
  102. result = []
  103. for item in items:
  104. item_id = str(item.get(id_key, ""))
  105. if item_id in confirmed:
  106. continue
  107. if item_id in pending:
  108. item = dict(item)
  109. item["warning"] = "pending_invalid"
  110. result.append(item)
  111. return result
  112. def apply_confidence_filter(self, items: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]:
  113. filtered: List[Dict[str, Any]] = []
  114. for item in items:
  115. conf = item.get("confidence")
  116. if conf is None or conf >= min_confidence:
  117. filtered.append(item)
  118. return filtered
  119. def _build_pack(self, chapter: int) -> Dict[str, Any]:
  120. state = self._load_state()
  121. core = {
  122. "chapter_outline": self._load_outline(chapter),
  123. "protagonist_snapshot": state.get("protagonist_state", {}),
  124. "recent_summaries": self._load_recent_summaries(
  125. chapter,
  126. window=self.config.context_recent_summaries_window,
  127. ),
  128. "recent_meta": self._load_recent_meta(
  129. state,
  130. chapter,
  131. window=self.config.context_recent_meta_window,
  132. ),
  133. }
  134. scene = {
  135. "location_context": state.get("protagonist_state", {}).get("location", {}),
  136. "appearing_characters": self._load_recent_appearances(
  137. limit=self.config.context_max_appearing_characters,
  138. ),
  139. }
  140. scene["appearing_characters"] = self.filter_invalid_items(
  141. scene["appearing_characters"], source_type="entity", id_key="entity_id"
  142. )
  143. global_ctx = {
  144. "worldview_skeleton": self._load_setting("世界观"),
  145. "power_system_skeleton": self._load_setting("力量体系"),
  146. "style_contract_ref": self._load_setting("风格契约"),
  147. }
  148. preferences = self._load_json_optional(self.config.webnovel_dir / "preferences.json")
  149. memory = self._load_json_optional(self.config.webnovel_dir / "project_memory.json")
  150. story_skeleton = self._load_story_skeleton(chapter)
  151. alert_slice = max(0, int(self.config.context_alerts_slice))
  152. return {
  153. "meta": {"chapter": chapter},
  154. "core": core,
  155. "scene": scene,
  156. "global": global_ctx,
  157. "story_skeleton": story_skeleton,
  158. "preferences": preferences,
  159. "memory": memory,
  160. "alerts": {
  161. "disambiguation_warnings": (
  162. state.get("disambiguation_warnings", [])[-alert_slice:] if alert_slice else []
  163. ),
  164. "disambiguation_pending": (
  165. state.get("disambiguation_pending", [])[-alert_slice:] if alert_slice else []
  166. ),
  167. },
  168. }
  169. def _load_state(self) -> Dict[str, Any]:
  170. path = self.config.state_file
  171. if not path.exists():
  172. return {}
  173. return json.loads(path.read_text(encoding="utf-8"))
  174. def _load_outline(self, chapter: int) -> str:
  175. outline_dir = self.config.outline_dir
  176. patterns = [
  177. f"第{chapter}章*.md",
  178. f"第{chapter:02d}章*.md",
  179. f"第{chapter:03d}章*.md",
  180. f"第{chapter:04d}章*.md",
  181. ]
  182. for pattern in patterns:
  183. matches = list(outline_dir.glob(pattern))
  184. if matches:
  185. return matches[0].read_text(encoding="utf-8")
  186. return f"[大纲未找到: 第{chapter}章]"
  187. def _load_recent_summaries(self, chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  188. summaries = []
  189. for ch in range(max(1, chapter - window), chapter):
  190. summary = self._load_summary_text(ch)
  191. if summary:
  192. summaries.append(summary)
  193. return summaries
  194. def _load_recent_meta(self, state: Dict[str, Any], chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  195. meta = state.get("chapter_meta", {}) or {}
  196. results = []
  197. for ch in range(max(1, chapter - window), chapter):
  198. for key in (f"{ch:04d}", str(ch)):
  199. if key in meta:
  200. results.append({"chapter": ch, **meta.get(key, {})})
  201. break
  202. return results
  203. def _load_recent_appearances(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
  204. appearances = self.index_manager.get_recent_appearances(limit=limit)
  205. return appearances or []
  206. def _load_setting(self, keyword: str) -> str:
  207. settings_dir = self.config.settings_dir
  208. candidates = [
  209. settings_dir / f"{keyword}.md",
  210. ]
  211. for path in candidates:
  212. if path.exists():
  213. return path.read_text(encoding="utf-8")
  214. # fallback: any file containing keyword
  215. matches = list(settings_dir.glob(f"*{keyword}*.md"))
  216. if matches:
  217. return matches[0].read_text(encoding="utf-8")
  218. return f"[{keyword}设定未找到]"
  219. def _extract_summary_excerpt(self, text: str, max_chars: int) -> str:
  220. if not text:
  221. return ""
  222. match = self.SUMMARY_SECTION_RE.search(text)
  223. excerpt = match.group(1).strip() if match else text.strip()
  224. if max_chars > 0 and len(excerpt) > max_chars:
  225. return excerpt[:max_chars].rstrip()
  226. return excerpt
  227. def _load_summary_text(self, chapter: int, snippet_chars: Optional[int] = None) -> Optional[Dict[str, Any]]:
  228. summary_path = self.config.webnovel_dir / "summaries" / f"ch{chapter:04d}.md"
  229. if not summary_path.exists():
  230. return None
  231. text = summary_path.read_text(encoding="utf-8")
  232. if snippet_chars:
  233. summary_text = self._extract_summary_excerpt(text, snippet_chars)
  234. else:
  235. summary_text = text
  236. return {"chapter": chapter, "summary": summary_text}
  237. def _load_story_skeleton(self, chapter: int) -> List[Dict[str, Any]]:
  238. interval = max(1, int(self.config.context_story_skeleton_interval))
  239. max_samples = max(0, int(self.config.context_story_skeleton_max_samples))
  240. snippet_chars = int(self.config.context_story_skeleton_snippet_chars)
  241. if max_samples <= 0 or chapter <= interval:
  242. return []
  243. samples: List[Dict[str, Any]] = []
  244. cursor = chapter - interval
  245. while cursor >= 1 and len(samples) < max_samples:
  246. summary = self._load_summary_text(cursor, snippet_chars=snippet_chars)
  247. if summary and summary.get("summary"):
  248. samples.append(summary)
  249. cursor -= interval
  250. samples.reverse()
  251. return samples
  252. def _load_json_optional(self, path: Path) -> Dict[str, Any]:
  253. if not path.exists():
  254. return {}
  255. try:
  256. return json.loads(path.read_text(encoding="utf-8"))
  257. except json.JSONDecodeError:
  258. return {}
  259. def main():
  260. import argparse
  261. from .cli_output import print_success, print_error
  262. parser = argparse.ArgumentParser(description="Context Manager CLI")
  263. parser.add_argument("--project-root", type=str, help="项目根目录")
  264. parser.add_argument("--chapter", type=int, required=True)
  265. parser.add_argument("--template", type=str, default=ContextManager.DEFAULT_TEMPLATE)
  266. parser.add_argument("--no-snapshot", action="store_true")
  267. parser.add_argument("--max-chars", type=int, default=8000)
  268. args = parser.parse_args()
  269. config = None
  270. if args.project_root:
  271. from .config import DataModulesConfig
  272. config = DataModulesConfig.from_project_root(args.project_root)
  273. manager = ContextManager(config)
  274. try:
  275. payload = manager.build_context(
  276. chapter=args.chapter,
  277. template=args.template,
  278. use_snapshot=not args.no_snapshot,
  279. save_snapshot=True,
  280. max_chars=args.max_chars,
  281. )
  282. print_success(payload, message="context_built")
  283. try:
  284. manager.index_manager.log_tool_call("context_manager:build", True, chapter=args.chapter)
  285. except Exception:
  286. pass
  287. except Exception as exc:
  288. print_error("CONTEXT_BUILD_FAILED", str(exc), suggestion="请检查项目结构与依赖文件")
  289. try:
  290. manager.index_manager.log_tool_call(
  291. "context_manager:build", False, error_code="CONTEXT_BUILD_FAILED", error_message=str(exc), chapter=args.chapter
  292. )
  293. except Exception:
  294. pass
  295. if __name__ == "__main__":
  296. import sys
  297. if sys.platform == "win32":
  298. import io
  299. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
  300. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
  301. main()