#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ContextManager - assemble context packs with weighted priorities.
"""
from __future__ import annotations

import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

from .config import get_config
from .index_manager import IndexManager
from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch
  14. class ContextManager:
  15. DEFAULT_TEMPLATE = "plot"
  16. TEMPLATE_WEIGHTS = {
  17. "plot": {"core": 0.40, "scene": 0.35, "global": 0.25},
  18. "battle": {"core": 0.35, "scene": 0.45, "global": 0.20},
  19. "emotion": {"core": 0.45, "scene": 0.35, "global": 0.20},
  20. "transition": {"core": 0.50, "scene": 0.25, "global": 0.25},
  21. }
  22. EXTRA_SECTIONS = {"story_skeleton", "memory", "preferences", "alerts"}
  23. SECTION_ORDER = ["core", "scene", "global", "story_skeleton", "memory", "preferences", "alerts"]
  24. SUMMARY_SECTION_RE = re.compile(r"##\s*剧情摘要\s*\r?\n(.*?)(?=\r?\n##|\Z)", re.DOTALL)
  25. def __init__(self, config=None, snapshot_manager: Optional[SnapshotManager] = None):
  26. self.config = config or get_config()
  27. self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
  28. self.index_manager = IndexManager(self.config)
  29. def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool:
  30. """判断快照是否可用于当前模板。"""
  31. if not isinstance(cached, dict):
  32. return False
  33. meta = cached.get("meta")
  34. if not isinstance(meta, dict):
  35. # 兼容旧快照:未记录 template 时仅允许默认模板复用
  36. return template == self.DEFAULT_TEMPLATE
  37. cached_template = meta.get("template")
  38. if not isinstance(cached_template, str):
  39. return template == self.DEFAULT_TEMPLATE
  40. return cached_template == template
  41. def build_context(
  42. self,
  43. chapter: int,
  44. template: str | None = None,
  45. use_snapshot: bool = True,
  46. save_snapshot: bool = True,
  47. max_chars: Optional[int] = None,
  48. ) -> Dict[str, Any]:
  49. template = template or self.DEFAULT_TEMPLATE
  50. if template not in self.TEMPLATE_WEIGHTS:
  51. template = self.DEFAULT_TEMPLATE
  52. if use_snapshot:
  53. try:
  54. cached = self.snapshot_manager.load_snapshot(chapter)
  55. if cached and self._is_snapshot_compatible(cached, template):
  56. return cached.get("payload", cached)
  57. except SnapshotVersionMismatch:
  58. # Snapshot incompatible; rebuild below.
  59. pass
  60. pack = self._build_pack(chapter)
  61. assembled = self.assemble_context(pack, template=template, max_chars=max_chars)
  62. if save_snapshot:
  63. meta = {"template": template}
  64. self.snapshot_manager.save_snapshot(chapter, assembled, meta=meta)
  65. return assembled
  66. def assemble_context(
  67. self,
  68. pack: Dict[str, Any],
  69. template: str = DEFAULT_TEMPLATE,
  70. max_chars: Optional[int] = None,
  71. ) -> Dict[str, Any]:
  72. weights = self.TEMPLATE_WEIGHTS.get(template, self.TEMPLATE_WEIGHTS[self.DEFAULT_TEMPLATE])
  73. max_chars = max_chars or 8000
  74. extra_budget = int(self.config.context_extra_section_budget or 0)
  75. sections = {}
  76. for section_name in self.SECTION_ORDER:
  77. if section_name in pack:
  78. sections[section_name] = pack[section_name]
  79. assembled: Dict[str, Any] = {"meta": pack.get("meta", {}), "sections": {}}
  80. for name, content in sections.items():
  81. weight = weights.get(name, 0.0)
  82. if weight > 0:
  83. budget = int(max_chars * weight)
  84. elif name in self.EXTRA_SECTIONS and extra_budget > 0:
  85. budget = extra_budget
  86. else:
  87. budget = None
  88. text = json.dumps(content, ensure_ascii=False)
  89. if budget is not None and len(text) > budget:
  90. text = text[:budget]
  91. assembled["sections"][name] = {"content": content, "text": text, "budget": budget}
  92. assembled["template"] = template
  93. assembled["weights"] = weights
  94. return assembled
  95. def filter_invalid_items(self, items: List[Dict[str, Any]], source_type: str, id_key: str) -> List[Dict[str, Any]]:
  96. confirmed = self.index_manager.get_invalid_ids(source_type, status="confirmed")
  97. pending = self.index_manager.get_invalid_ids(source_type, status="pending")
  98. result = []
  99. for item in items:
  100. item_id = str(item.get(id_key, ""))
  101. if item_id in confirmed:
  102. continue
  103. if item_id in pending:
  104. item = dict(item)
  105. item["warning"] = "pending_invalid"
  106. result.append(item)
  107. return result
  108. def apply_confidence_filter(self, items: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]:
  109. filtered: List[Dict[str, Any]] = []
  110. for item in items:
  111. conf = item.get("confidence")
  112. if conf is None or conf >= min_confidence:
  113. filtered.append(item)
  114. return filtered
  115. def _build_pack(self, chapter: int) -> Dict[str, Any]:
  116. state = self._load_state()
  117. core = {
  118. "chapter_outline": self._load_outline(chapter),
  119. "protagonist_snapshot": state.get("protagonist_state", {}),
  120. "recent_summaries": self._load_recent_summaries(
  121. chapter,
  122. window=self.config.context_recent_summaries_window,
  123. ),
  124. "recent_meta": self._load_recent_meta(
  125. state,
  126. chapter,
  127. window=self.config.context_recent_meta_window,
  128. ),
  129. }
  130. scene = {
  131. "location_context": state.get("protagonist_state", {}).get("location", {}),
  132. "appearing_characters": self._load_recent_appearances(
  133. limit=self.config.context_max_appearing_characters,
  134. ),
  135. }
  136. scene["appearing_characters"] = self.filter_invalid_items(
  137. scene["appearing_characters"], source_type="entity", id_key="entity_id"
  138. )
  139. global_ctx = {
  140. "worldview_skeleton": self._load_setting("世界观"),
  141. "power_system_skeleton": self._load_setting("力量体系"),
  142. "style_contract_ref": self._load_setting("风格契约"),
  143. }
  144. preferences = self._load_json_optional(self.config.webnovel_dir / "preferences.json")
  145. memory = self._load_json_optional(self.config.webnovel_dir / "project_memory.json")
  146. story_skeleton = self._load_story_skeleton(chapter)
  147. alert_slice = max(0, int(self.config.context_alerts_slice))
  148. return {
  149. "meta": {"chapter": chapter},
  150. "core": core,
  151. "scene": scene,
  152. "global": global_ctx,
  153. "story_skeleton": story_skeleton,
  154. "preferences": preferences,
  155. "memory": memory,
  156. "alerts": {
  157. "disambiguation_warnings": (
  158. state.get("disambiguation_warnings", [])[-alert_slice:] if alert_slice else []
  159. ),
  160. "disambiguation_pending": (
  161. state.get("disambiguation_pending", [])[-alert_slice:] if alert_slice else []
  162. ),
  163. },
  164. }
  165. def _load_state(self) -> Dict[str, Any]:
  166. path = self.config.state_file
  167. if not path.exists():
  168. return {}
  169. return json.loads(path.read_text(encoding="utf-8"))
  170. def _load_outline(self, chapter: int) -> str:
  171. outline_dir = self.config.outline_dir
  172. patterns = [
  173. f"第{chapter}章*.md",
  174. f"第{chapter:02d}章*.md",
  175. f"第{chapter:03d}章*.md",
  176. f"第{chapter:04d}章*.md",
  177. ]
  178. for pattern in patterns:
  179. matches = list(outline_dir.glob(pattern))
  180. if matches:
  181. return matches[0].read_text(encoding="utf-8")
  182. return f"[大纲未找到: 第{chapter}章]"
  183. def _load_recent_summaries(self, chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  184. summaries = []
  185. for ch in range(max(1, chapter - window), chapter):
  186. summary = self._load_summary_text(ch)
  187. if summary:
  188. summaries.append(summary)
  189. return summaries
  190. def _load_recent_meta(self, state: Dict[str, Any], chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  191. meta = state.get("chapter_meta", {}) or {}
  192. results = []
  193. for ch in range(max(1, chapter - window), chapter):
  194. for key in (f"{ch:04d}", str(ch)):
  195. if key in meta:
  196. results.append({"chapter": ch, **meta.get(key, {})})
  197. break
  198. return results
  199. def _load_recent_appearances(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
  200. appearances = self.index_manager.get_recent_appearances(limit=limit)
  201. return appearances or []
  202. def _load_setting(self, keyword: str) -> str:
  203. settings_dir = self.config.settings_dir
  204. candidates = [
  205. settings_dir / f"{keyword}.md",
  206. ]
  207. for path in candidates:
  208. if path.exists():
  209. return path.read_text(encoding="utf-8")
  210. # fallback: any file containing keyword
  211. matches = list(settings_dir.glob(f"*{keyword}*.md"))
  212. if matches:
  213. return matches[0].read_text(encoding="utf-8")
  214. return f"[{keyword}设定未找到]"
  215. def _extract_summary_excerpt(self, text: str, max_chars: int) -> str:
  216. if not text:
  217. return ""
  218. match = self.SUMMARY_SECTION_RE.search(text)
  219. excerpt = match.group(1).strip() if match else text.strip()
  220. if max_chars > 0 and len(excerpt) > max_chars:
  221. return excerpt[:max_chars].rstrip()
  222. return excerpt
  223. def _load_summary_text(self, chapter: int, snippet_chars: Optional[int] = None) -> Optional[Dict[str, Any]]:
  224. summary_path = self.config.webnovel_dir / "summaries" / f"ch{chapter:04d}.md"
  225. if not summary_path.exists():
  226. return None
  227. text = summary_path.read_text(encoding="utf-8")
  228. if snippet_chars:
  229. summary_text = self._extract_summary_excerpt(text, snippet_chars)
  230. else:
  231. summary_text = text
  232. return {"chapter": chapter, "summary": summary_text}
  233. def _load_story_skeleton(self, chapter: int) -> List[Dict[str, Any]]:
  234. interval = max(1, int(self.config.context_story_skeleton_interval))
  235. max_samples = max(0, int(self.config.context_story_skeleton_max_samples))
  236. snippet_chars = int(self.config.context_story_skeleton_snippet_chars)
  237. if max_samples <= 0 or chapter <= interval:
  238. return []
  239. samples: List[Dict[str, Any]] = []
  240. cursor = chapter - interval
  241. while cursor >= 1 and len(samples) < max_samples:
  242. summary = self._load_summary_text(cursor, snippet_chars=snippet_chars)
  243. if summary and summary.get("summary"):
  244. samples.append(summary)
  245. cursor -= interval
  246. samples.reverse()
  247. return samples
  248. def _load_json_optional(self, path: Path) -> Dict[str, Any]:
  249. if not path.exists():
  250. return {}
  251. try:
  252. return json.loads(path.read_text(encoding="utf-8"))
  253. except json.JSONDecodeError:
  254. return {}
  255. def main():
  256. import argparse
  257. from .cli_output import print_success, print_error
  258. parser = argparse.ArgumentParser(description="Context Manager CLI")
  259. parser.add_argument("--project-root", type=str, help="项目根目录")
  260. parser.add_argument("--chapter", type=int, required=True)
  261. parser.add_argument("--template", type=str, default=ContextManager.DEFAULT_TEMPLATE)
  262. parser.add_argument("--no-snapshot", action="store_true")
  263. parser.add_argument("--max-chars", type=int, default=8000)
  264. args = parser.parse_args()
  265. config = None
  266. if args.project_root:
  267. from .config import DataModulesConfig
  268. config = DataModulesConfig.from_project_root(args.project_root)
  269. manager = ContextManager(config)
  270. try:
  271. payload = manager.build_context(
  272. chapter=args.chapter,
  273. template=args.template,
  274. use_snapshot=not args.no_snapshot,
  275. save_snapshot=True,
  276. max_chars=args.max_chars,
  277. )
  278. print_success(payload, message="context_built")
  279. try:
  280. manager.index_manager.log_tool_call("context_manager:build", True, chapter=args.chapter)
  281. except Exception:
  282. pass
  283. except Exception as exc:
  284. print_error("CONTEXT_BUILD_FAILED", str(exc), suggestion="请检查项目结构与依赖文件")
  285. try:
  286. manager.index_manager.log_tool_call(
  287. "context_manager:build", False, error_code="CONTEXT_BUILD_FAILED", error_message=str(exc), chapter=args.chapter
  288. )
  289. except Exception:
  290. pass
  291. if __name__ == "__main__":
  292. import sys
  293. if sys.platform == "win32":
  294. import io
  295. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
  296. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
  297. main()