#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ContextManager - assemble context packs with weighted priorities. """ from __future__ import annotations import json import re import sys import logging from pathlib import Path from runtime_compat import enable_windows_utf8_stdio from typing import Any, Dict, List, Optional from .config import get_config from .index_manager import IndexManager, WritingChecklistScoreMeta from .context_ranker import ContextRanker from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch from .context_weights import ( DEFAULT_TEMPLATE as CONTEXT_DEFAULT_TEMPLATE, TEMPLATE_WEIGHTS as CONTEXT_TEMPLATE_WEIGHTS, TEMPLATE_WEIGHTS_DYNAMIC_DEFAULT as CONTEXT_TEMPLATE_WEIGHTS_DYNAMIC_DEFAULT, ) from .genre_aliases import normalize_genre_token from .genre_profile_builder import ( build_composite_genre_hints, extract_genre_section, extract_markdown_refs, parse_genre_tokens, ) from .writing_guidance_builder import ( build_guidance_items, build_writing_checklist, is_checklist_item_completed, ) logger = logging.getLogger(__name__) class ContextManager: DEFAULT_TEMPLATE = CONTEXT_DEFAULT_TEMPLATE TEMPLATE_WEIGHTS = CONTEXT_TEMPLATE_WEIGHTS TEMPLATE_WEIGHTS_DYNAMIC = CONTEXT_TEMPLATE_WEIGHTS_DYNAMIC_DEFAULT EXTRA_SECTIONS = { "story_skeleton", "memory", "preferences", "alerts", "reader_signal", "genre_profile", "writing_guidance", } SECTION_ORDER = [ "core", "scene", "global", "reader_signal", "genre_profile", "writing_guidance", "story_skeleton", "memory", "preferences", "alerts", ] SUMMARY_SECTION_RE = re.compile(r"##\s*剧情摘要\s*\r?\n(.*?)(?=\r?\n##|\Z)", re.DOTALL) def __init__(self, config=None, snapshot_manager: Optional[SnapshotManager] = None): self.config = config or get_config() self.snapshot_manager = snapshot_manager or SnapshotManager(self.config) self.index_manager = IndexManager(self.config) self.context_ranker = ContextRanker(self.config) def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool: """判断快照是否可用于当前模板。""" if not isinstance(cached, dict): return False meta = cached.get("meta") if not isinstance(meta, dict): # 兼容旧快照:未记录 template 时仅允许默认模板复用 return template == self.DEFAULT_TEMPLATE cached_template = meta.get("template") if not isinstance(cached_template, str): return template == self.DEFAULT_TEMPLATE return cached_template == template def build_context( self, chapter: int, template: str | None = None, use_snapshot: bool = True, save_snapshot: bool = True, max_chars: Optional[int] = None, ) -> Dict[str, Any]: template = template or self.DEFAULT_TEMPLATE self._active_template = template if template not in self.TEMPLATE_WEIGHTS: template = self.DEFAULT_TEMPLATE self._active_template = template if use_snapshot: try: cached = self.snapshot_manager.load_snapshot(chapter) if cached and self._is_snapshot_compatible(cached, template): return cached.get("payload", cached) except SnapshotVersionMismatch: # Snapshot incompatible; rebuild below. pass pack = self._build_pack(chapter) if getattr(self.config, "context_ranker_enabled", True): pack = self.context_ranker.rank_pack(pack, chapter) assembled = self.assemble_context(pack, template=template, max_chars=max_chars) if save_snapshot: meta = {"template": template} self.snapshot_manager.save_snapshot(chapter, assembled, meta=meta) return assembled def assemble_context( self, pack: Dict[str, Any], template: str = DEFAULT_TEMPLATE, max_chars: Optional[int] = None, ) -> Dict[str, Any]: chapter = int((pack.get("meta") or {}).get("chapter") or 0) weights = self._resolve_template_weights(template=template, chapter=chapter) max_chars = max_chars or 8000 extra_budget = int(self.config.context_extra_section_budget or 0) sections = {} for section_name in self.SECTION_ORDER: if section_name in pack: sections[section_name] = pack[section_name] assembled: Dict[str, Any] = {"meta": pack.get("meta", {}), "sections": {}} for name, content in sections.items(): weight = weights.get(name, 0.0) if weight > 0: budget = int(max_chars * weight) elif name in self.EXTRA_SECTIONS and extra_budget > 0: budget = extra_budget else: budget = None text = self._compact_json_text(content, budget) assembled["sections"][name] = {"content": content, "text": text, "budget": budget} assembled["template"] = template assembled["weights"] = weights if chapter > 0: assembled.setdefault("meta", {})["context_weight_stage"] = self._resolve_context_stage(chapter) return assembled def filter_invalid_items(self, items: List[Dict[str, Any]], source_type: str, id_key: str) -> List[Dict[str, Any]]: confirmed = self.index_manager.get_invalid_ids(source_type, status="confirmed") pending = self.index_manager.get_invalid_ids(source_type, status="pending") result = [] for item in items: item_id = str(item.get(id_key, "")) if item_id in confirmed: continue if item_id in pending: item = dict(item) item["warning"] = "pending_invalid" result.append(item) return result def apply_confidence_filter(self, items: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]: filtered: List[Dict[str, Any]] = [] for item in items: conf = item.get("confidence") if conf is None or conf >= min_confidence: filtered.append(item) return filtered def _build_pack(self, chapter: int) -> Dict[str, Any]: state = self._load_state() core = { "chapter_outline": self._load_outline(chapter), "protagonist_snapshot": state.get("protagonist_state", {}), "recent_summaries": self._load_recent_summaries( chapter, window=self.config.context_recent_summaries_window, ), "recent_meta": self._load_recent_meta( state, chapter, window=self.config.context_recent_meta_window, ), } scene = { "location_context": state.get("protagonist_state", {}).get("location", {}), "appearing_characters": self._load_recent_appearances( limit=self.config.context_max_appearing_characters, ), } scene["appearing_characters"] = self.filter_invalid_items( scene["appearing_characters"], source_type="entity", id_key="entity_id" ) global_ctx = { "worldview_skeleton": self._load_setting("世界观"), "power_system_skeleton": self._load_setting("力量体系"), "style_contract_ref": self._load_setting("风格契约"), } preferences = self._load_json_optional(self.config.webnovel_dir / "preferences.json") memory = self._load_json_optional(self.config.webnovel_dir / "project_memory.json") story_skeleton = self._load_story_skeleton(chapter) alert_slice = max(0, int(self.config.context_alerts_slice)) reader_signal = self._load_reader_signal(chapter) genre_profile = self._load_genre_profile(state) writing_guidance = self._build_writing_guidance(chapter, reader_signal, genre_profile) return { "meta": {"chapter": chapter}, "core": core, "scene": scene, "global": global_ctx, "reader_signal": reader_signal, "genre_profile": genre_profile, "writing_guidance": writing_guidance, "story_skeleton": story_skeleton, "preferences": preferences, "memory": memory, "alerts": { "disambiguation_warnings": ( state.get("disambiguation_warnings", [])[-alert_slice:] if alert_slice else [] ), "disambiguation_pending": ( state.get("disambiguation_pending", [])[-alert_slice:] if alert_slice else [] ), }, } def _load_reader_signal(self, chapter: int) -> Dict[str, Any]: if not getattr(self.config, "context_reader_signal_enabled", True): return {} recent_limit = max(1, int(getattr(self.config, "context_reader_signal_recent_limit", 5))) pattern_window = max(1, int(getattr(self.config, "context_reader_signal_window_chapters", 20))) review_window = max(1, int(getattr(self.config, "context_reader_signal_review_window", 5))) include_debt = bool(getattr(self.config, "context_reader_signal_include_debt", False)) recent_power = self.index_manager.get_recent_reading_power(limit=recent_limit) pattern_stats = self.index_manager.get_pattern_usage_stats(last_n_chapters=pattern_window) hook_stats = self.index_manager.get_hook_type_stats(last_n_chapters=pattern_window) review_trend = self.index_manager.get_review_trend_stats(last_n=review_window) low_score_ranges: List[Dict[str, Any]] = [] for row in review_trend.get("recent_ranges", []): score = row.get("overall_score") if isinstance(score, (int, float)) and float(score) < 75: low_score_ranges.append( { "start_chapter": row.get("start_chapter"), "end_chapter": row.get("end_chapter"), "overall_score": score, } ) signal: Dict[str, Any] = { "recent_reading_power": recent_power, "pattern_usage": pattern_stats, "hook_type_usage": hook_stats, "review_trend": review_trend, "low_score_ranges": low_score_ranges, "next_chapter": chapter, } if include_debt: signal["debt_summary"] = self.index_manager.get_debt_summary() return signal def _load_genre_profile(self, state: Dict[str, Any]) -> Dict[str, Any]: if not getattr(self.config, "context_genre_profile_enabled", True): return {} fallback = str(getattr(self.config, "context_genre_profile_fallback", "shuangwen") or "shuangwen") project = state.get("project") or {} project_info = state.get("project_info") or {} genre_raw = str(project.get("genre") or project_info.get("genre") or fallback) genres = self._parse_genre_tokens(genre_raw) if not genres: genres = [fallback] max_genres = max(1, int(getattr(self.config, "context_genre_profile_max_genres", 2))) genres = genres[:max_genres] primary_genre = genres[0] secondary_genres = genres[1:] composite = len(genres) > 1 profile_path = self.config.project_root / ".claude" / "references" / "genre-profiles.md" taxonomy_path = self.config.project_root / ".claude" / "references" / "reading-power-taxonomy.md" profile_text = profile_path.read_text(encoding="utf-8") if profile_path.exists() else "" taxonomy_text = taxonomy_path.read_text(encoding="utf-8") if taxonomy_path.exists() else "" profile_excerpt = self._extract_genre_section(profile_text, primary_genre) taxonomy_excerpt = self._extract_genre_section(taxonomy_text, primary_genre) secondary_profiles: List[str] = [] secondary_taxonomies: List[str] = [] for extra in secondary_genres: secondary_profiles.append(self._extract_genre_section(profile_text, extra)) secondary_taxonomies.append(self._extract_genre_section(taxonomy_text, extra)) refs = self._extract_markdown_refs( "\n".join([profile_excerpt] + secondary_profiles), max_items=int(getattr(self.config, "context_genre_profile_max_refs", 8)), ) composite_hints = self._build_composite_genre_hints(genres, refs) return { "genre": primary_genre, "genre_raw": genre_raw, "genres": genres, "composite": composite, "secondary_genres": secondary_genres, "profile_excerpt": profile_excerpt, "taxonomy_excerpt": taxonomy_excerpt, "secondary_profile_excerpts": secondary_profiles, "secondary_taxonomy_excerpts": secondary_taxonomies, "reference_hints": refs, "composite_hints": composite_hints, } def _build_writing_guidance( self, chapter: int, reader_signal: Dict[str, Any], genre_profile: Dict[str, Any], ) -> Dict[str, Any]: if not getattr(self.config, "context_writing_guidance_enabled", True): return {} limit = max(1, int(getattr(self.config, "context_writing_guidance_max_items", 6))) low_score_threshold = float( getattr(self.config, "context_writing_guidance_low_score_threshold", 75.0) ) guidance_bundle = build_guidance_items( chapter=chapter, reader_signal=reader_signal, genre_profile=genre_profile, low_score_threshold=low_score_threshold, hook_diversify_enabled=bool( getattr(self.config, "context_writing_guidance_hook_diversify", True) ), ) guidance = list(guidance_bundle.get("guidance") or []) checklist = self._build_writing_checklist( chapter=chapter, guidance_items=guidance, reader_signal=reader_signal, genre_profile=genre_profile, ) checklist_score = self._compute_writing_checklist_score( chapter=chapter, checklist=checklist, reader_signal=reader_signal, ) if getattr(self.config, "context_writing_score_persist_enabled", True): self._persist_writing_checklist_score(checklist_score) low_ranges = guidance_bundle.get("low_ranges") or [] hook_usage = guidance_bundle.get("hook_usage") or {} pattern_usage = guidance_bundle.get("pattern_usage") or {} genre = str(guidance_bundle.get("genre") or genre_profile.get("genre") or "").strip() hook_types = list(hook_usage.keys())[:3] if isinstance(hook_usage, dict) else [] top_patterns = ( sorted(pattern_usage, key=pattern_usage.get, reverse=True)[:3] if isinstance(pattern_usage, dict) else [] ) return { "chapter": chapter, "guidance_items": guidance[:limit], "checklist": checklist, "checklist_score": checklist_score, "signals_used": { "has_low_score_ranges": bool(low_ranges), "hook_types": hook_types, "top_patterns": top_patterns, "genre": genre, }, } def _compute_writing_checklist_score( self, chapter: int, checklist: List[Dict[str, Any]], reader_signal: Dict[str, Any], ) -> Dict[str, Any]: total_items = len(checklist) required_items = 0 completed_items = 0 completed_required = 0 total_weight = 0.0 completed_weight = 0.0 pending_labels: List[str] = [] for item in checklist: if not isinstance(item, dict): continue required = bool(item.get("required")) weight = float(item.get("weight") or 1.0) total_weight += weight if required: required_items += 1 completed = self._is_checklist_item_completed(item, reader_signal) if completed: completed_items += 1 completed_weight += weight if required: completed_required += 1 else: pending_labels.append(str(item.get("label") or item.get("id") or "未命名项")) completion_rate = (completed_items / total_items) if total_items > 0 else 1.0 weighted_rate = (completed_weight / total_weight) if total_weight > 0 else completion_rate required_rate = (completed_required / required_items) if required_items > 0 else 1.0 score = 100.0 * (0.5 * weighted_rate + 0.3 * required_rate + 0.2 * completion_rate) if getattr(self.config, "context_writing_score_include_reader_trend", True): trend_window = max(1, int(getattr(self.config, "context_writing_score_trend_window", 10))) trend = self.index_manager.get_writing_checklist_score_trend(last_n=trend_window) baseline = float(trend.get("score_avg") or 0.0) if baseline > 0: score += max(-10.0, min(10.0, (score - baseline) * 0.1)) score = round(max(0.0, min(100.0, score)), 2) return { "chapter": chapter, "score": score, "completion_rate": round(completion_rate, 4), "weighted_completion_rate": round(weighted_rate, 4), "required_completion_rate": round(required_rate, 4), "total_items": total_items, "required_items": required_items, "completed_items": completed_items, "completed_required": completed_required, "total_weight": round(total_weight, 2), "completed_weight": round(completed_weight, 2), "pending_items": pending_labels, "trend_window": int(getattr(self.config, "context_writing_score_trend_window", 10)), } def _is_checklist_item_completed(self, item: Dict[str, Any], reader_signal: Dict[str, Any]) -> bool: return is_checklist_item_completed(item, reader_signal) def _persist_writing_checklist_score(self, checklist_score: Dict[str, Any]) -> None: if not checklist_score: return try: self.index_manager.save_writing_checklist_score( WritingChecklistScoreMeta( chapter=int(checklist_score.get("chapter") or 0), template=str(getattr(self, "_active_template", self.DEFAULT_TEMPLATE) or self.DEFAULT_TEMPLATE), total_items=int(checklist_score.get("total_items") or 0), required_items=int(checklist_score.get("required_items") or 0), completed_items=int(checklist_score.get("completed_items") or 0), completed_required=int(checklist_score.get("completed_required") or 0), total_weight=float(checklist_score.get("total_weight") or 0.0), completed_weight=float(checklist_score.get("completed_weight") or 0.0), completion_rate=float(checklist_score.get("completion_rate") or 0.0), score=float(checklist_score.get("score") or 0.0), score_breakdown={ "weighted_completion_rate": checklist_score.get("weighted_completion_rate"), "required_completion_rate": checklist_score.get("required_completion_rate"), "trend_window": checklist_score.get("trend_window"), }, pending_items=list(checklist_score.get("pending_items") or []), source="context_manager", ) ) except Exception as exc: logger.warning("failed to persist writing checklist score: %s", exc) def _resolve_context_stage(self, chapter: int) -> str: early = max(1, int(getattr(self.config, "context_dynamic_budget_early_chapter", 30))) late = max(early + 1, int(getattr(self.config, "context_dynamic_budget_late_chapter", 120))) if chapter <= early: return "early" if chapter >= late: return "late" return "mid" def _resolve_template_weights(self, template: str, chapter: int) -> Dict[str, float]: template_key = template if template in self.TEMPLATE_WEIGHTS else self.DEFAULT_TEMPLATE base = dict(self.TEMPLATE_WEIGHTS.get(template_key, self.TEMPLATE_WEIGHTS[self.DEFAULT_TEMPLATE])) if not getattr(self.config, "context_dynamic_budget_enabled", True): return base stage = self._resolve_context_stage(chapter) dynamic_weights = getattr(self.config, "context_template_weights_dynamic", None) if not isinstance(dynamic_weights, dict): dynamic_weights = self.TEMPLATE_WEIGHTS_DYNAMIC stage_weights = dynamic_weights.get(stage, {}) if isinstance(dynamic_weights.get(stage, {}), dict) else {} staged = stage_weights.get(template_key) if isinstance(staged, dict): return dict(staged) return base def _parse_genre_tokens(self, genre_raw: str) -> List[str]: support_composite = bool(getattr(self.config, "context_genre_profile_support_composite", True)) separators_raw = getattr(self.config, "context_genre_profile_separators", ("+", "/", "|", ",")) separators = tuple(str(token) for token in separators_raw if str(token)) return parse_genre_tokens( genre_raw, support_composite=support_composite, separators=separators, ) def _normalize_genre_token(self, token: str) -> str: return normalize_genre_token(token) def _build_composite_genre_hints(self, genres: List[str], refs: List[str]) -> List[str]: return build_composite_genre_hints(genres, refs) def _build_writing_checklist( self, chapter: int, guidance_items: List[str], reader_signal: Dict[str, Any], genre_profile: Dict[str, Any], ) -> List[Dict[str, Any]]: _ = chapter if not getattr(self.config, "context_writing_checklist_enabled", True): return [] min_items = max(1, int(getattr(self.config, "context_writing_checklist_min_items", 3))) max_items = max(min_items, int(getattr(self.config, "context_writing_checklist_max_items", 6))) default_weight = float(getattr(self.config, "context_writing_checklist_default_weight", 1.0)) if default_weight <= 0: default_weight = 1.0 return build_writing_checklist( guidance_items=guidance_items, reader_signal=reader_signal, genre_profile=genre_profile, min_items=min_items, max_items=max_items, default_weight=default_weight, ) def _compact_json_text(self, content: Any, budget: Optional[int]) -> str: raw = json.dumps(content, ensure_ascii=False) if budget is None or len(raw) <= budget: return raw if not getattr(self.config, "context_compact_text_enabled", True): return raw[:budget] min_budget = max(1, int(getattr(self.config, "context_compact_min_budget", 120))) if budget <= min_budget: return raw[:budget] head_ratio = float(getattr(self.config, "context_compact_head_ratio", 0.65)) head_budget = int(budget * max(0.2, min(0.9, head_ratio))) tail_budget = max(0, budget - head_budget - 10) compact = f"{raw[:head_budget]}…[TRUNCATED]{raw[-tail_budget:] if tail_budget else ''}" return compact[:budget] def _extract_genre_section(self, text: str, genre: str) -> str: return extract_genre_section(text, genre) def _extract_markdown_refs(self, text: str, max_items: int = 8) -> List[str]: return extract_markdown_refs(text, max_items=max_items) def _load_state(self) -> Dict[str, Any]: path = self.config.state_file if not path.exists(): return {} return json.loads(path.read_text(encoding="utf-8")) def _load_outline(self, chapter: int) -> str: outline_dir = self.config.outline_dir patterns = [ f"第{chapter}章*.md", f"第{chapter:02d}章*.md", f"第{chapter:03d}章*.md", f"第{chapter:04d}章*.md", ] for pattern in patterns: matches = list(outline_dir.glob(pattern)) if matches: return matches[0].read_text(encoding="utf-8") return f"[大纲未找到: 第{chapter}章]" def _load_recent_summaries(self, chapter: int, window: int = 3) -> List[Dict[str, Any]]: summaries = [] for ch in range(max(1, chapter - window), chapter): summary = self._load_summary_text(ch) if summary: summaries.append(summary) return summaries def _load_recent_meta(self, state: Dict[str, Any], chapter: int, window: int = 3) -> List[Dict[str, Any]]: meta = state.get("chapter_meta", {}) or {} results = [] for ch in range(max(1, chapter - window), chapter): for key in (f"{ch:04d}", str(ch)): if key in meta: results.append({"chapter": ch, **meta.get(key, {})}) break return results def _load_recent_appearances(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: appearances = self.index_manager.get_recent_appearances(limit=limit) return appearances or [] def _load_setting(self, keyword: str) -> str: settings_dir = self.config.settings_dir candidates = [ settings_dir / f"{keyword}.md", ] for path in candidates: if path.exists(): return path.read_text(encoding="utf-8") # fallback: any file containing keyword matches = list(settings_dir.glob(f"*{keyword}*.md")) if matches: return matches[0].read_text(encoding="utf-8") return f"[{keyword}设定未找到]" def _extract_summary_excerpt(self, text: str, max_chars: int) -> str: if not text: return "" match = self.SUMMARY_SECTION_RE.search(text) excerpt = match.group(1).strip() if match else text.strip() if max_chars > 0 and len(excerpt) > max_chars: return excerpt[:max_chars].rstrip() return excerpt def _load_summary_text(self, chapter: int, snippet_chars: Optional[int] = None) -> Optional[Dict[str, Any]]: summary_path = self.config.webnovel_dir / "summaries" / f"ch{chapter:04d}.md" if not summary_path.exists(): return None text = summary_path.read_text(encoding="utf-8") if snippet_chars: summary_text = self._extract_summary_excerpt(text, snippet_chars) else: summary_text = text return {"chapter": chapter, "summary": summary_text} def _load_story_skeleton(self, chapter: int) -> List[Dict[str, Any]]: interval = max(1, int(self.config.context_story_skeleton_interval)) max_samples = max(0, int(self.config.context_story_skeleton_max_samples)) snippet_chars = int(self.config.context_story_skeleton_snippet_chars) if max_samples <= 0 or chapter <= interval: return [] samples: List[Dict[str, Any]] = [] cursor = chapter - interval while cursor >= 1 and len(samples) < max_samples: summary = self._load_summary_text(cursor, snippet_chars=snippet_chars) if summary and summary.get("summary"): samples.append(summary) cursor -= interval samples.reverse() return samples def _load_json_optional(self, path: Path) -> Dict[str, Any]: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: return {} def main(): import argparse from .cli_output import print_success, print_error parser = argparse.ArgumentParser(description="Context Manager CLI") parser.add_argument("--project-root", type=str, help="项目根目录") parser.add_argument("--chapter", type=int, required=True) parser.add_argument("--template", type=str, default=ContextManager.DEFAULT_TEMPLATE) parser.add_argument("--no-snapshot", action="store_true") parser.add_argument("--max-chars", type=int, default=8000) args = parser.parse_args() config = None if args.project_root: from .config import DataModulesConfig config = DataModulesConfig.from_project_root(args.project_root) manager = ContextManager(config) try: payload = manager.build_context( chapter=args.chapter, template=args.template, use_snapshot=not args.no_snapshot, save_snapshot=True, max_chars=args.max_chars, ) print_success(payload, message="context_built") try: manager.index_manager.log_tool_call("context_manager:build", True, chapter=args.chapter) except Exception as exc: logger.warning("failed to log successful tool call: %s", exc) except Exception as exc: print_error("CONTEXT_BUILD_FAILED", str(exc), suggestion="请检查项目结构与依赖文件") try: manager.index_manager.log_tool_call( "context_manager:build", False, error_code="CONTEXT_BUILD_FAILED", error_message=str(exc), chapter=args.chapter ) except Exception as log_exc: logger.warning("failed to log failed tool call: %s", log_exc) if __name__ == "__main__": import sys if sys.platform == "win32": enable_windows_utf8_stdio() main()