context_manager.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. ContextManager - assemble context packs with weighted priorities.
  5. """
  6. from __future__ import annotations
  7. import json
  8. import re
  9. from pathlib import Path
  10. from typing import Any, Dict, List, Optional
  11. from .config import get_config
  12. from .index_manager import IndexManager
  13. from .context_ranker import ContextRanker
  14. from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch
  15. class ContextManager:
  16. DEFAULT_TEMPLATE = "plot"
  17. TEMPLATE_WEIGHTS = {
  18. "plot": {"core": 0.40, "scene": 0.35, "global": 0.25},
  19. "battle": {"core": 0.35, "scene": 0.45, "global": 0.20},
  20. "emotion": {"core": 0.45, "scene": 0.35, "global": 0.20},
  21. "transition": {"core": 0.50, "scene": 0.25, "global": 0.25},
  22. }
  23. EXTRA_SECTIONS = {
  24. "story_skeleton",
  25. "memory",
  26. "preferences",
  27. "alerts",
  28. "reader_signal",
  29. "genre_profile",
  30. "writing_guidance",
  31. }
  32. SECTION_ORDER = [
  33. "core",
  34. "scene",
  35. "global",
  36. "reader_signal",
  37. "genre_profile",
  38. "writing_guidance",
  39. "story_skeleton",
  40. "memory",
  41. "preferences",
  42. "alerts",
  43. ]
  44. SUMMARY_SECTION_RE = re.compile(r"##\s*剧情摘要\s*\r?\n(.*?)(?=\r?\n##|\Z)", re.DOTALL)
  45. def __init__(self, config=None, snapshot_manager: Optional[SnapshotManager] = None):
  46. self.config = config or get_config()
  47. self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
  48. self.index_manager = IndexManager(self.config)
  49. self.context_ranker = ContextRanker(self.config)
  50. def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool:
  51. """判断快照是否可用于当前模板。"""
  52. if not isinstance(cached, dict):
  53. return False
  54. meta = cached.get("meta")
  55. if not isinstance(meta, dict):
  56. # 兼容旧快照:未记录 template 时仅允许默认模板复用
  57. return template == self.DEFAULT_TEMPLATE
  58. cached_template = meta.get("template")
  59. if not isinstance(cached_template, str):
  60. return template == self.DEFAULT_TEMPLATE
  61. return cached_template == template
  62. def build_context(
  63. self,
  64. chapter: int,
  65. template: str | None = None,
  66. use_snapshot: bool = True,
  67. save_snapshot: bool = True,
  68. max_chars: Optional[int] = None,
  69. ) -> Dict[str, Any]:
  70. template = template or self.DEFAULT_TEMPLATE
  71. if template not in self.TEMPLATE_WEIGHTS:
  72. template = self.DEFAULT_TEMPLATE
  73. if use_snapshot:
  74. try:
  75. cached = self.snapshot_manager.load_snapshot(chapter)
  76. if cached and self._is_snapshot_compatible(cached, template):
  77. return cached.get("payload", cached)
  78. except SnapshotVersionMismatch:
  79. # Snapshot incompatible; rebuild below.
  80. pass
  81. pack = self._build_pack(chapter)
  82. if getattr(self.config, "context_ranker_enabled", True):
  83. pack = self.context_ranker.rank_pack(pack, chapter)
  84. assembled = self.assemble_context(pack, template=template, max_chars=max_chars)
  85. if save_snapshot:
  86. meta = {"template": template}
  87. self.snapshot_manager.save_snapshot(chapter, assembled, meta=meta)
  88. return assembled
  89. def assemble_context(
  90. self,
  91. pack: Dict[str, Any],
  92. template: str = DEFAULT_TEMPLATE,
  93. max_chars: Optional[int] = None,
  94. ) -> Dict[str, Any]:
  95. weights = self.TEMPLATE_WEIGHTS.get(template, self.TEMPLATE_WEIGHTS[self.DEFAULT_TEMPLATE])
  96. max_chars = max_chars or 8000
  97. extra_budget = int(self.config.context_extra_section_budget or 0)
  98. sections = {}
  99. for section_name in self.SECTION_ORDER:
  100. if section_name in pack:
  101. sections[section_name] = pack[section_name]
  102. assembled: Dict[str, Any] = {"meta": pack.get("meta", {}), "sections": {}}
  103. for name, content in sections.items():
  104. weight = weights.get(name, 0.0)
  105. if weight > 0:
  106. budget = int(max_chars * weight)
  107. elif name in self.EXTRA_SECTIONS and extra_budget > 0:
  108. budget = extra_budget
  109. else:
  110. budget = None
  111. text = self._compact_json_text(content, budget)
  112. assembled["sections"][name] = {"content": content, "text": text, "budget": budget}
  113. assembled["template"] = template
  114. assembled["weights"] = weights
  115. return assembled
  116. def filter_invalid_items(self, items: List[Dict[str, Any]], source_type: str, id_key: str) -> List[Dict[str, Any]]:
  117. confirmed = self.index_manager.get_invalid_ids(source_type, status="confirmed")
  118. pending = self.index_manager.get_invalid_ids(source_type, status="pending")
  119. result = []
  120. for item in items:
  121. item_id = str(item.get(id_key, ""))
  122. if item_id in confirmed:
  123. continue
  124. if item_id in pending:
  125. item = dict(item)
  126. item["warning"] = "pending_invalid"
  127. result.append(item)
  128. return result
  129. def apply_confidence_filter(self, items: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]:
  130. filtered: List[Dict[str, Any]] = []
  131. for item in items:
  132. conf = item.get("confidence")
  133. if conf is None or conf >= min_confidence:
  134. filtered.append(item)
  135. return filtered
  136. def _build_pack(self, chapter: int) -> Dict[str, Any]:
  137. state = self._load_state()
  138. core = {
  139. "chapter_outline": self._load_outline(chapter),
  140. "protagonist_snapshot": state.get("protagonist_state", {}),
  141. "recent_summaries": self._load_recent_summaries(
  142. chapter,
  143. window=self.config.context_recent_summaries_window,
  144. ),
  145. "recent_meta": self._load_recent_meta(
  146. state,
  147. chapter,
  148. window=self.config.context_recent_meta_window,
  149. ),
  150. }
  151. scene = {
  152. "location_context": state.get("protagonist_state", {}).get("location", {}),
  153. "appearing_characters": self._load_recent_appearances(
  154. limit=self.config.context_max_appearing_characters,
  155. ),
  156. }
  157. scene["appearing_characters"] = self.filter_invalid_items(
  158. scene["appearing_characters"], source_type="entity", id_key="entity_id"
  159. )
  160. global_ctx = {
  161. "worldview_skeleton": self._load_setting("世界观"),
  162. "power_system_skeleton": self._load_setting("力量体系"),
  163. "style_contract_ref": self._load_setting("风格契约"),
  164. }
  165. preferences = self._load_json_optional(self.config.webnovel_dir / "preferences.json")
  166. memory = self._load_json_optional(self.config.webnovel_dir / "project_memory.json")
  167. story_skeleton = self._load_story_skeleton(chapter)
  168. alert_slice = max(0, int(self.config.context_alerts_slice))
  169. reader_signal = self._load_reader_signal(chapter)
  170. genre_profile = self._load_genre_profile(state)
  171. writing_guidance = self._build_writing_guidance(chapter, reader_signal, genre_profile)
  172. return {
  173. "meta": {"chapter": chapter},
  174. "core": core,
  175. "scene": scene,
  176. "global": global_ctx,
  177. "reader_signal": reader_signal,
  178. "genre_profile": genre_profile,
  179. "writing_guidance": writing_guidance,
  180. "story_skeleton": story_skeleton,
  181. "preferences": preferences,
  182. "memory": memory,
  183. "alerts": {
  184. "disambiguation_warnings": (
  185. state.get("disambiguation_warnings", [])[-alert_slice:] if alert_slice else []
  186. ),
  187. "disambiguation_pending": (
  188. state.get("disambiguation_pending", [])[-alert_slice:] if alert_slice else []
  189. ),
  190. },
  191. }
  192. def _load_reader_signal(self, chapter: int) -> Dict[str, Any]:
  193. if not getattr(self.config, "context_reader_signal_enabled", True):
  194. return {}
  195. recent_limit = max(1, int(getattr(self.config, "context_reader_signal_recent_limit", 5)))
  196. pattern_window = max(1, int(getattr(self.config, "context_reader_signal_window_chapters", 20)))
  197. review_window = max(1, int(getattr(self.config, "context_reader_signal_review_window", 5)))
  198. include_debt = bool(getattr(self.config, "context_reader_signal_include_debt", False))
  199. recent_power = self.index_manager.get_recent_reading_power(limit=recent_limit)
  200. pattern_stats = self.index_manager.get_pattern_usage_stats(last_n_chapters=pattern_window)
  201. hook_stats = self.index_manager.get_hook_type_stats(last_n_chapters=pattern_window)
  202. review_trend = self.index_manager.get_review_trend_stats(last_n=review_window)
  203. low_score_ranges: List[Dict[str, Any]] = []
  204. for row in review_trend.get("recent_ranges", []):
  205. score = row.get("overall_score")
  206. if isinstance(score, (int, float)) and float(score) < 75:
  207. low_score_ranges.append(
  208. {
  209. "start_chapter": row.get("start_chapter"),
  210. "end_chapter": row.get("end_chapter"),
  211. "overall_score": score,
  212. }
  213. )
  214. signal: Dict[str, Any] = {
  215. "recent_reading_power": recent_power,
  216. "pattern_usage": pattern_stats,
  217. "hook_type_usage": hook_stats,
  218. "review_trend": review_trend,
  219. "low_score_ranges": low_score_ranges,
  220. "next_chapter": chapter,
  221. }
  222. if include_debt:
  223. signal["debt_summary"] = self.index_manager.get_debt_summary()
  224. return signal
  225. def _load_genre_profile(self, state: Dict[str, Any]) -> Dict[str, Any]:
  226. if not getattr(self.config, "context_genre_profile_enabled", True):
  227. return {}
  228. fallback = str(getattr(self.config, "context_genre_profile_fallback", "shuangwen") or "shuangwen")
  229. genre = str((state.get("project") or {}).get("genre") or fallback)
  230. profile_path = self.config.project_root / ".claude" / "references" / "genre-profiles.md"
  231. taxonomy_path = self.config.project_root / ".claude" / "references" / "reading-power-taxonomy.md"
  232. profile_text = profile_path.read_text(encoding="utf-8") if profile_path.exists() else ""
  233. taxonomy_text = taxonomy_path.read_text(encoding="utf-8") if taxonomy_path.exists() else ""
  234. profile_excerpt = self._extract_genre_section(profile_text, genre)
  235. taxonomy_excerpt = self._extract_genre_section(taxonomy_text, genre)
  236. refs = self._extract_markdown_refs(
  237. profile_excerpt,
  238. max_items=int(getattr(self.config, "context_genre_profile_max_refs", 8)),
  239. )
  240. return {
  241. "genre": genre,
  242. "profile_excerpt": profile_excerpt,
  243. "taxonomy_excerpt": taxonomy_excerpt,
  244. "reference_hints": refs,
  245. }
  246. def _build_writing_guidance(
  247. self,
  248. chapter: int,
  249. reader_signal: Dict[str, Any],
  250. genre_profile: Dict[str, Any],
  251. ) -> Dict[str, Any]:
  252. if not getattr(self.config, "context_writing_guidance_enabled", True):
  253. return {}
  254. guidance: List[str] = []
  255. limit = max(1, int(getattr(self.config, "context_writing_guidance_max_items", 6)))
  256. low_score_threshold = float(
  257. getattr(self.config, "context_writing_guidance_low_score_threshold", 75.0)
  258. )
  259. low_ranges = reader_signal.get("low_score_ranges") or []
  260. if low_ranges:
  261. worst = min(
  262. low_ranges,
  263. key=lambda row: float(row.get("overall_score", 9999)),
  264. )
  265. guidance.append(
  266. f"第{chapter}章优先修复近期低分段问题:参考{worst.get('start_chapter')}-{worst.get('end_chapter')}章,强化冲突推进与结尾钩子。"
  267. )
  268. hook_usage = reader_signal.get("hook_type_usage") or {}
  269. if hook_usage and getattr(self.config, "context_writing_guidance_hook_diversify", True):
  270. dominant_hook = max(hook_usage.items(), key=lambda kv: kv[1])[0]
  271. guidance.append(
  272. f"近期钩子类型“{dominant_hook}”使用偏多,本章建议做钩子差异化,避免连续同构。"
  273. )
  274. pattern_usage = reader_signal.get("pattern_usage") or {}
  275. if pattern_usage:
  276. top_pattern = max(pattern_usage.items(), key=lambda kv: kv[1])[0]
  277. guidance.append(
  278. f"爽点模式“{top_pattern}”近期高频,本章可保留主爽点但叠加一个新爽点副轴。"
  279. )
  280. review_trend = reader_signal.get("review_trend") or {}
  281. overall_avg = review_trend.get("overall_avg")
  282. if isinstance(overall_avg, (int, float)) and float(overall_avg) < low_score_threshold:
  283. guidance.append(
  284. f"最近审查均分{overall_avg:.1f}低于阈值{low_score_threshold:.1f},建议先保稳:减少跳场、每段补动作结果闭环。"
  285. )
  286. genre = str(genre_profile.get("genre") or "").strip()
  287. refs = genre_profile.get("reference_hints") or []
  288. if genre:
  289. guidance.append(f"题材锚定:按“{genre}”叙事主线推进,保持题材读者预期稳定兑现。")
  290. if refs:
  291. guidance.append(f"题材策略可执行提示:{refs[0]}")
  292. if not guidance:
  293. guidance.append("本章执行默认高可读策略:冲突前置、信息后置、段末留钩。")
  294. return {
  295. "chapter": chapter,
  296. "guidance_items": guidance[:limit],
  297. "signals_used": {
  298. "has_low_score_ranges": bool(low_ranges),
  299. "hook_types": list(hook_usage.keys())[:3],
  300. "top_patterns": sorted(
  301. pattern_usage,
  302. key=pattern_usage.get,
  303. reverse=True,
  304. )[:3],
  305. "genre": genre,
  306. },
  307. }
  308. def _compact_json_text(self, content: Any, budget: Optional[int]) -> str:
  309. raw = json.dumps(content, ensure_ascii=False)
  310. if budget is None or len(raw) <= budget:
  311. return raw
  312. if not getattr(self.config, "context_compact_text_enabled", True):
  313. return raw[:budget]
  314. min_budget = max(1, int(getattr(self.config, "context_compact_min_budget", 120)))
  315. if budget <= min_budget:
  316. return raw[:budget]
  317. head_ratio = float(getattr(self.config, "context_compact_head_ratio", 0.65))
  318. head_budget = int(budget * max(0.2, min(0.9, head_ratio)))
  319. tail_budget = max(0, budget - head_budget - 10)
  320. compact = f"{raw[:head_budget]}…[TRUNCATED]{raw[-tail_budget:] if tail_budget else ''}"
  321. return compact[:budget]
  322. def _extract_genre_section(self, text: str, genre: str) -> str:
  323. if not text:
  324. return ""
  325. lines = text.splitlines()
  326. capture: List[str] = []
  327. active = False
  328. target = genre.strip().lower()
  329. for line in lines:
  330. normalized = line.strip().lower()
  331. if normalized.startswith("## "):
  332. if active:
  333. break
  334. active = target in normalized
  335. if active:
  336. capture.append(line)
  337. continue
  338. if active:
  339. capture.append(line)
  340. if capture:
  341. return "\n".join(capture).strip()
  342. return "\n".join(lines[:80]).strip()
  343. def _extract_markdown_refs(self, text: str, max_items: int = 8) -> List[str]:
  344. if not text:
  345. return []
  346. refs: List[str] = []
  347. for line in text.splitlines():
  348. row = line.strip().lstrip("-*").strip()
  349. if not row or row.startswith("#"):
  350. continue
  351. refs.append(row)
  352. if len(refs) >= max(1, max_items):
  353. break
  354. return refs
  355. def _load_state(self) -> Dict[str, Any]:
  356. path = self.config.state_file
  357. if not path.exists():
  358. return {}
  359. return json.loads(path.read_text(encoding="utf-8"))
  360. def _load_outline(self, chapter: int) -> str:
  361. outline_dir = self.config.outline_dir
  362. patterns = [
  363. f"第{chapter}章*.md",
  364. f"第{chapter:02d}章*.md",
  365. f"第{chapter:03d}章*.md",
  366. f"第{chapter:04d}章*.md",
  367. ]
  368. for pattern in patterns:
  369. matches = list(outline_dir.glob(pattern))
  370. if matches:
  371. return matches[0].read_text(encoding="utf-8")
  372. return f"[大纲未找到: 第{chapter}章]"
  373. def _load_recent_summaries(self, chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  374. summaries = []
  375. for ch in range(max(1, chapter - window), chapter):
  376. summary = self._load_summary_text(ch)
  377. if summary:
  378. summaries.append(summary)
  379. return summaries
  380. def _load_recent_meta(self, state: Dict[str, Any], chapter: int, window: int = 3) -> List[Dict[str, Any]]:
  381. meta = state.get("chapter_meta", {}) or {}
  382. results = []
  383. for ch in range(max(1, chapter - window), chapter):
  384. for key in (f"{ch:04d}", str(ch)):
  385. if key in meta:
  386. results.append({"chapter": ch, **meta.get(key, {})})
  387. break
  388. return results
  389. def _load_recent_appearances(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
  390. appearances = self.index_manager.get_recent_appearances(limit=limit)
  391. return appearances or []
  392. def _load_setting(self, keyword: str) -> str:
  393. settings_dir = self.config.settings_dir
  394. candidates = [
  395. settings_dir / f"{keyword}.md",
  396. ]
  397. for path in candidates:
  398. if path.exists():
  399. return path.read_text(encoding="utf-8")
  400. # fallback: any file containing keyword
  401. matches = list(settings_dir.glob(f"*{keyword}*.md"))
  402. if matches:
  403. return matches[0].read_text(encoding="utf-8")
  404. return f"[{keyword}设定未找到]"
  405. def _extract_summary_excerpt(self, text: str, max_chars: int) -> str:
  406. if not text:
  407. return ""
  408. match = self.SUMMARY_SECTION_RE.search(text)
  409. excerpt = match.group(1).strip() if match else text.strip()
  410. if max_chars > 0 and len(excerpt) > max_chars:
  411. return excerpt[:max_chars].rstrip()
  412. return excerpt
  413. def _load_summary_text(self, chapter: int, snippet_chars: Optional[int] = None) -> Optional[Dict[str, Any]]:
  414. summary_path = self.config.webnovel_dir / "summaries" / f"ch{chapter:04d}.md"
  415. if not summary_path.exists():
  416. return None
  417. text = summary_path.read_text(encoding="utf-8")
  418. if snippet_chars:
  419. summary_text = self._extract_summary_excerpt(text, snippet_chars)
  420. else:
  421. summary_text = text
  422. return {"chapter": chapter, "summary": summary_text}
  423. def _load_story_skeleton(self, chapter: int) -> List[Dict[str, Any]]:
  424. interval = max(1, int(self.config.context_story_skeleton_interval))
  425. max_samples = max(0, int(self.config.context_story_skeleton_max_samples))
  426. snippet_chars = int(self.config.context_story_skeleton_snippet_chars)
  427. if max_samples <= 0 or chapter <= interval:
  428. return []
  429. samples: List[Dict[str, Any]] = []
  430. cursor = chapter - interval
  431. while cursor >= 1 and len(samples) < max_samples:
  432. summary = self._load_summary_text(cursor, snippet_chars=snippet_chars)
  433. if summary and summary.get("summary"):
  434. samples.append(summary)
  435. cursor -= interval
  436. samples.reverse()
  437. return samples
  438. def _load_json_optional(self, path: Path) -> Dict[str, Any]:
  439. if not path.exists():
  440. return {}
  441. try:
  442. return json.loads(path.read_text(encoding="utf-8"))
  443. except json.JSONDecodeError:
  444. return {}
  445. def main():
  446. import argparse
  447. from .cli_output import print_success, print_error
  448. parser = argparse.ArgumentParser(description="Context Manager CLI")
  449. parser.add_argument("--project-root", type=str, help="项目根目录")
  450. parser.add_argument("--chapter", type=int, required=True)
  451. parser.add_argument("--template", type=str, default=ContextManager.DEFAULT_TEMPLATE)
  452. parser.add_argument("--no-snapshot", action="store_true")
  453. parser.add_argument("--max-chars", type=int, default=8000)
  454. args = parser.parse_args()
  455. config = None
  456. if args.project_root:
  457. from .config import DataModulesConfig
  458. config = DataModulesConfig.from_project_root(args.project_root)
  459. manager = ContextManager(config)
  460. try:
  461. payload = manager.build_context(
  462. chapter=args.chapter,
  463. template=args.template,
  464. use_snapshot=not args.no_snapshot,
  465. save_snapshot=True,
  466. max_chars=args.max_chars,
  467. )
  468. print_success(payload, message="context_built")
  469. try:
  470. manager.index_manager.log_tool_call("context_manager:build", True, chapter=args.chapter)
  471. except Exception:
  472. pass
  473. except Exception as exc:
  474. print_error("CONTEXT_BUILD_FAILED", str(exc), suggestion="请检查项目结构与依赖文件")
  475. try:
  476. manager.index_manager.log_tool_call(
  477. "context_manager:build", False, error_code="CONTEXT_BUILD_FAILED", error_message=str(exc), chapter=args.chapter
  478. )
  479. except Exception:
  480. pass
  481. if __name__ == "__main__":
  482. import sys
  483. if sys.platform == "win32":
  484. import io
  485. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
  486. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
  487. main()