context_manager.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. ContextManager - assemble context packs with weighted priorities.
  5. """
  6. from __future__ import annotations
  7. import json
  8. import re
  9. from pathlib import Path
  10. from typing import Any, Dict, List, Optional
  11. from .config import get_config
  12. from .index_manager import IndexManager
  13. from .context_ranker import ContextRanker
  14. from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch
  15. class ContextManager:
  16. DEFAULT_TEMPLATE = "plot"
  17. TEMPLATE_WEIGHTS = {
  18. "plot": {"core": 0.40, "scene": 0.35, "global": 0.25},
  19. "battle": {"core": 0.35, "scene": 0.45, "global": 0.20},
  20. "emotion": {"core": 0.45, "scene": 0.35, "global": 0.20},
  21. "transition": {"core": 0.50, "scene": 0.25, "global": 0.25},
  22. }
  23. EXTRA_SECTIONS = {
  24. "story_skeleton",
  25. "memory",
  26. "preferences",
  27. "alerts",
  28. "reader_signal",
  29. "genre_profile",
  30. }
  31. SECTION_ORDER = [
  32. "core",
  33. "scene",
  34. "global",
  35. "reader_signal",
  36. "genre_profile",
  37. "story_skeleton",
  38. "memory",
  39. "preferences",
  40. "alerts",
  41. ]
  42. SUMMARY_SECTION_RE = re.compile(r"##\s*剧情摘要\s*\r?\n(.*?)(?=\r?\n##|\Z)", re.DOTALL)
  43. def __init__(self, config=None, snapshot_manager: Optional[SnapshotManager] = None):
  44. self.config = config or get_config()
  45. self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
  46. self.index_manager = IndexManager(self.config)
  47. self.context_ranker = ContextRanker(self.config)
  48. def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool:
  49. """判断快照是否可用于当前模板。"""
  50. if not isinstance(cached, dict):
  51. return False
  52. meta = cached.get("meta")
  53. if not isinstance(meta, dict):
  54. # 兼容旧快照:未记录 template 时仅允许默认模板复用
  55. return template == self.DEFAULT_TEMPLATE
  56. cached_template = meta.get("template")
  57. if not isinstance(cached_template, str):
  58. return template == self.DEFAULT_TEMPLATE
  59. return cached_template == template
  60. def build_context(
  61. self,
  62. chapter: int,
  63. template: str | None = None,
  64. use_snapshot: bool = True,
  65. save_snapshot: bool = True,
  66. max_chars: Optional[int] = None,
  67. ) -> Dict[str, Any]:
  68. template = template or self.DEFAULT_TEMPLATE
  69. if template not in self.TEMPLATE_WEIGHTS:
  70. template = self.DEFAULT_TEMPLATE
  71. if use_snapshot:
  72. try:
  73. cached = self.snapshot_manager.load_snapshot(chapter)
  74. if cached and self._is_snapshot_compatible(cached, template):
  75. return cached.get("payload", cached)
  76. except SnapshotVersionMismatch:
  77. # Snapshot incompatible; rebuild below.
  78. pass
  79. pack = self._build_pack(chapter)
  80. if getattr(self.config, "context_ranker_enabled", True):
  81. pack = self.context_ranker.rank_pack(pack, chapter)
  82. assembled = self.assemble_context(pack, template=template, max_chars=max_chars)
  83. if save_snapshot:
  84. meta = {"template": template}
  85. self.snapshot_manager.save_snapshot(chapter, assembled, meta=meta)
  86. return assembled
  87. def assemble_context(
  88. self,
  89. pack: Dict[str, Any],
  90. template: str = DEFAULT_TEMPLATE,
  91. max_chars: Optional[int] = None,
  92. ) -> Dict[str, Any]:
  93. weights = self.TEMPLATE_WEIGHTS.get(template, self.TEMPLATE_WEIGHTS[self.DEFAULT_TEMPLATE])
  94. max_chars = max_chars or 8000
  95. extra_budget = int(self.config.context_extra_section_budget or 0)
  96. sections = {}
  97. for section_name in self.SECTION_ORDER:
  98. if section_name in pack:
  99. sections[section_name] = pack[section_name]
  100. assembled: Dict[str, Any] = {"meta": pack.get("meta", {}), "sections": {}}
  101. for name, content in sections.items():
  102. weight = weights.get(name, 0.0)
  103. if weight > 0:
  104. budget = int(max_chars * weight)
  105. elif name in self.EXTRA_SECTIONS and extra_budget > 0:
  106. budget = extra_budget
  107. else:
  108. budget = None
  109. text = json.dumps(content, ensure_ascii=False)
  110. if budget is not None and len(text) > budget:
  111. text = text[:budget]
  112. assembled["sections"][name] = {"content": content, "text": text, "budget": budget}
  113. assembled["template"] = template
  114. assembled["weights"] = weights
  115. return assembled
  116. def filter_invalid_items(self, items: List[Dict[str, Any]], source_type: str, id_key: str) -> List[Dict[str, Any]]:
  117. confirmed = self.index_manager.get_invalid_ids(source_type, status="confirmed")
  118. pending = self.index_manager.get_invalid_ids(source_type, status="pending")
  119. result = []
  120. for item in items:
  121. item_id = str(item.get(id_key, ""))
  122. if item_id in confirmed:
  123. continue
  124. if item_id in pending:
  125. item = dict(item)
  126. item["warning"] = "pending_invalid"
  127. result.append(item)
  128. return result
  129. def apply_confidence_filter(self, items: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]:
  130. filtered: List[Dict[str, Any]] = []
  131. for item in items:
  132. conf = item.get("confidence")
  133. if conf is None or conf >= min_confidence:
  134. filtered.append(item)
  135. return filtered
  136. def _build_pack(self, chapter: int) -> Dict[str, Any]:
  137. state = self._load_state()
  138. core = {
  139. "chapter_outline": self._load_outline(chapter),
  140. "protagonist_snapshot": state.get("protagonist_state", {}),
  141. "recent_summaries": self._load_recent_summaries(
  142. chapter,
  143. window=self.config.context_recent_summaries_window,
  144. ),
  145. "recent_meta": self._load_recent_meta(
  146. state,
  147. chapter,
  148. window=self.config.context_recent_meta_window,
  149. ),
  150. }
  151. scene = {
  152. "location_context": state.get("protagonist_state", {}).get("location", {}),
  153. "appearing_characters": self._load_recent_appearances(
  154. limit=self.config.context_max_appearing_characters,
  155. ),
  156. }
  157. scene["appearing_characters"] = self.filter_invalid_items(
  158. scene["appearing_characters"], source_type="entity", id_key="entity_id"
  159. )
  160. global_ctx = {
  161. "worldview_skeleton": self._load_setting("世界观"),
  162. "power_system_skeleton": self._load_setting("力量体系"),
  163. "style_contract_ref": self._load_setting("风格契约"),
  164. }
  165. preferences = self._load_json_optional(self.config.webnovel_dir / "preferences.json")
  166. memory = self._load_json_optional(self.config.webnovel_dir / "project_memory.json")
  167. story_skeleton = self._load_story_skeleton(chapter)
  168. alert_slice = max(0, int(self.config.context_alerts_slice))
  169. reader_signal = self._load_reader_signal(chapter)
  170. genre_profile = self._load_genre_profile(state)
  171. return {
  172. "meta": {"chapter": chapter},
  173. "core": core,
  174. "scene": scene,
  175. "global": global_ctx,
  176. "reader_signal": reader_signal,
  177. "genre_profile": genre_profile,
  178. "story_skeleton": story_skeleton,
  179. "preferences": preferences,
  180. "memory": memory,
  181. "alerts": {
  182. "disambiguation_warnings": (
  183. state.get("disambiguation_warnings", [])[-alert_slice:] if alert_slice else []
  184. ),
  185. "disambiguation_pending": (
  186. state.get("disambiguation_pending", [])[-alert_slice:] if alert_slice else []
  187. ),
  188. },
  189. }
  190. def _load_reader_signal(self, chapter: int) -> Dict[str, Any]:
  191. if not getattr(self.config, "context_reader_signal_enabled", True):
  192. return {}
  193. recent_limit = max(1, int(getattr(self.config, "context_reader_signal_recent_limit", 5)))
  194. pattern_window = max(1, int(getattr(self.config, "context_reader_signal_window_chapters", 20)))
  195. review_window = max(1, int(getattr(self.config, "context_reader_signal_review_window", 5)))
  196. include_debt = bool(getattr(self.config, "context_reader_signal_include_debt", False))
  197. recent_power = self.index_manager.get_recent_reading_power(limit=recent_limit)
  198. pattern_stats = self.index_manager.get_pattern_usage_stats(last_n_chapters=pattern_window)
  199. hook_stats = self.index_manager.get_hook_type_stats(last_n_chapters=pattern_window)
  200. review_trend = self.index_manager.get_review_trend_stats(last_n=review_window)
  201. low_score_ranges: List[Dict[str, Any]] = []
  202. for row in review_trend.get("recent_ranges", []):
  203. score = row.get("overall_score")
  204. if isinstance(score, (int, float)) and float(score) < 75:
  205. low_score_ranges.append(
  206. {
  207. "start_chapter": row.get("start_chapter"),
  208. "end_chapter": row.get("end_chapter"),
  209. "overall_score": score,
  210. }
  211. )
  212. signal: Dict[str, Any] = {
  213. "recent_reading_power": recent_power,
  214. "pattern_usage": pattern_stats,
  215. "hook_type_usage": hook_stats,
  216. "review_trend": review_trend,
  217. "low_score_ranges": low_score_ranges,
  218. "next_chapter": chapter,
  219. }
  220. if include_debt:
  221. signal["debt_summary"] = self.index_manager.get_debt_summary()
  222. return signal
  223. def _load_genre_profile(self, state: Dict[str, Any]) -> Dict[str, Any]:
  224. if not getattr(self.config, "context_genre_profile_enabled", True):
  225. return {}
  226. fallback = str(getattr(self.config, "context_genre_profile_fallback", "shuangwen") or "shuangwen")
  227. genre = str((state.get("project") or {}).get("genre") or fallback)
  228. profile_path = self.config.project_root / ".claude" / "references" / "genre-profiles.md"
  229. taxonomy_path = self.config.project_root / ".claude" / "references" / "reading-power-taxonomy.md"
  230. profile_text = profile_path.read_text(encoding="utf-8") if profile_path.exists() else ""
  231. taxonomy_text = taxonomy_path.read_text(encoding="utf-8") if taxonomy_path.exists() else ""
  232. profile_excerpt = self._extract_genre_section(profile_text, genre)
  233. taxonomy_excerpt = self._extract_genre_section(taxonomy_text, genre)
  234. refs = self._extract_markdown_refs(
  235. profile_excerpt,
  236. max_items=int(getattr(self.config, "context_genre_profile_max_refs", 8)),
  237. )
  238. return {
  239. "genre": genre,
  240. "profile_excerpt": profile_excerpt,
  241. "taxonomy_excerpt": taxonomy_excerpt,
  242. "reference_hints": refs,
  243. }
  244. def _extract_genre_section(self, text: str, genre: str) -> str:
  245. if not text:
  246. return ""
  247. lines = text.splitlines()
  248. capture: List[str] = []
  249. active = False
  250. target = genre.strip().lower()
  251. for line in lines:
  252. normalized = line.strip().lower()
  253. if normalized.startswith("## "):
  254. if active:
  255. break
  256. active = target in normalized
  257. if active:
  258. capture.append(line)
  259. continue
  260. if active:
  261. capture.append(line)
  262. if capture:
  263. return "\n".join(capture).strip()
  264. return "\n".join(lines[:80]).strip()
  265. def _extract_markdown_refs(self, text: str, max_items: int = 8) -> List[str]:
  266. if not text:
  267. return []
  268. refs: List[str] = []
  269. for line in text.splitlines():
  270. row = line.strip().lstrip("-*").strip()
  271. if not row or row.startswith("#"):
  272. continue
  273. refs.append(row)
  274. if len(refs) >= max(1, max_items):
  275. break
  276. return refs
  277. def _load_state(self) -> Dict[str, Any]:
  278. path = self.config.state_file
  279. if not path.exists():
  280. return {}
  281. return json.loads(path.read_text(encoding="utf-8"))
  282. def _load_outline(self, chapter: int) -> str:
  283. outline_dir = self.config.outline_dir
  284. patterns = [
  285. f"第{chapter}章*.md",
  286. f"第{chapter:02d}章*.md",
  287. f"第{chapter:03d}章*.md",
  288. f"第{chapter:04d}章*.md",
  289. ]
  290. for pattern in patterns:
  291. matches = list(outline_dir.glob(pattern))
  292. if matches:
  293. return matches[0].read_text(encoding="utf-8")
  294. return f"[大纲未找到: 第{chapter}章]"
  295. def _load_recent_summaries(self, chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  296. summaries = []
  297. for ch in range(max(1, chapter - window), chapter):
  298. summary = self._load_summary_text(ch)
  299. if summary:
  300. summaries.append(summary)
  301. return summaries
  302. def _load_recent_meta(self, state: Dict[str, Any], chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  303. meta = state.get("chapter_meta", {}) or {}
  304. results = []
  305. for ch in range(max(1, chapter - window), chapter):
  306. for key in (f"{ch:04d}", str(ch)):
  307. if key in meta:
  308. results.append({"chapter": ch, **meta.get(key, {})})
  309. break
  310. return results
  311. def _load_recent_appearances(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
  312. appearances = self.index_manager.get_recent_appearances(limit=limit)
  313. return appearances or []
  314. def _load_setting(self, keyword: str) -> str:
  315. settings_dir = self.config.settings_dir
  316. candidates = [
  317. settings_dir / f"{keyword}.md",
  318. ]
  319. for path in candidates:
  320. if path.exists():
  321. return path.read_text(encoding="utf-8")
  322. # fallback: any file containing keyword
  323. matches = list(settings_dir.glob(f"*{keyword}*.md"))
  324. if matches:
  325. return matches[0].read_text(encoding="utf-8")
  326. return f"[{keyword}设定未找到]"
  327. def _extract_summary_excerpt(self, text: str, max_chars: int) -> str:
  328. if not text:
  329. return ""
  330. match = self.SUMMARY_SECTION_RE.search(text)
  331. excerpt = match.group(1).strip() if match else text.strip()
  332. if max_chars > 0 and len(excerpt) > max_chars:
  333. return excerpt[:max_chars].rstrip()
  334. return excerpt
  335. def _load_summary_text(self, chapter: int, snippet_chars: Optional[int] = None) -> Optional[Dict[str, Any]]:
  336. summary_path = self.config.webnovel_dir / "summaries" / f"ch{chapter:04d}.md"
  337. if not summary_path.exists():
  338. return None
  339. text = summary_path.read_text(encoding="utf-8")
  340. if snippet_chars:
  341. summary_text = self._extract_summary_excerpt(text, snippet_chars)
  342. else:
  343. summary_text = text
  344. return {"chapter": chapter, "summary": summary_text}
  345. def _load_story_skeleton(self, chapter: int) -> List[Dict[str, Any]]:
  346. interval = max(1, int(self.config.context_story_skeleton_interval))
  347. max_samples = max(0, int(self.config.context_story_skeleton_max_samples))
  348. snippet_chars = int(self.config.context_story_skeleton_snippet_chars)
  349. if max_samples <= 0 or chapter <= interval:
  350. return []
  351. samples: List[Dict[str, Any]] = []
  352. cursor = chapter - interval
  353. while cursor >= 1 and len(samples) < max_samples:
  354. summary = self._load_summary_text(cursor, snippet_chars=snippet_chars)
  355. if summary and summary.get("summary"):
  356. samples.append(summary)
  357. cursor -= interval
  358. samples.reverse()
  359. return samples
  360. def _load_json_optional(self, path: Path) -> Dict[str, Any]:
  361. if not path.exists():
  362. return {}
  363. try:
  364. return json.loads(path.read_text(encoding="utf-8"))
  365. except json.JSONDecodeError:
  366. return {}
  367. def main():
  368. import argparse
  369. from .cli_output import print_success, print_error
  370. parser = argparse.ArgumentParser(description="Context Manager CLI")
  371. parser.add_argument("--project-root", type=str, help="项目根目录")
  372. parser.add_argument("--chapter", type=int, required=True)
  373. parser.add_argument("--template", type=str, default=ContextManager.DEFAULT_TEMPLATE)
  374. parser.add_argument("--no-snapshot", action="store_true")
  375. parser.add_argument("--max-chars", type=int, default=8000)
  376. args = parser.parse_args()
  377. config = None
  378. if args.project_root:
  379. from .config import DataModulesConfig
  380. config = DataModulesConfig.from_project_root(args.project_root)
  381. manager = ContextManager(config)
  382. try:
  383. payload = manager.build_context(
  384. chapter=args.chapter,
  385. template=args.template,
  386. use_snapshot=not args.no_snapshot,
  387. save_snapshot=True,
  388. max_chars=args.max_chars,
  389. )
  390. print_success(payload, message="context_built")
  391. try:
  392. manager.index_manager.log_tool_call("context_manager:build", True, chapter=args.chapter)
  393. except Exception:
  394. pass
  395. except Exception as exc:
  396. print_error("CONTEXT_BUILD_FAILED", str(exc), suggestion="请检查项目结构与依赖文件")
  397. try:
  398. manager.index_manager.log_tool_call(
  399. "context_manager:build", False, error_code="CONTEXT_BUILD_FAILED", error_message=str(exc), chapter=args.chapter
  400. )
  401. except Exception:
  402. pass
  403. if __name__ == "__main__":
  404. import sys
  405. if sys.platform == "win32":
  406. import io
  407. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
  408. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
  409. main()