context_manager.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. ContextManager - assemble context packs with weighted priorities.
  5. """
  6. from __future__ import annotations
  7. import json
  8. import re
  9. from pathlib import Path
  10. from typing import Any, Dict, List, Optional
  11. from .config import get_config
  12. from .index_manager import IndexManager
  13. from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch
  14. class ContextManager:
  15. DEFAULT_TEMPLATE = "plot"
  16. TEMPLATE_WEIGHTS = {
  17. "plot": {"core": 0.40, "scene": 0.35, "global": 0.25},
  18. "battle": {"core": 0.35, "scene": 0.45, "global": 0.20},
  19. "emotion": {"core": 0.45, "scene": 0.35, "global": 0.20},
  20. "transition": {"core": 0.50, "scene": 0.25, "global": 0.25},
  21. }
  22. EXTRA_SECTIONS = {"story_skeleton", "memory", "preferences", "alerts"}
  23. SECTION_ORDER = ["core", "scene", "global", "story_skeleton", "memory", "preferences", "alerts"]
  24. SUMMARY_SECTION_RE = re.compile(r"##\s*剧情摘要\s*\r?\n(.*?)(?=\r?\n##|\Z)", re.DOTALL)
  25. def __init__(self, config=None, snapshot_manager: Optional[SnapshotManager] = None):
  26. self.config = config or get_config()
  27. self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
  28. self.index_manager = IndexManager(self.config)
  29. def build_context(
  30. self,
  31. chapter: int,
  32. template: str | None = None,
  33. use_snapshot: bool = True,
  34. save_snapshot: bool = True,
  35. max_chars: Optional[int] = None,
  36. ) -> Dict[str, Any]:
  37. template = template or self.DEFAULT_TEMPLATE
  38. if template not in self.TEMPLATE_WEIGHTS:
  39. template = self.DEFAULT_TEMPLATE
  40. if use_snapshot:
  41. try:
  42. cached = self.snapshot_manager.load_snapshot(chapter)
  43. if cached:
  44. return cached.get("payload", cached)
  45. except SnapshotVersionMismatch:
  46. # Snapshot incompatible; rebuild below.
  47. pass
  48. pack = self._build_pack(chapter)
  49. assembled = self.assemble_context(pack, template=template, max_chars=max_chars)
  50. if save_snapshot:
  51. meta = {"template": template}
  52. self.snapshot_manager.save_snapshot(chapter, assembled, meta=meta)
  53. return assembled
  54. def assemble_context(
  55. self,
  56. pack: Dict[str, Any],
  57. template: str = DEFAULT_TEMPLATE,
  58. max_chars: Optional[int] = None,
  59. ) -> Dict[str, Any]:
  60. weights = self.TEMPLATE_WEIGHTS.get(template, self.TEMPLATE_WEIGHTS[self.DEFAULT_TEMPLATE])
  61. max_chars = max_chars or 8000
  62. extra_budget = int(self.config.context_extra_section_budget or 0)
  63. sections = {}
  64. for section_name in self.SECTION_ORDER:
  65. if section_name in pack:
  66. sections[section_name] = pack[section_name]
  67. assembled: Dict[str, Any] = {"meta": pack.get("meta", {}), "sections": {}}
  68. for name, content in sections.items():
  69. weight = weights.get(name, 0.0)
  70. if weight > 0:
  71. budget = int(max_chars * weight)
  72. elif name in self.EXTRA_SECTIONS and extra_budget > 0:
  73. budget = extra_budget
  74. else:
  75. budget = None
  76. text = json.dumps(content, ensure_ascii=False)
  77. if budget is not None and len(text) > budget:
  78. text = text[:budget]
  79. assembled["sections"][name] = {"content": content, "text": text, "budget": budget}
  80. assembled["template"] = template
  81. assembled["weights"] = weights
  82. return assembled
  83. def filter_invalid_items(self, items: List[Dict[str, Any]], source_type: str, id_key: str) -> List[Dict[str, Any]]:
  84. confirmed = self.index_manager.get_invalid_ids(source_type, status="confirmed")
  85. pending = self.index_manager.get_invalid_ids(source_type, status="pending")
  86. result = []
  87. for item in items:
  88. item_id = str(item.get(id_key, ""))
  89. if item_id in confirmed:
  90. continue
  91. if item_id in pending:
  92. item = dict(item)
  93. item["warning"] = "pending_invalid"
  94. result.append(item)
  95. return result
  96. def apply_confidence_filter(self, items: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]:
  97. filtered: List[Dict[str, Any]] = []
  98. for item in items:
  99. conf = item.get("confidence")
  100. if conf is None or conf >= min_confidence:
  101. filtered.append(item)
  102. return filtered
  103. def _build_pack(self, chapter: int) -> Dict[str, Any]:
  104. state = self._load_state()
  105. core = {
  106. "chapter_outline": self._load_outline(chapter),
  107. "protagonist_snapshot": state.get("protagonist_state", {}),
  108. "recent_summaries": self._load_recent_summaries(
  109. chapter,
  110. window=self.config.context_recent_summaries_window,
  111. ),
  112. "recent_meta": self._load_recent_meta(
  113. state,
  114. chapter,
  115. window=self.config.context_recent_meta_window,
  116. ),
  117. }
  118. scene = {
  119. "location_context": state.get("protagonist_state", {}).get("location", {}),
  120. "appearing_characters": self._load_recent_appearances(
  121. limit=self.config.context_max_appearing_characters,
  122. ),
  123. }
  124. scene["appearing_characters"] = self.filter_invalid_items(
  125. scene["appearing_characters"], source_type="entity", id_key="entity_id"
  126. )
  127. global_ctx = {
  128. "worldview_skeleton": self._load_setting("世界观"),
  129. "power_system_skeleton": self._load_setting("力量体系"),
  130. "style_contract_ref": self._load_setting("风格契约"),
  131. }
  132. preferences = self._load_json_optional(self.config.webnovel_dir / "preferences.json")
  133. memory = self._load_json_optional(self.config.webnovel_dir / "project_memory.json")
  134. story_skeleton = self._load_story_skeleton(chapter)
  135. alert_slice = max(0, int(self.config.context_alerts_slice))
  136. return {
  137. "meta": {"chapter": chapter},
  138. "core": core,
  139. "scene": scene,
  140. "global": global_ctx,
  141. "story_skeleton": story_skeleton,
  142. "preferences": preferences,
  143. "memory": memory,
  144. "alerts": {
  145. "disambiguation_warnings": (
  146. state.get("disambiguation_warnings", [])[-alert_slice:] if alert_slice else []
  147. ),
  148. "disambiguation_pending": (
  149. state.get("disambiguation_pending", [])[-alert_slice:] if alert_slice else []
  150. ),
  151. },
  152. }
  153. def _load_state(self) -> Dict[str, Any]:
  154. path = self.config.state_file
  155. if not path.exists():
  156. return {}
  157. return json.loads(path.read_text(encoding="utf-8"))
  158. def _load_outline(self, chapter: int) -> str:
  159. outline_dir = self.config.outline_dir
  160. patterns = [
  161. f"第{chapter}章*.md",
  162. f"第{chapter:02d}章*.md",
  163. f"第{chapter:03d}章*.md",
  164. f"第{chapter:04d}章*.md",
  165. ]
  166. for pattern in patterns:
  167. matches = list(outline_dir.glob(pattern))
  168. if matches:
  169. return matches[0].read_text(encoding="utf-8")
  170. return f"[大纲未找到: 第{chapter}章]"
  171. def _load_recent_summaries(self, chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  172. summaries = []
  173. for ch in range(max(1, chapter - window), chapter):
  174. summary = self._load_summary_text(ch)
  175. if summary:
  176. summaries.append(summary)
  177. return summaries
  178. def _load_recent_meta(self, state: Dict[str, Any], chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  179. meta = state.get("chapter_meta", {}) or {}
  180. results = []
  181. for ch in range(max(1, chapter - window), chapter):
  182. for key in (f"{ch:04d}", str(ch)):
  183. if key in meta:
  184. results.append({"chapter": ch, **meta.get(key, {})})
  185. break
  186. return results
  187. def _load_recent_appearances(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
  188. appearances = self.index_manager.get_recent_appearances(limit=limit)
  189. return appearances or []
  190. def _load_setting(self, keyword: str) -> str:
  191. settings_dir = self.config.settings_dir
  192. candidates = [
  193. settings_dir / f"{keyword}.md",
  194. ]
  195. for path in candidates:
  196. if path.exists():
  197. return path.read_text(encoding="utf-8")
  198. # fallback: any file containing keyword
  199. matches = list(settings_dir.glob(f"*{keyword}*.md"))
  200. if matches:
  201. return matches[0].read_text(encoding="utf-8")
  202. return f"[{keyword}设定未找到]"
  203. def _extract_summary_excerpt(self, text: str, max_chars: int) -> str:
  204. if not text:
  205. return ""
  206. match = self.SUMMARY_SECTION_RE.search(text)
  207. excerpt = match.group(1).strip() if match else text.strip()
  208. if max_chars > 0 and len(excerpt) > max_chars:
  209. return excerpt[:max_chars].rstrip()
  210. return excerpt
  211. def _load_summary_text(self, chapter: int, snippet_chars: Optional[int] = None) -> Optional[Dict[str, Any]]:
  212. summary_path = self.config.webnovel_dir / "summaries" / f"ch{chapter:04d}.md"
  213. if not summary_path.exists():
  214. return None
  215. text = summary_path.read_text(encoding="utf-8")
  216. if snippet_chars:
  217. summary_text = self._extract_summary_excerpt(text, snippet_chars)
  218. else:
  219. summary_text = text
  220. return {"chapter": chapter, "summary": summary_text}
  221. def _load_story_skeleton(self, chapter: int) -> List[Dict[str, Any]]:
  222. interval = max(1, int(self.config.context_story_skeleton_interval))
  223. max_samples = max(0, int(self.config.context_story_skeleton_max_samples))
  224. snippet_chars = int(self.config.context_story_skeleton_snippet_chars)
  225. if max_samples <= 0 or chapter <= interval:
  226. return []
  227. samples: List[Dict[str, Any]] = []
  228. cursor = chapter - interval
  229. while cursor >= 1 and len(samples) < max_samples:
  230. summary = self._load_summary_text(cursor, snippet_chars=snippet_chars)
  231. if summary and summary.get("summary"):
  232. samples.append(summary)
  233. cursor -= interval
  234. samples.reverse()
  235. return samples
  236. def _load_json_optional(self, path: Path) -> Dict[str, Any]:
  237. if not path.exists():
  238. return {}
  239. try:
  240. return json.loads(path.read_text(encoding="utf-8"))
  241. except json.JSONDecodeError:
  242. return {}
  243. def main():
  244. import argparse
  245. from .cli_output import print_success, print_error
  246. parser = argparse.ArgumentParser(description="Context Manager CLI")
  247. parser.add_argument("--project-root", type=str, help="项目根目录")
  248. parser.add_argument("--chapter", type=int, required=True)
  249. parser.add_argument("--template", type=str, default=ContextManager.DEFAULT_TEMPLATE)
  250. parser.add_argument("--no-snapshot", action="store_true")
  251. parser.add_argument("--max-chars", type=int, default=8000)
  252. args = parser.parse_args()
  253. config = None
  254. if args.project_root:
  255. from .config import DataModulesConfig
  256. config = DataModulesConfig.from_project_root(args.project_root)
  257. manager = ContextManager(config)
  258. try:
  259. payload = manager.build_context(
  260. chapter=args.chapter,
  261. template=args.template,
  262. use_snapshot=not args.no_snapshot,
  263. save_snapshot=True,
  264. max_chars=args.max_chars,
  265. )
  266. print_success(payload, message="context_built")
  267. try:
  268. manager.index_manager.log_tool_call("context_manager:build", True, chapter=args.chapter)
  269. except Exception:
  270. pass
  271. except Exception as exc:
  272. print_error("CONTEXT_BUILD_FAILED", str(exc), suggestion="请检查项目结构与依赖文件")
  273. try:
  274. manager.index_manager.log_tool_call(
  275. "context_manager:build", False, error_code="CONTEXT_BUILD_FAILED", error_message=str(exc), chapter=args.chapter
  276. )
  277. except Exception:
  278. pass
  279. if __name__ == "__main__":
  280. main()