#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
extract_chapter_context.py - extract chapter writing context

Features:
- chapter outline snippet
- previous chapter summaries (prefers .webnovel/summaries)
- compact state summary
- ContextManager contract sections (reader_signal / genre_profile / writing_guidance)
"""

from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List

from runtime_compat import enable_windows_utf8_stdio

try:
    from chapter_paths import find_chapter_file
except ImportError:  # pragma: no cover
    from scripts.chapter_paths import find_chapter_file


def _ensure_scripts_path():
    """Insert this file's directory at the front of ``sys.path`` if it is missing."""
    scripts_dir = str(Path(__file__).resolve().parent)
    if scripts_dir not in sys.path:
        sys.path.insert(0, scripts_dir)


def find_project_root(start_path: Path | None = None) -> Path:
    """Find the project root, i.e. a directory containing ``.webnovel``.

    Candidates are checked in order: ``start_path`` itself, its
    ``webnovel-project`` child, and its parent directory.

    Args:
        start_path: Directory to start from; defaults to the current working
            directory.

    Returns:
        The first candidate that contains a ``.webnovel`` entry.

    Raises:
        FileNotFoundError: If none of the candidates contains ``.webnovel``.
    """
    base = Path.cwd() if start_path is None else start_path
    candidates = (base, base / "webnovel-project", base.parent)
    for candidate in candidates:
        if (candidate / ".webnovel").exists():
            return candidate
    raise FileNotFoundError("未找到 .webnovel 目录,请确认项目路径")


def extract_chapter_outline(
    project_root: Path,
    chapter_num: int,
    *,
    chapters_per_volume: int = 50,
    max_chars: int = 1500,
) -> str:
    """Extract one chapter's outline segment from its volume outline file.

    Args:
        project_root: Project root directory.
        chapter_num: Target chapter number.
        chapters_per_volume: Number of chapters grouped into one volume file.
        max_chars: Longest outline returned before a truncation marker is
            appended.

    Returns:
        The matched segment including its ``###`` heading, or a warning string
        when the outline file or the chapter heading cannot be found.
    """
    volume_num = (chapter_num - 1) // chapters_per_volume + 1
    outline_file = project_root / "大纲" / f"第{volume_num}卷 详细大纲.md"
    if not outline_file.exists():
        return f"⚠️ 大纲文件不存在: {outline_file}"

    content = outline_file.read_text(encoding="utf-8")
    # Accept both the ASCII colon and the full-width colon (U+FF1A) after the
    # chapter heading. The escape form is used so the class cannot silently
    # degrade to a duplicated ASCII colon when the file is re-encoded.
    # The optional-whitespace form already covers the tightly spaced heading,
    # so no second, stricter pattern is needed.
    pattern = (
        rf"###\s*第\s*{chapter_num}\s*章[:\uff1a]\s*(.+?)"
        r"(?=###\s*第\s*\d+\s*章|##\s|$)"
    )
    match = re.search(pattern, content, re.DOTALL)
    if match is None:
        return f"⚠️ 未找到第 {chapter_num} 章的大纲"

    outline = match.group(0).strip()
    if len(outline) > max_chars:
        outline = outline[:max_chars] + "\n...(已截断)"
    return outline


def _extract_markdown_section(text: str, heading: str) -> str:
    """Return the stripped body under ``## <heading>``, or ``""`` if absent or empty.

    The body runs until the next line starting with ``##`` or the end of text.
    """
    pattern = rf"##\s*{re.escape(heading)}\s*\r?\n(.+?)(?=\r?\n##|$)"
    match = re.search(pattern, text, re.DOTALL)
    return match.group(1).strip() if match else ""


def _load_summary_file(project_root: Path, chapter_num: int) -> str:
    """Load the summary section from ``.webnovel/summaries/chNNNN.md``.

    Returns:
        The stripped ``## 剧情摘要`` section, or ``""`` when the file or the
        section is missing.
    """
    summary_path = project_root / ".webnovel" / "summaries" / f"ch{chapter_num:04d}.md"
    if not summary_path.exists():
        return ""
    return _extract_markdown_section(summary_path.read_text(encoding="utf-8"), "剧情摘要")


def extract_chapter_summary(project_root: Path, chapter_num: int) -> str:
    """Extract a chapter summary, falling back to the head of the chapter body.

    Sources are tried in order: the dedicated summary file, the chapter file's
    ``## 本章摘要`` section, its ``## 本章统计`` section, and finally the first
    500 characters of non-heading, non-blank body lines. An empty section is
    treated as missing so the next source is tried.

    Returns:
        The summary text, or a warning string when the chapter file does not
        exist.
    """
    summary = _load_summary_file(project_root, chapter_num)
    if summary:
        return summary

    chapter_file = find_chapter_file(project_root, chapter_num)
    if not chapter_file or not chapter_file.exists():
        return f"⚠️ 第{chapter_num}章文件不存在"

    content = chapter_file.read_text(encoding="utf-8")

    inline_summary = _extract_markdown_section(content, "本章摘要")
    if inline_summary:
        return inline_summary

    stats = _extract_markdown_section(content, "本章统计")
    if stats:
        return f"[无摘要,仅统计]\n{stats}"

    # splitlines() keeps CRLF files from leaking "\r" into the excerpt.
    body_lines = [
        line for line in content.splitlines() if line.strip() and not line.startswith("#")
    ]
    excerpt = "\n".join(body_lines)[:500]
    return f"[自动截取前500字]\n{excerpt}..."
def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_list(value: Any) -> List[Any]:
    """Return *value* as a list if it is a list or tuple, otherwise an empty list."""
    return list(value) if isinstance(value, (list, tuple)) else []


def _urgency_value(row: Dict[str, Any]) -> float:
    """Return the row's ``urgency`` as a float; missing or non-numeric counts as 0."""
    try:
        return float(row.get("urgency") or 0)
    except (TypeError, ValueError):
        return 0.0


def extract_state_summary(project_root: Path) -> str:
    """Extract key fields from ``.webnovel/state.json`` as markdown lines.

    Malformed sections (``null`` values, non-dict rows, non-numeric urgency)
    are rendered with ``?`` placeholders or skipped instead of raising.

    Args:
        project_root: Project root directory.

    Returns:
        Newline-joined summary lines, or a warning string when ``state.json``
        does not exist.

    Raises:
        json.JSONDecodeError: If ``state.json`` is not valid JSON.
    """
    state_file = project_root / ".webnovel" / "state.json"
    if not state_file.exists():
        return "⚠️ state.json 不存在"

    state = _as_dict(json.loads(state_file.read_text(encoding="utf-8")))
    summary_parts: List[str] = []

    if "progress" in state:
        progress = _as_dict(state["progress"])
        summary_parts.append(
            f"**进度**: 第{progress.get('current_chapter', '?')}章 / {progress.get('total_words', '?')}字"
        )

    if "protagonist_state" in state:
        ps = _as_dict(state["protagonist_state"])
        power = _as_dict(ps.get("power"))
        golden_finger = _as_dict(ps.get("golden_finger"))
        summary_parts.append(f"**主角实力**: {power.get('realm', '?')} {power.get('layer', '?')}层")
        summary_parts.append(f"**当前位置**: {ps.get('location', '?')}")
        summary_parts.append(
            f"**金手指**: {golden_finger.get('name', '?')} Lv.{golden_finger.get('level', '?')}"
        )

    if "strand_tracker" in state:
        history = _as_list(_as_dict(state["strand_tracker"]).get("history"))[-5:]
        items = [
            f"Ch{row.get('chapter', '?')}:{row.get('strand') or row.get('dominant') or 'unknown'}"
            for row in history
            if isinstance(row, dict)
        ]
        if items:
            summary_parts.append(f"**近5章Strand**: {', '.join(items)}")

    foreshadowing = _as_dict(state.get("plot_threads")).get("foreshadowing", [])
    if isinstance(foreshadowing, list) and foreshadowing:
        # A tuple (not a set) is used so an unhashable status value cannot raise.
        urgent = [
            row
            for row in foreshadowing
            if isinstance(row, dict)
            and row.get("status") in ("active", "未回收")
            and _urgency_value(row) > 50
        ]
        if urgent:
            urgent_list = []
            for row in urgent[:3]:
                content = row.get("content", "?")
                text = "?" if content is None else str(content)
                urgent_list.append(f"{text[:30]}... (紧急度:{row.get('urgency')})")
            summary_parts.append(f"**紧急伏笔**: {'; '.join(urgent_list)}")

    return "\n".join(summary_parts)


def _load_contract_context(project_root: Path, chapter_num: int) -> Dict[str, Any]:
    """Build context via ContextManager and return selected sections.

    Returns:
        A dict with the contract version and weight stage from the payload's
        ``meta``, plus the ``content`` of the ``reader_signal``,
        ``genre_profile`` and ``writing_guidance`` sections (empty dict when a
        section is missing).
    """
    _ensure_scripts_path()
    # Imported lazily, after the scripts directory is on sys.path.
    from data_modules.config import DataModulesConfig
    from data_modules.context_manager import ContextManager

    config = DataModulesConfig.from_project_root(project_root)
    manager = ContextManager(config)
    payload = manager.build_context(
        chapter=chapter_num,
        template="plot",
        use_snapshot=True,
        save_snapshot=True,
        max_chars=8000,
    )

    meta = payload.get("meta") or {}
    sections = payload.get("sections") or {}

    def section_content(name: str) -> Any:
        return (sections.get(name) or {}).get("content", {})

    return {
        "context_contract_version": meta.get("context_contract_version"),
        "context_weight_stage": meta.get("context_weight_stage"),
        "reader_signal": section_content("reader_signal"),
        "genre_profile": section_content("genre_profile"),
        "writing_guidance": section_content("writing_guidance"),
    }


def build_chapter_context_payload(project_root: Path, chapter_num: int) -> Dict[str, Any]:
    """Assemble the full chapter context payload for text/json output.

    Includes the chapter outline, summaries of up to the two preceding
    chapters, the state summary, and the contract sections.
    """
    outline = extract_chapter_outline(project_root, chapter_num)

    prev_summaries = [
        f"### 第{prev_ch}章摘要\n{extract_chapter_summary(project_root, prev_ch)}"
        for prev_ch in range(max(1, chapter_num - 2), chapter_num)
    ]

    state_summary = extract_state_summary(project_root)
    contract_context = _load_contract_context(project_root, chapter_num)

    return {
        "chapter": chapter_num,
        "outline": outline,
        "previous_summaries": prev_summaries,
        "state_summary": state_summary,
        "context_contract_version": contract_context.get("context_contract_version"),
        "context_weight_stage": contract_context.get("context_weight_stage"),
        "reader_signal": contract_context.get("reader_signal", {}),
        "genre_profile": contract_context.get("genre_profile", {}),
        "writing_guidance": contract_context.get("writing_guidance", {}),
    }


def _render_contract_lines(payload: Dict[str, Any]) -> List[str]:
    """Render the contract heading and, when present, the weight stage."""
    contract_version = payload.get("context_contract_version")
    if not contract_version:
        return []
    lines = [f"## Contract ({contract_version})", ""]
    stage = payload.get("context_weight_stage")
    if stage:
        lines.extend([f"- 上下文阶段权重: {stage}", ""])
    return lines


def _render_checklist_lines(checklist: List[Any]) -> List[str]:
    """Render checklist totals followed by one numbered line per checklist row."""
    total_weight = 0.0
    required_count = 0
    for row in checklist:
        if not isinstance(row, dict):
            continue
        try:
            total_weight += float(row.get("weight") or 0)
        except (TypeError, ValueError):
            pass  # a non-numeric weight simply does not contribute to the total
        if row.get("required"):
            required_count += 1

    lines = [
        "",
        "### 执行检查清单(可评分)",
        "",
        f"- 项目数: {len(checklist)}",
        f"- 总权重: {total_weight:.2f}",
        f"- 必做项: {required_count}",
        "",
    ]
    for idx, row in enumerate(checklist, start=1):
        if not isinstance(row, dict):
            lines.append(f"{idx}. {row}")
            continue
        label = str(row.get("label") or "").strip() or "未命名项"
        required_tag = "必做" if row.get("required") else "可选"
        verify_hint = str(row.get("verify_hint") or "").strip()
        lines.append(f"{idx}. [{required_tag}][w={row.get('weight')}] {label}")
        if verify_hint:
            lines.append(f" - 验收: {verify_hint}")
    return lines


def _render_guidance_lines(writing_guidance: Dict[str, Any]) -> List[str]:
    """Render guidance items, the checklist and the checklist score."""
    guidance_items = _as_list(writing_guidance.get("guidance_items"))
    checklist = _as_list(writing_guidance.get("checklist"))
    checklist_score = _as_dict(writing_guidance.get("checklist_score"))
    if not (guidance_items or checklist):
        return []

    lines = ["## 写作执行建议", ""]
    lines.extend(f"{idx}. {item}" for idx, item in enumerate(guidance_items, start=1))
    if checklist:
        lines.extend(_render_checklist_lines(checklist))
    if checklist_score:
        lines.extend(
            [
                "",
                "### 执行评分",
                "",
                f"- 评分: {checklist_score.get('score')}",
                f"- 完成率: {checklist_score.get('completion_rate')}",
                f"- 必做完成率: {checklist_score.get('required_completion_rate')}",
            ]
        )
    lines.append("")
    return lines


def _render_reader_signal_lines(reader_signal: Dict[str, Any]) -> List[str]:
    """Render the review-trend average and the number of low-score ranges."""
    review_trend = _as_dict(reader_signal.get("review_trend"))
    if not review_trend:
        return []
    lines = ["## 追读信号", "", f"- 最近审查均分: {review_trend.get('overall_avg')}"]
    low_ranges = _as_list(reader_signal.get("low_score_ranges"))
    if low_ranges:
        lines.append(f"- 低分区间数: {len(low_ranges)}")
    lines.append("")
    return lines


def _render_genre_lines(genre_profile: Dict[str, Any]) -> List[str]:
    """Render the genre, composite genres, and the leading hint lines."""
    if not genre_profile.get("genre"):
        return []
    lines = ["## 题材锚定", "", f"- 题材: {genre_profile.get('genre')}"]
    genres = _as_list(genre_profile.get("genres"))
    if len(genres) > 1:
        lines.append(f"- 复合题材: {' + '.join(str(token) for token in genres)}")
    lines.extend(f"- {row}" for row in _as_list(genre_profile.get("composite_hints"))[:2])
    lines.extend(f"- {row}" for row in _as_list(genre_profile.get("reference_hints"))[:3])
    lines.append("")
    return lines


def _render_text(payload: Dict[str, Any]) -> str:
    """Render the context payload as a markdown document.

    Optional sections (contract, writing guidance, reader signal, genre) are
    emitted only when the payload carries data for them; values of an
    unexpected type are treated as missing.

    Returns:
        The markdown text, ending with exactly one trailing newline.
    """
    lines: List[str] = [
        f"# 第 {payload.get('chapter')} 章创作上下文",
        "",
        "## 本章大纲",
        "",
        str(payload.get("outline", "")),
        "",
        "---",
        "",
        "## 前文摘要",
        "",
    ]
    for item in _as_list(payload.get("previous_summaries")):
        lines.extend([str(item), ""])
    lines.extend(["---", "", "## 当前状态", "", str(payload.get("state_summary", "")), ""])

    lines.extend(_render_contract_lines(payload))
    lines.extend(_render_guidance_lines(_as_dict(payload.get("writing_guidance"))))
    lines.extend(_render_reader_signal_lines(_as_dict(payload.get("reader_signal"))))
    lines.extend(_render_genre_lines(_as_dict(payload.get("genre_profile"))))

    return "\n".join(lines).rstrip() + "\n"


def main() -> None:
    """CLI entry point: print the chapter context as text or JSON.

    Any exception is reported on stderr and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="提取章节创作所需的精简上下文")
    parser.add_argument("--chapter", type=int, required=True, help="目标章节号")
    parser.add_argument("--project-root", type=str, help="项目根目录")
    parser.add_argument("--format", choices=["text", "json"], default="text", help="输出格式")
    args = parser.parse_args()

    try:
        project_root = Path(args.project_root) if args.project_root else find_project_root()
        payload = build_chapter_context_payload(project_root, args.chapter)
        if args.format == "json":
            print(json.dumps(payload, ensure_ascii=False, indent=2))
        else:
            # _render_text already ends with a newline.
            print(_render_text(payload), end="")
    except Exception as exc:  # top-level boundary: report and exit non-zero
        print(f"❌ 错误: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    if sys.platform == "win32":
        enable_windows_utf8_stdio()
    main()