Ver código fonte

Refactor index manager and harden observability/runtime compatibility

lingfengQAQ 4 meses atrás
pai
commit
9a000b4cc1

+ 4 - 4
.claude/scripts/archive_manager.py

@@ -39,6 +39,8 @@ import argparse
 from datetime import datetime
 from pathlib import Path
 
+from runtime_compat import enable_windows_utf8_stdio
+
 # ============================================================================
 # 安全修复:导入安全工具函数(P1 MEDIUM)
 # ============================================================================
@@ -54,10 +56,8 @@ except ImportError:
     from scripts.data_modules.config import get_config
 
 # Windows UTF-8 编码修复
-if sys.platform == 'win32':
-    import io
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
-    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
+if sys.platform == "win32":
+    enable_windows_utf8_stdio()
 
 
 class ArchiveManager:

+ 4 - 4
.claude/scripts/backup_manager.py

@@ -52,6 +52,8 @@ import os
 import sys
 import shutil
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from datetime import datetime
 from typing import Optional, List, Tuple
 
@@ -62,10 +64,8 @@ from security_utils import sanitize_commit_message, is_git_available, is_git_rep
 from project_locator import resolve_project_root
 
 # Windows 编码兼容性修复
-if sys.platform == 'win32':
-    import io
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
-    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
+if sys.platform == "win32":
+    enable_windows_utf8_stdio()
 
 class GitBackupManager:
     """基于 Git 的备份管理器(支持优雅降级)"""

+ 3 - 3
.claude/scripts/data_modules/context_manager.py

@@ -9,6 +9,8 @@ import json
 import re
 import sys
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from typing import Any, Dict, List, Optional
 
 from .config import get_config
@@ -731,7 +733,5 @@ def main():
 if __name__ == "__main__":
     import sys
     if sys.platform == "win32":
-        import io
-        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
-        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+        enable_windows_utf8_stdio()
     main()

+ 9 - 8
.claude/scripts/data_modules/entity_linker.py

@@ -19,6 +19,7 @@ from dataclasses import dataclass, field
 
 from .config import get_config
 from .index_manager import IndexManager
+from .observability import safe_log_tool_call
 
 
 @dataclass
@@ -221,17 +222,17 @@ def main():
 
     def emit_success(data=None, message: str = "ok"):
         print_success(data, message=message)
-        try:
-            logger.log_tool_call(tool_name, True)
-        except Exception:
-            pass
+        safe_log_tool_call(logger, tool_name=tool_name, success=True)
 
     def emit_error(code: str, message: str, suggestion: str | None = None):
         print_error(code, message, suggestion=suggestion)
-        try:
-            logger.log_tool_call(tool_name, False, error_code=code, error_message=message)
-        except Exception:
-            pass
+        safe_log_tool_call(
+            logger,
+            tool_name=tool_name,
+            success=False,
+            error_code=code,
+            error_message=message,
+        )
 
     if args.command == "register-alias":
         entity_type = getattr(args, "type", "角色")

+ 302 - 0
.claude/scripts/data_modules/index_chapter_mixin.py

@@ -0,0 +1,302 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+IndexChapterMixin extracted from IndexManager.
+"""
+
+from __future__ import annotations
+
+import json
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+
+class IndexChapterMixin:
+    def add_chapter(self, meta: ChapterMeta):
+        """添加/更新章节元数据"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT OR REPLACE INTO chapters
+                (chapter, title, location, word_count, characters, summary)
+                VALUES (?, ?, ?, ?, ?, ?)
+            """,
+                (
+                    meta.chapter,
+                    meta.title,
+                    meta.location,
+                    meta.word_count,
+                    json.dumps(meta.characters, ensure_ascii=False),
+                    meta.summary,
+                ),
+            )
+            conn.commit()
+
+    def get_chapter(self, chapter: int) -> Optional[Dict]:
+        """获取章节元数据"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute("SELECT * FROM chapters WHERE chapter = ?", (chapter,))
+            row = cursor.fetchone()
+            if row:
+                return self._row_to_dict(row, parse_json=["characters"])
+            return None
+
+    def get_recent_chapters(self, limit: int = None) -> List[Dict]:
+        """获取最近章节"""
+        if limit is None:
+            limit = self.config.query_recent_chapters_limit
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM chapters
+                ORDER BY chapter DESC
+                LIMIT ?
+            """,
+                (limit,),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["characters"])
+                for row in cursor.fetchall()
+            ]
+
+    # ==================== 场景操作 ====================
+
+    def add_scenes(self, chapter: int, scenes: List[SceneMeta]):
+        """添加章节场景"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            # 先删除该章节旧场景
+            cursor.execute("DELETE FROM scenes WHERE chapter = ?", (chapter,))
+
+            # 插入新场景
+            for scene in scenes:
+                cursor.execute(
+                    """
+                    INSERT INTO scenes
+                    (chapter, scene_index, start_line, end_line, location, summary, characters)
+                    VALUES (?, ?, ?, ?, ?, ?, ?)
+                """,
+                    (
+                        scene.chapter,
+                        scene.scene_index,
+                        scene.start_line,
+                        scene.end_line,
+                        scene.location,
+                        scene.summary,
+                        json.dumps(scene.characters, ensure_ascii=False),
+                    ),
+                )
+
+            conn.commit()
+
+    def get_scenes(self, chapter: int) -> List[Dict]:
+        """获取章节场景"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM scenes
+                WHERE chapter = ?
+                ORDER BY scene_index
+            """,
+                (chapter,),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["characters"])
+                for row in cursor.fetchall()
+            ]
+
+    def search_scenes_by_location(self, location: str, limit: int = None) -> List[Dict]:
+        """按地点搜索场景"""
+        if limit is None:
+            limit = self.config.query_scenes_by_location_limit
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM scenes
+                WHERE location LIKE ?
+                ORDER BY chapter DESC
+                LIMIT ?
+            """,
+                (f"%{location}%", limit),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["characters"])
+                for row in cursor.fetchall()
+            ]
+
+    # ==================== 出场记录操作 ====================
+
+    def record_appearance(
+        self,
+        entity_id: str,
+        chapter: int,
+        mentions: List[str],
+        confidence: float = 1.0,
+        skip_if_exists: bool = False,
+    ):
+        """记录实体出场
+
+        Args:
+            entity_id: 实体ID
+            chapter: 章节号
+            mentions: 提及列表
+            confidence: 置信度
+            skip_if_exists: 如果为True,当记录已存在时跳过(避免覆盖已有mentions)
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            if skip_if_exists:
+                # 先检查是否已存在
+                cursor.execute(
+                    "SELECT 1 FROM appearances WHERE entity_id = ? AND chapter = ?",
+                    (entity_id, chapter),
+                )
+                if cursor.fetchone():
+                    return  # 已存在,跳过
+
+            cursor.execute(
+                """
+                INSERT OR REPLACE INTO appearances
+                (entity_id, chapter, mentions, confidence)
+                VALUES (?, ?, ?, ?)
+            """,
+                (
+                    entity_id,
+                    chapter,
+                    json.dumps(mentions, ensure_ascii=False),
+                    confidence,
+                ),
+            )
+            conn.commit()
+
+    def get_entity_appearances(self, entity_id: str, limit: int = None) -> List[Dict]:
+        """获取实体出场记录"""
+        if limit is None:
+            limit = self.config.query_entity_appearances_limit
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM appearances
+                WHERE entity_id = ?
+                ORDER BY chapter DESC
+                LIMIT ?
+            """,
+                (entity_id, limit),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["mentions"])
+                for row in cursor.fetchall()
+            ]
+
+    def get_recent_appearances(self, limit: int = None) -> List[Dict]:
+        """获取最近出场的实体"""
+        if limit is None:
+            limit = self.config.query_recent_appearances_limit
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT entity_id, MAX(chapter) as last_chapter, COUNT(*) as total
+                FROM appearances
+                GROUP BY entity_id
+                ORDER BY last_chapter DESC
+                LIMIT ?
+            """,
+                (limit,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_chapter_appearances(self, chapter: int) -> List[Dict]:
+        """获取某章所有出场实体"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM appearances
+                WHERE chapter = ?
+                ORDER BY confidence DESC
+            """,
+                (chapter,),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["mentions"])
+                for row in cursor.fetchall()
+            ]
+
+    # ==================== v5.1 实体操作 ====================
+
+    def process_chapter_data(
+        self,
+        chapter: int,
+        title: str,
+        location: str,
+        word_count: int,
+        entities: List[Dict],
+        scenes: List[Dict],
+    ) -> Dict[str, int]:
+        """
+        处理章节数据,批量写入索引
+
+        返回写入统计
+        """
+        from .index_manager import ChapterMeta, SceneMeta
+
+        stats = {"chapters": 0, "scenes": 0, "appearances": 0}
+
+        # 提取出场角色
+        characters = [e.get("id") for e in entities if e.get("type") == "角色"]
+
+        # 写入章节元数据
+        self.add_chapter(
+            ChapterMeta(
+                chapter=chapter,
+                title=title,
+                location=location,
+                word_count=word_count,
+                characters=characters,
+                summary="",  # 可后续由 Data Agent 生成
+            )
+        )
+        stats["chapters"] = 1
+
+        # 写入场景
+        scene_metas = []
+        for s in scenes:
+            scene_metas.append(
+                SceneMeta(
+                    chapter=chapter,
+                    scene_index=s.get("index", 0),
+                    start_line=s.get("start_line", 0),
+                    end_line=s.get("end_line", 0),
+                    location=s.get("location", ""),
+                    summary=s.get("summary", ""),
+                    characters=s.get("characters", []),
+                )
+            )
+        self.add_scenes(chapter, scene_metas)
+        stats["scenes"] = len(scene_metas)
+
+        # 写入出场记录
+        for entity in entities:
+            entity_id = entity.get("id")
+            if entity_id and entity_id != "NEW":
+                self.record_appearance(
+                    entity_id=entity_id,
+                    chapter=chapter,
+                    mentions=entity.get("mentions", []),
+                    confidence=entity.get("confidence", 1.0),
+                )
+                stats["appearances"] += 1
+
+        return stats
+
+    # ==================== 辅助方法 ====================
+

+ 504 - 0
.claude/scripts/data_modules/index_debt_mixin.py

@@ -0,0 +1,504 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+IndexDebtMixin extracted from IndexManager.
+"""
+
+from __future__ import annotations
+
+import json
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+
+class IndexDebtMixin:
+    def create_override_contract(self, contract: OverrideContractMeta) -> int:
+        """
+        创建或更新 Override Contract
+
+        使用 SQLite 的 INSERT ... ON CONFLICT ... DO UPDATE 实现原子 UPSERT:
+        - 并发安全,无需显式锁
+        - 保持 id 不变,避免 chase_debt.override_contract_id 悬挂
+        - 完全冻结终态:已 fulfilled/cancelled 的合约所有字段都不会被修改
+
+        兼容性:支持 SQLite 3.24+(ON CONFLICT 语法),不依赖 RETURNING(3.35+)
+
+        返回合约 ID
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            # 使用 ON CONFLICT 实现原子 UPSERT(SQLite 3.24+)
+            # 终态完全冻结:fulfilled/cancelled 状态下所有字段都保持不变
+            cursor.execute(
+                """
+                INSERT INTO override_contracts
+                (chapter, constraint_type, constraint_id, rationale_type,
+                 rationale_text, payback_plan, due_chapter, status)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+                ON CONFLICT(chapter, constraint_type, constraint_id) DO UPDATE SET
+                    rationale_type = CASE
+                        WHEN override_contracts.status IN ('fulfilled', 'cancelled')
+                        THEN override_contracts.rationale_type
+                        ELSE excluded.rationale_type
+                    END,
+                    rationale_text = CASE
+                        WHEN override_contracts.status IN ('fulfilled', 'cancelled')
+                        THEN override_contracts.rationale_text
+                        ELSE excluded.rationale_text
+                    END,
+                    payback_plan = CASE
+                        WHEN override_contracts.status IN ('fulfilled', 'cancelled')
+                        THEN override_contracts.payback_plan
+                        ELSE excluded.payback_plan
+                    END,
+                    due_chapter = CASE
+                        WHEN override_contracts.status IN ('fulfilled', 'cancelled')
+                        THEN override_contracts.due_chapter
+                        ELSE excluded.due_chapter
+                    END,
+                    status = CASE
+                        WHEN override_contracts.status IN ('fulfilled', 'cancelled')
+                        THEN override_contracts.status
+                        ELSE excluded.status
+                    END
+            """,
+                (
+                    contract.chapter,
+                    contract.constraint_type,
+                    contract.constraint_id,
+                    contract.rationale_type,
+                    contract.rationale_text,
+                    contract.payback_plan,
+                    contract.due_chapter,
+                    contract.status,
+                ),
+            )
+
+            # 不使用 RETURNING(需要 SQLite 3.35+),改用查询获取 id
+            cursor.execute(
+                """
+                SELECT id FROM override_contracts
+                WHERE chapter = ? AND constraint_type = ? AND constraint_id = ?
+            """,
+                (contract.chapter, contract.constraint_type, contract.constraint_id),
+            )
+            row = cursor.fetchone()
+            if not row:
+                # UPSERT 后查不到记录是异常情况,不应发生
+                raise RuntimeError(
+                    f"Override Contract UPSERT 后无法获取 id: "
+                    f"chapter={contract.chapter}, type={contract.constraint_type}, "
+                    f"id={contract.constraint_id}"
+                )
+            contract_id = row[0]
+
+            conn.commit()
+            return contract_id
+
+    def get_pending_overrides(self, before_chapter: int = None) -> List[Dict]:
+        """获取待偿还的Override Contracts"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            if before_chapter:
+                cursor.execute(
+                    """
+                    SELECT * FROM override_contracts
+                    WHERE status = 'pending' AND due_chapter <= ?
+                    ORDER BY due_chapter ASC
+                """,
+                    (before_chapter,),
+                )
+            else:
+                cursor.execute("""
+                    SELECT * FROM override_contracts
+                    WHERE status = 'pending'
+                    ORDER BY due_chapter ASC
+                """)
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_overdue_overrides(self, current_chapter: int) -> List[Dict]:
+        """获取已逾期的Override Contracts"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM override_contracts
+                WHERE status = 'pending' AND due_chapter < ?
+                ORDER BY due_chapter ASC
+            """,
+                (current_chapter,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    def fulfill_override(self, contract_id: int) -> bool:
+        """标记Override Contract为已偿还"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                UPDATE override_contracts SET
+                    status = 'fulfilled',
+                    fulfilled_at = CURRENT_TIMESTAMP
+                WHERE id = ?
+            """,
+                (contract_id,),
+            )
+            conn.commit()
+            return cursor.rowcount > 0
+
+    def get_chapter_overrides(self, chapter: int) -> List[Dict]:
+        """获取某章创建的Override Contracts"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM override_contracts WHERE chapter = ?
+            """,
+                (chapter,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    # ==================== v5.3 追读力债务操作 ====================
+
+    def create_debt(self, debt: ChaseDebtMeta) -> int:
+        """
+        创建追读力债务
+
+        返回债务 ID
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO chase_debt
+                (debt_type, original_amount, current_amount, interest_rate,
+                 source_chapter, due_chapter, override_contract_id, status)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+                (
+                    debt.debt_type,
+                    debt.original_amount,
+                    debt.current_amount,
+                    debt.interest_rate,
+                    debt.source_chapter,
+                    debt.due_chapter,
+                    debt.override_contract_id if debt.override_contract_id else None,
+                    debt.status,
+                ),
+            )
+            conn.commit()
+            debt_id = cursor.lastrowid
+
+            # 记录创建事件
+            self._record_debt_event(
+                cursor,
+                debt_id,
+                "created",
+                debt.original_amount,
+                debt.source_chapter,
+                f"创建债务: {debt.debt_type}",
+            )
+            conn.commit()
+            return debt_id
+
+    def get_active_debts(self) -> List[Dict]:
+        """获取所有活跃债务"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute("""
+                SELECT * FROM chase_debt
+                WHERE status = 'active'
+                ORDER BY due_chapter ASC
+            """)
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_overdue_debts(self, current_chapter: int) -> List[Dict]:
+        """获取已逾期的债务(包括 active 但已过期的,以及已标记为 overdue 的)"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM chase_debt
+                WHERE (status = 'overdue')
+                   OR (status = 'active' AND due_chapter < ?)
+                ORDER BY due_chapter ASC
+            """,
+                (current_chapter,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_total_debt_balance(self) -> float:
+        """获取总债务余额(包括 active 和 overdue)"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute("""
+                SELECT COALESCE(SUM(current_amount), 0) FROM chase_debt
+                WHERE status IN ('active', 'overdue')
+            """)
+            return cursor.fetchone()[0]
+
+    def accrue_interest(self, current_chapter: int) -> Dict[str, Any]:
+        """
+        计算利息(每章调用一次)
+
+        - 对 active 和 overdue 债务都计息(逾期债务继续累积利息)
+        - 使用 debt_events 表防止同一章重复计息
+        - 检查逾期并更新状态
+
+        返回: {debts_processed, total_interest, new_overdues, skipped_already_processed}
+        """
+        result = {
+            "debts_processed": 0,
+            "total_interest": 0.0,
+            "new_overdues": 0,
+            "skipped_already_processed": 0,
+        }
+
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            # 获取所有未偿还债务(active + overdue 都继续计息)
+            cursor.execute("""
+                SELECT * FROM chase_debt WHERE status IN ('active', 'overdue')
+            """)
+            debts = cursor.fetchall()
+
+            for debt in debts:
+                debt_id = debt["id"]
+                current_amount = debt["current_amount"]
+                interest_rate = debt["interest_rate"]
+                due_chapter = debt["due_chapter"]
+                debt_status = debt["status"]
+
+                # 检查本章是否已计息(防止重复调用)
+                cursor.execute(
+                    """
+                    SELECT 1 FROM debt_events
+                    WHERE debt_id = ? AND chapter = ? AND event_type = 'interest_accrued'
+                """,
+                    (debt_id, current_chapter),
+                )
+                if cursor.fetchone():
+                    result["skipped_already_processed"] += 1
+                    continue
+
+                # 计算利息
+                interest = current_amount * interest_rate
+                new_amount = current_amount + interest
+
+                # 更新债务
+                cursor.execute(
+                    """
+                    UPDATE chase_debt SET
+                        current_amount = ?,
+                        updated_at = CURRENT_TIMESTAMP
+                    WHERE id = ?
+                """,
+                    (new_amount, debt_id),
+                )
+
+                # 记录利息事件
+                self._record_debt_event(
+                    cursor,
+                    debt_id,
+                    "interest_accrued",
+                    interest,
+                    current_chapter,
+                    f"利息: {interest:.2f} (利率: {interest_rate * 100:.0f}%)",
+                )
+
+                result["debts_processed"] += 1
+                result["total_interest"] += interest
+
+                # 检查是否逾期(仅对 active 状态的债务)
+                if debt_status == "active" and current_chapter > due_chapter:
+                    cursor.execute(
+                        """
+                        UPDATE chase_debt SET status = 'overdue'
+                        WHERE id = ? AND status = 'active'
+                    """,
+                        (debt_id,),
+                    )
+                    if cursor.rowcount > 0:
+                        result["new_overdues"] += 1
+                        self._record_debt_event(
+                            cursor,
+                            debt_id,
+                            "overdue",
+                            new_amount,
+                            current_chapter,
+                            f"债务逾期 (截止: 第{due_chapter}章)",
+                        )
+
+            conn.commit()
+
+        return result
+
+    def pay_debt(self, debt_id: int, amount: float, chapter: int) -> Dict[str, Any]:
+        """
+        偿还债务
+
+        - 校验 amount > 0
+        - 完全偿还时,使用原子 UPDATE 检查并标记关联 Override 为 fulfilled
+          (并发安全:用 NOT EXISTS 子查询确保所有债务都已清零)
+
+        返回: {remaining, fully_paid, override_fulfilled}
+        """
+        # 校验偿还金额
+        if amount <= 0:
+            return {
+                "remaining": 0,
+                "fully_paid": False,
+                "error": "偿还金额必须大于0",
+            }
+
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            cursor.execute(
+                "SELECT current_amount, override_contract_id FROM chase_debt WHERE id = ?",
+                (debt_id,),
+            )
+            row = cursor.fetchone()
+            if not row:
+                return {"remaining": 0, "fully_paid": False, "error": "债务不存在"}
+
+            current = row["current_amount"]
+            override_contract_id = row["override_contract_id"]
+            remaining = max(0, current - amount)
+            override_fulfilled = False
+
+            if remaining == 0:
+                # 完全偿还
+                cursor.execute(
+                    """
+                    UPDATE chase_debt SET
+                        current_amount = 0,
+                        status = 'paid',
+                        updated_at = CURRENT_TIMESTAMP
+                    WHERE id = ?
+                """,
+                    (debt_id,),
+                )
+                self._record_debt_event(
+                    cursor, debt_id, "full_payment", amount, chapter, "债务已完全偿还"
+                )
+
+                # 原子检查并标记 Override 为 fulfilled
+                # 使用 NOT EXISTS 子查询确保并发安全:只有当确实没有未清债务时才更新
+                if override_contract_id:
+                    cursor.execute(
+                        """
+                        UPDATE override_contracts SET
+                            status = 'fulfilled',
+                            fulfilled_at = CURRENT_TIMESTAMP
+                        WHERE id = ?
+                          AND status = 'pending'
+                          AND NOT EXISTS (
+                              SELECT 1 FROM chase_debt
+                              WHERE override_contract_id = ?
+                                AND status IN ('active', 'overdue')
+                          )
+                    """,
+                        (override_contract_id, override_contract_id),
+                    )
+                    if cursor.rowcount > 0:
+                        override_fulfilled = True
+            else:
+                # 部分偿还
+                cursor.execute(
+                    """
+                    UPDATE chase_debt SET
+                        current_amount = ?,
+                        updated_at = CURRENT_TIMESTAMP
+                    WHERE id = ?
+                """,
+                    (remaining, debt_id),
+                )
+                self._record_debt_event(
+                    cursor,
+                    debt_id,
+                    "partial_payment",
+                    amount,
+                    chapter,
+                    f"部分偿还,剩余: {remaining:.2f}",
+                )
+
+            conn.commit()
+            return {
+                "remaining": remaining,
+                "fully_paid": remaining == 0,
+                "override_fulfilled": override_fulfilled,
+            }
+
+    def _record_debt_event(
+        self,
+        cursor,
+        debt_id: int,
+        event_type: str,
+        amount: float,
+        chapter: int,
+        note: str = "",
+    ):
+        """记录债务事件(内部方法)"""
+        cursor.execute(
+            """
+            INSERT INTO debt_events (debt_id, event_type, amount, chapter, note)
+            VALUES (?, ?, ?, ?, ?)
+        """,
+            (debt_id, event_type, amount, chapter, note),
+        )
+
+    def get_debt_history(self, debt_id: int) -> List[Dict]:
+        """获取债务的事件历史"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM debt_events
+                WHERE debt_id = ?
+                ORDER BY created_at ASC
+            """,
+                (debt_id,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    # ==================== v5.3 章节追读力元数据操作 ====================
+
+    def get_debt_summary(self) -> Dict[str, Any]:
+        """获取债务汇总信息"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            # 活跃债务
+            cursor.execute("""
+                SELECT COUNT(*) as count, COALESCE(SUM(current_amount), 0) as total
+                FROM chase_debt WHERE status = 'active'
+            """)
+            active = cursor.fetchone()
+
+            # 逾期债务
+            cursor.execute("""
+                SELECT COUNT(*) as count, COALESCE(SUM(current_amount), 0) as total
+                FROM chase_debt WHERE status = 'overdue'
+            """)
+            overdue = cursor.fetchone()
+
+            # 待偿还Override
+            cursor.execute("""
+                SELECT COUNT(*) FROM override_contracts WHERE status = 'pending'
+            """)
+            pending_overrides = cursor.fetchone()[0]
+
+            return {
+                "active_debts": active["count"],
+                "active_total": active["total"],
+                "overdue_debts": overdue["count"],
+                "overdue_total": overdue["total"],
+                "pending_overrides": pending_overrides,
+                "total_balance": active["total"] + overdue["total"],
+            }
+
+    # ==================== 批量操作 ====================
+

+ 508 - 0
.claude/scripts/data_modules/index_entity_mixin.py

@@ -0,0 +1,508 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+IndexEntityMixin extracted from IndexManager.
+"""
+
+from __future__ import annotations
+
+import json
+import sys
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+
+class IndexEntityMixin:
+    def upsert_entity(self, entity: EntityMeta, update_metadata: bool = False) -> bool:
+        """
+        插入或更新实体 (智能合并)
+
+        - 新实体: 直接插入
+        - 已存在: 更新 current_json, last_appearance, updated_at
+        - update_metadata=True: 同时更新 canonical_name/tier/desc/is_protagonist/is_archived
+
+        返回是否为新实体
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            # 检查是否存在
+            cursor.execute(
+                "SELECT id, current_json FROM entities WHERE id = ?", (entity.id,)
+            )
+            existing = cursor.fetchone()
+
+            if existing:
+                # 已存在: 智能合并 current_json
+                old_current = {}
+                if existing["current_json"]:
+                    try:
+                        old_current = json.loads(existing["current_json"])
+                    except json.JSONDecodeError as exc:
+                        print(f"[index_manager] failed to parse JSON in entities.current_json: {exc}", file=sys.stderr)
+
+                # 合并 current (新值覆盖旧值)
+                merged_current = {**old_current, **entity.current}
+
+                if update_metadata:
+                    # 完整更新(包括元数据)
+                    cursor.execute(
+                        """
+                        UPDATE entities SET
+                            canonical_name = ?,
+                            tier = ?,
+                            desc = ?,
+                            current_json = ?,
+                            last_appearance = ?,
+                            is_protagonist = ?,
+                            is_archived = ?,
+                            updated_at = CURRENT_TIMESTAMP
+                        WHERE id = ?
+                    """,
+                        (
+                            entity.canonical_name,
+                            entity.tier,
+                            entity.desc,
+                            json.dumps(merged_current, ensure_ascii=False),
+                            entity.last_appearance,
+                            1 if entity.is_protagonist else 0,
+                            1 if entity.is_archived else 0,
+                            entity.id,
+                        ),
+                    )
+                else:
+                    # 只更新 current 和 last_appearance
+                    cursor.execute(
+                        """
+                        UPDATE entities SET
+                            current_json = ?,
+                            last_appearance = ?,
+                            updated_at = CURRENT_TIMESTAMP
+                        WHERE id = ?
+                    """,
+                        (
+                            json.dumps(merged_current, ensure_ascii=False),
+                            entity.last_appearance,
+                            entity.id,
+                        ),
+                    )
+                conn.commit()
+                return False
+            else:
+                # 新实体: 插入
+                cursor.execute(
+                    """
+                    INSERT INTO entities
+                    (id, type, canonical_name, tier, desc, current_json,
+                     first_appearance, last_appearance, is_protagonist, is_archived)
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """,
+                    (
+                        entity.id,
+                        entity.type,
+                        entity.canonical_name,
+                        entity.tier,
+                        entity.desc,
+                        json.dumps(entity.current, ensure_ascii=False),
+                        entity.first_appearance,
+                        entity.last_appearance,
+                        1 if entity.is_protagonist else 0,
+                        1 if entity.is_archived else 0,
+                    ),
+                )
+                conn.commit()
+                return True
+
+    def get_entity(self, entity_id: str) -> Optional[Dict]:
+        """获取单个实体"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute("SELECT * FROM entities WHERE id = ?", (entity_id,))
+            row = cursor.fetchone()
+            if row:
+                return self._row_to_dict(row, parse_json=["current_json"])
+            return None
+
+    def get_entities_by_type(
+        self, entity_type: str, include_archived: bool = False
+    ) -> List[Dict]:
+        """按类型获取实体"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            if include_archived:
+                cursor.execute(
+                    """
+                    SELECT * FROM entities WHERE type = ?
+                    ORDER BY last_appearance DESC
+                """,
+                    (entity_type,),
+                )
+            else:
+                cursor.execute(
+                    """
+                    SELECT * FROM entities WHERE type = ? AND is_archived = 0
+                    ORDER BY last_appearance DESC
+                """,
+                    (entity_type,),
+                )
+            return [
+                self._row_to_dict(row, parse_json=["current_json"])
+                for row in cursor.fetchall()
+            ]
+
+    def get_entities_by_tier(self, tier: str) -> List[Dict]:
+        """按重要度获取实体 (核心/重要/次要/装饰)"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM entities WHERE tier = ? AND is_archived = 0
+                ORDER BY last_appearance DESC
+            """,
+                (tier,),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["current_json"])
+                for row in cursor.fetchall()
+            ]
+
+    def get_core_entities(self) -> List[Dict]:
+        """获取所有核心实体 (用于 Context Agent 全量加载)"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute("""
+                SELECT * FROM entities
+                WHERE (tier IN ('核心', '重要') OR is_protagonist = 1) AND is_archived = 0
+                ORDER BY is_protagonist DESC, tier, last_appearance DESC
+            """)
+            return [
+                self._row_to_dict(row, parse_json=["current_json"])
+                for row in cursor.fetchall()
+            ]
+
+    def get_protagonist(self) -> Optional[Dict]:
+        """获取主角实体"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute("SELECT * FROM entities WHERE is_protagonist = 1 LIMIT 1")
+            row = cursor.fetchone()
+            if row:
+                return self._row_to_dict(row, parse_json=["current_json"])
+            return None
+
+    def update_entity_current(self, entity_id: str, updates: Dict) -> bool:
+        """
+        增量更新实体的 current 字段 (不覆盖其他字段)
+
+        例如: update_entity_current("xiaoyan", {"realm": "斗师"})
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            cursor.execute(
+                "SELECT current_json FROM entities WHERE id = ?", (entity_id,)
+            )
+            row = cursor.fetchone()
+            if not row:
+                return False
+
+            current = {}
+            if row["current_json"]:
+                try:
+                    current = json.loads(row["current_json"])
+                except json.JSONDecodeError as exc:
+                    print(
+                        f"[index_manager] failed to parse JSON in update_entity_current current_json: {exc}",
+                        file=sys.stderr,
+                    )
+
+            current.update(updates)
+
+            cursor.execute(
+                """
+                UPDATE entities SET
+                    current_json = ?,
+                    updated_at = CURRENT_TIMESTAMP
+                WHERE id = ?
+            """,
+                (json.dumps(current, ensure_ascii=False), entity_id),
+            )
+            conn.commit()
+            return True
+
+    def archive_entity(self, entity_id: str) -> bool:
+        """归档实体 (不删除,只是标记)"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                UPDATE entities SET is_archived = 1, updated_at = CURRENT_TIMESTAMP
+                WHERE id = ?
+            """,
+                (entity_id,),
+            )
+            conn.commit()
+            return cursor.rowcount > 0
+
+    # ==================== v5.1 别名操作 ====================
+
+    def register_alias(self, alias: str, entity_id: str, entity_type: str) -> bool:
+        """
+        注册别名 (支持一对多)
+
+        同一别名可映射多个实体 (如 "天云宗" → 地点 + 势力)
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            try:
+                cursor.execute(
+                    """
+                    INSERT OR IGNORE INTO aliases (alias, entity_id, entity_type)
+                    VALUES (?, ?, ?)
+                """,
+                    (alias, entity_id, entity_type),
+                )
+                conn.commit()
+                return cursor.rowcount > 0
+            except sqlite3.IntegrityError:
+                return False
+
+    def get_entities_by_alias(self, alias: str) -> List[Dict]:
+        """
+        根据别名查找实体 (一对多)
+
+        返回所有匹配的实体 (可能有多个不同类型)
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT e.*, a.entity_type as alias_type
+                FROM entities e
+                JOIN aliases a ON e.id = a.entity_id
+                WHERE a.alias = ?
+            """,
+                (alias,),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["current_json"])
+                for row in cursor.fetchall()
+            ]
+
+    def get_entity_aliases(self, entity_id: str) -> List[str]:
+        """获取实体的所有别名"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                "SELECT alias FROM aliases WHERE entity_id = ?", (entity_id,)
+            )
+            return [row["alias"] for row in cursor.fetchall()]
+
+    def remove_alias(self, alias: str, entity_id: str) -> bool:
+        """移除别名"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                "DELETE FROM aliases WHERE alias = ? AND entity_id = ?",
+                (alias, entity_id),
+            )
+            conn.commit()
+            return cursor.rowcount > 0
+
+    # ==================== v5.1 状态变化操作 ====================
+
+    def record_state_change(self, change: StateChangeMeta) -> int:
+        """
+        记录状态变化
+
+        返回记录 ID
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO state_changes
+                (entity_id, field, old_value, new_value, reason, chapter)
+                VALUES (?, ?, ?, ?, ?, ?)
+            """,
+                (
+                    change.entity_id,
+                    change.field,
+                    change.old_value,
+                    change.new_value,
+                    change.reason,
+                    change.chapter,
+                ),
+            )
+            conn.commit()
+            return cursor.lastrowid
+
+    def get_entity_state_changes(self, entity_id: str, limit: int = 20) -> List[Dict]:
+        """获取实体的状态变化历史"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM state_changes
+                WHERE entity_id = ?
+                ORDER BY chapter DESC, id DESC
+                LIMIT ?
+            """,
+                (entity_id, limit),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_recent_state_changes(self, limit: int = 50) -> List[Dict]:
+        """获取最近的状态变化"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM state_changes
+                ORDER BY chapter DESC, id DESC
+                LIMIT ?
+            """,
+                (limit,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_chapter_state_changes(self, chapter: int) -> List[Dict]:
+        """获取某章的所有状态变化"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM state_changes
+                WHERE chapter = ?
+                ORDER BY id
+            """,
+                (chapter,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    # ==================== v5.1 关系操作 ====================
+
+    def upsert_relationship(self, rel: RelationshipMeta) -> bool:
+        """
+        插入或更新关系
+
+        相同 (from, to, type) 会更新 description 和 chapter
+        返回是否为新关系
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            # 检查是否存在
+            cursor.execute(
+                """
+                SELECT id FROM relationships
+                WHERE from_entity = ? AND to_entity = ? AND type = ?
+            """,
+                (rel.from_entity, rel.to_entity, rel.type),
+            )
+            existing = cursor.fetchone()
+
+            if existing:
+                cursor.execute(
+                    """
+                    UPDATE relationships SET
+                        description = ?,
+                        chapter = ?
+                    WHERE id = ?
+                """,
+                    (rel.description, rel.chapter, existing["id"]),
+                )
+                conn.commit()
+                return False
+            else:
+                cursor.execute(
+                    """
+                    INSERT INTO relationships
+                    (from_entity, to_entity, type, description, chapter)
+                    VALUES (?, ?, ?, ?, ?)
+                """,
+                    (
+                        rel.from_entity,
+                        rel.to_entity,
+                        rel.type,
+                        rel.description,
+                        rel.chapter,
+                    ),
+                )
+                conn.commit()
+                return True
+
+    def get_entity_relationships(
+        self, entity_id: str, direction: str = "both"
+    ) -> List[Dict]:
+        """
+        获取实体的关系
+
+        direction: "from" | "to" | "both"
+        """
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            if direction == "from":
+                cursor.execute(
+                    """
+                    SELECT * FROM relationships WHERE from_entity = ?
+                    ORDER BY chapter DESC
+                """,
+                    (entity_id,),
+                )
+            elif direction == "to":
+                cursor.execute(
+                    """
+                    SELECT * FROM relationships WHERE to_entity = ?
+                    ORDER BY chapter DESC
+                """,
+                    (entity_id,),
+                )
+            else:  # both
+                cursor.execute(
+                    """
+                    SELECT * FROM relationships
+                    WHERE from_entity = ? OR to_entity = ?
+                    ORDER BY chapter DESC
+                """,
+                    (entity_id, entity_id),
+                )
+
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_relationship_between(self, entity1: str, entity2: str) -> List[Dict]:
+        """获取两个实体之间的所有关系"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM relationships
+                WHERE (from_entity = ? AND to_entity = ?)
+                   OR (from_entity = ? AND to_entity = ?)
+                ORDER BY chapter DESC
+            """,
+                (entity1, entity2, entity2, entity1),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    def get_recent_relationships(self, limit: int = 30) -> List[Dict]:
+        """获取最近建立的关系"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM relationships
+                ORDER BY chapter DESC, id DESC
+                LIMIT ?
+            """,
+                (limit,),
+            )
+            return [dict(row) for row in cursor.fetchall()]
+
+    # ==================== v5.3 Override Contract 操作 ====================
+
+
+    def update_entity_field(self, entity_id: str, field: str, value: Any) -> bool:
+        """Compatibility helper to update a single entity field in current_json."""
+        return self.update_entity_current(entity_id, {field: value})

Diferenças do arquivo suprimidas por serem muito extensas
+ 9 - 1828
.claude/scripts/data_modules/index_manager.py


+ 216 - 0
.claude/scripts/data_modules/index_observability_mixin.py

@@ -0,0 +1,216 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+IndexObservabilityMixin extracted from IndexManager.
+"""
+
+from __future__ import annotations
+
+import json
+import sys
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+
+class IndexObservabilityMixin:
+    def _row_to_dict(self, row: sqlite3.Row, parse_json: List[str] = None) -> Dict:
+        """将 Row 转换为字典"""
+        d = dict(row)
+        if parse_json:
+            for key in parse_json:
+                if key in d and d[key]:
+                    try:
+                        d[key] = json.loads(d[key])
+                    except json.JSONDecodeError as exc:
+                        print(f"[index_manager] failed to parse JSON field {key} in _row_to_dict: {exc}", file=sys.stderr)
+        return d
+
+    # ==================== 无效事实管理 ====================
+
+    def mark_invalid_fact(
+        self,
+        source_type: str,
+        source_id: str,
+        reason: str,
+        marked_by: str = "user",
+        chapter_discovered: Optional[int] = None,
+    ) -> int:
+        """标记无效事实(pending)"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO invalid_facts
+                (source_type, source_id, reason, status, marked_by, chapter_discovered)
+                VALUES (?, ?, ?, 'pending', ?, ?)
+            """,
+                (source_type, str(source_id), reason, marked_by, chapter_discovered),
+            )
+            conn.commit()
+            return int(cursor.lastrowid)
+
+    def resolve_invalid_fact(self, invalid_id: int, action: str) -> bool:
+        """确认或撤销无效标记"""
+        action = action.lower()
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            if action == "confirm":
+                cursor.execute(
+                    """
+                    UPDATE invalid_facts
+                    SET status = 'confirmed', confirmed_at = CURRENT_TIMESTAMP
+                    WHERE id = ?
+                """,
+                    (invalid_id,),
+                )
+            elif action == "dismiss":
+                cursor.execute("DELETE FROM invalid_facts WHERE id = ?", (invalid_id,))
+            else:
+                return False
+            conn.commit()
+            return cursor.rowcount > 0
+
+    def list_invalid_facts(self, status: Optional[str] = None) -> List[Dict]:
+        """列出无效事实"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            if status:
+                cursor.execute(
+                    "SELECT * FROM invalid_facts WHERE status = ? ORDER BY id DESC",
+                    (status,),
+                )
+            else:
+                cursor.execute("SELECT * FROM invalid_facts ORDER BY id DESC")
+            return [dict(r) for r in cursor.fetchall()]
+
+    def get_invalid_ids(self, source_type: str, status: str = "confirmed") -> set[str]:
+        """获取无效事实 ID 集合"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                "SELECT source_id FROM invalid_facts WHERE source_type = ? AND status = ?",
+                (source_type, status),
+            )
+            return {str(r[0]) for r in cursor.fetchall() if r and r[0] is not None}
+
+    # ==================== 日志记录 ====================
+
+    def log_rag_query(
+        self,
+        query: str,
+        query_type: str,
+        results_count: int,
+        hit_sources: Optional[str] = None,
+        latency_ms: Optional[int] = None,
+        chapter: Optional[int] = None,
+    ) -> None:
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO rag_query_log
+                (query, query_type, results_count, hit_sources, latency_ms, chapter)
+                VALUES (?, ?, ?, ?, ?, ?)
+            """,
+                (query, query_type, results_count, hit_sources, latency_ms, chapter),
+            )
+            conn.commit()
+
+    def log_tool_call(
+        self,
+        tool_name: str,
+        success: bool,
+        retry_count: int = 0,
+        error_code: Optional[str] = None,
+        error_message: Optional[str] = None,
+        chapter: Optional[int] = None,
+    ) -> None:
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO tool_call_stats
+                (tool_name, success, retry_count, error_code, error_message, chapter)
+                VALUES (?, ?, ?, ?, ?, ?)
+            """,
+                (tool_name, int(bool(success)), retry_count, error_code, error_message, chapter),
+            )
+            conn.commit()
+
+    def get_stats(self) -> Dict[str, int]:
+        """获取索引统计"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+
+            cursor.execute("SELECT COUNT(*) FROM chapters")
+            chapters = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM scenes")
+            scenes = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(DISTINCT entity_id) FROM appearances")
+            appearances = cursor.fetchone()[0]
+
+            cursor.execute("SELECT MAX(chapter) FROM chapters")
+            max_chapter = cursor.fetchone()[0] or 0
+
+            # v5.1 引入统计
+            cursor.execute("SELECT COUNT(*) FROM entities")
+            entities = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM entities WHERE is_archived = 0")
+            active_entities = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM aliases")
+            aliases = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM state_changes")
+            state_changes = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM relationships")
+            relationships = cursor.fetchone()[0]
+
+            # v5.3 引入统计
+            cursor.execute("SELECT COUNT(*) FROM override_contracts")
+            override_contracts = cursor.fetchone()[0]
+
+            cursor.execute(
+                "SELECT COUNT(*) FROM override_contracts WHERE status = 'pending'"
+            )
+            pending_overrides = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM chase_debt WHERE status = 'active'")
+            active_debts = cursor.fetchone()[0]
+
+            cursor.execute(
+                "SELECT COALESCE(SUM(current_amount), 0) FROM chase_debt WHERE status IN ('active', 'overdue')"
+            )
+            total_debt = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM chapter_reading_power")
+            reading_power_records = cursor.fetchone()[0]
+
+            cursor.execute("SELECT COUNT(*) FROM review_metrics")
+            review_metrics = cursor.fetchone()[0]
+
+            return {
+                "chapters": chapters,
+                "scenes": scenes,
+                "appearances": appearances,
+                "max_chapter": max_chapter,
+                # v5.1 引入
+                "entities": entities,
+                "active_entities": active_entities,
+                "aliases": aliases,
+                "state_changes": state_changes,
+                "relationships": relationships,
+                # v5.3 引入
+                "override_contracts": override_contracts,
+                "pending_overrides": pending_overrides,
+                "active_debts": active_debts,
+                "total_debt": total_debt,
+                "reading_power_records": reading_power_records,
+                "review_metrics": review_metrics,
+            }
+
+

+ 382 - 0
.claude/scripts/data_modules/index_reading_mixin.py

@@ -0,0 +1,382 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+IndexReadingMixin extracted from IndexManager.
+"""
+
+from __future__ import annotations
+
+import json
+import sys
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+
+class IndexReadingMixin:
+    def save_chapter_reading_power(self, meta: ChapterReadingPowerMeta):
+        """保存章节追读力元数据"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT OR REPLACE INTO chapter_reading_power
+                (chapter, hook_type, hook_strength, coolpoint_patterns,
+                 micropayoffs, hard_violations, soft_suggestions,
+                 is_transition, override_count, debt_balance)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+                (
+                    meta.chapter,
+                    meta.hook_type,
+                    meta.hook_strength,
+                    json.dumps(meta.coolpoint_patterns, ensure_ascii=False),
+                    json.dumps(meta.micropayoffs, ensure_ascii=False),
+                    json.dumps(meta.hard_violations, ensure_ascii=False),
+                    json.dumps(meta.soft_suggestions, ensure_ascii=False),
+                    1 if meta.is_transition else 0,
+                    meta.override_count,
+                    meta.debt_balance,
+                ),
+            )
+            conn.commit()
+
+    def get_chapter_reading_power(self, chapter: int) -> Optional[Dict]:
+        """获取章节追读力元数据"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                "SELECT * FROM chapter_reading_power WHERE chapter = ?", (chapter,)
+            )
+            row = cursor.fetchone()
+            if row:
+                return self._row_to_dict(
+                    row,
+                    parse_json=[
+                        "coolpoint_patterns",
+                        "micropayoffs",
+                        "hard_violations",
+                        "soft_suggestions",
+                    ],
+                )
+            return None
+
+    def get_recent_reading_power(self, limit: int = 10) -> List[Dict]:
+        """获取最近章节的追读力元数据"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM chapter_reading_power
+                ORDER BY chapter DESC
+                LIMIT ?
+            """,
+                (limit,),
+            )
+            return [
+                self._row_to_dict(
+                    row,
+                    parse_json=[
+                        "coolpoint_patterns",
+                        "micropayoffs",
+                        "hard_violations",
+                        "soft_suggestions",
+                    ],
+                )
+                for row in cursor.fetchall()
+            ]
+
+    def get_pattern_usage_stats(self, last_n_chapters: int = 20) -> Dict[str, int]:
+        """获取最近N章的爽点模式使用统计"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT coolpoint_patterns FROM chapter_reading_power
+                ORDER BY chapter DESC
+                LIMIT ?
+            """,
+                (last_n_chapters,),
+            )
+
+            stats = {}
+            for row in cursor.fetchall():
+                if row["coolpoint_patterns"]:
+                    try:
+                        patterns = json.loads(row["coolpoint_patterns"])
+                        for p in patterns:
+                            stats[p] = stats.get(p, 0) + 1
+                    except json.JSONDecodeError as exc:
+                        print(
+                            f"[index_manager] failed to parse JSON in chapter_reading_power.coolpoint_patterns: {exc}",
+                            file=sys.stderr,
+                        )
+            return stats
+
+    def get_hook_type_stats(self, last_n_chapters: int = 20) -> Dict[str, int]:
+        """获取最近N章的钩子类型使用统计"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT hook_type FROM chapter_reading_power
+                WHERE hook_type IS NOT NULL AND hook_type != ''
+                ORDER BY chapter DESC
+                LIMIT ?
+            """,
+                (last_n_chapters,),
+            )
+
+            stats = {}
+            for row in cursor.fetchall():
+                hook = row["hook_type"]
+                stats[hook] = stats.get(hook, 0) + 1
+            return stats
+
+    # ==================== v5.4 审查指标 ====================
+
+    def save_review_metrics(self, metrics: ReviewMetrics) -> None:
+        """保存审查指标记录"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO review_metrics
+                (start_chapter, end_chapter, overall_score, dimension_scores,
+                 severity_counts, critical_issues, report_file, notes, created_at, updated_at)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
+                ON CONFLICT(start_chapter, end_chapter)
+                DO UPDATE SET
+                    overall_score = excluded.overall_score,
+                    dimension_scores = excluded.dimension_scores,
+                    severity_counts = excluded.severity_counts,
+                    critical_issues = excluded.critical_issues,
+                    report_file = excluded.report_file,
+                    notes = excluded.notes,
+                    updated_at = CURRENT_TIMESTAMP
+            """,
+                (
+                    metrics.start_chapter,
+                    metrics.end_chapter,
+                    metrics.overall_score,
+                    json.dumps(metrics.dimension_scores, ensure_ascii=False),
+                    json.dumps(metrics.severity_counts, ensure_ascii=False),
+                    json.dumps(metrics.critical_issues, ensure_ascii=False),
+                    metrics.report_file,
+                    metrics.notes,
+                ),
+            )
+            conn.commit()
+
+    def get_recent_review_metrics(self, limit: int = 5) -> List[Dict]:
+        """获取最近审查记录"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM review_metrics
+                ORDER BY end_chapter DESC, start_chapter DESC
+                LIMIT ?
+            """,
+                (limit,),
+            )
+            return [
+                self._row_to_dict(
+                    row,
+                    parse_json=["dimension_scores", "severity_counts", "critical_issues"],
+                )
+                for row in cursor.fetchall()
+            ]
+
+    def get_review_trend_stats(self, last_n: int = 5) -> Dict[str, Any]:
+        """获取审查趋势统计"""
+        records = self.get_recent_review_metrics(last_n)
+        if not records:
+            return {
+                "count": 0,
+                "overall_avg": 0.0,
+                "dimension_avg": {},
+                "severity_totals": {},
+                "recent_ranges": [],
+            }
+
+        overall_scores: List[float] = []
+        dimension_totals: Dict[str, float] = {}
+        dimension_counts: Dict[str, int] = {}
+        severity_totals: Dict[str, int] = {}
+
+        for record in records:
+            score = record.get("overall_score")
+            if score is not None:
+                try:
+                    overall_scores.append(float(score))
+                except (TypeError, ValueError):
+                    pass
+
+            dimensions = record.get("dimension_scores") or {}
+            if isinstance(dimensions, dict):
+                for key, value in dimensions.items():
+                    try:
+                        val = float(value)
+                    except (TypeError, ValueError):
+                        continue
+                    dimension_totals[key] = dimension_totals.get(key, 0.0) + val
+                    dimension_counts[key] = dimension_counts.get(key, 0) + 1
+
+            severities = record.get("severity_counts") or {}
+            if isinstance(severities, dict):
+                for key, value in severities.items():
+                    try:
+                        count = int(value)
+                    except (TypeError, ValueError):
+                        continue
+                    severity_totals[key] = severity_totals.get(key, 0) + count
+
+        overall_avg = round(sum(overall_scores) / len(overall_scores), 2) if overall_scores else 0.0
+        dimension_avg = {
+            key: round(dimension_totals[key] / dimension_counts[key], 2)
+            for key in dimension_totals
+            if dimension_counts.get(key, 0) > 0
+        }
+        recent_ranges = [
+            {
+                "start_chapter": record.get("start_chapter"),
+                "end_chapter": record.get("end_chapter"),
+                "overall_score": record.get("overall_score", 0),
+            }
+            for record in records
+        ]
+
+        return {
+            "count": len(records),
+            "overall_avg": overall_avg,
+            "dimension_avg": dimension_avg,
+            "severity_totals": severity_totals,
+            "recent_ranges": recent_ranges,
+        }
+
+    # ==================== 写作清单评分(Phase F) ====================
+
+    def save_writing_checklist_score(self, meta: WritingChecklistScoreMeta) -> None:
+        """保存章节写作清单评分。"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO writing_checklist_scores (
+                    chapter, template, total_items, required_items,
+                    completed_items, completed_required,
+                    total_weight, completed_weight, completion_rate, score,
+                    score_breakdown, pending_items, source, notes
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                ON CONFLICT(chapter) DO UPDATE SET
+                    template=excluded.template,
+                    total_items=excluded.total_items,
+                    required_items=excluded.required_items,
+                    completed_items=excluded.completed_items,
+                    completed_required=excluded.completed_required,
+                    total_weight=excluded.total_weight,
+                    completed_weight=excluded.completed_weight,
+                    completion_rate=excluded.completion_rate,
+                    score=excluded.score,
+                    score_breakdown=excluded.score_breakdown,
+                    pending_items=excluded.pending_items,
+                    source=excluded.source,
+                    notes=excluded.notes,
+                    updated_at=CURRENT_TIMESTAMP
+            """,
+                (
+                    meta.chapter,
+                    meta.template,
+                    meta.total_items,
+                    meta.required_items,
+                    meta.completed_items,
+                    meta.completed_required,
+                    meta.total_weight,
+                    meta.completed_weight,
+                    meta.completion_rate,
+                    meta.score,
+                    json.dumps(meta.score_breakdown, ensure_ascii=False),
+                    json.dumps(meta.pending_items, ensure_ascii=False),
+                    meta.source,
+                    meta.notes,
+                ),
+            )
+            conn.commit()
+
+    def get_writing_checklist_score(self, chapter: int) -> Optional[Dict[str, Any]]:
+        """获取指定章节的写作清单评分。"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                "SELECT * FROM writing_checklist_scores WHERE chapter = ?",
+                (chapter,),
+            )
+            row = cursor.fetchone()
+            if not row:
+                return None
+            return self._row_to_dict(row, parse_json=["score_breakdown", "pending_items"])
+
+    def get_recent_writing_checklist_scores(self, limit: int = 10) -> List[Dict[str, Any]]:
+        """获取最近章节写作清单评分。"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                SELECT * FROM writing_checklist_scores
+                ORDER BY chapter DESC
+                LIMIT ?
+            """,
+                (limit,),
+            )
+            return [
+                self._row_to_dict(row, parse_json=["score_breakdown", "pending_items"])
+                for row in cursor.fetchall()
+            ]
+
+    def get_writing_checklist_score_trend(self, last_n: int = 10) -> Dict[str, Any]:
+        """获取写作清单评分趋势统计。"""
+        records = self.get_recent_writing_checklist_scores(limit=max(1, int(last_n)))
+        if not records:
+            return {
+                "count": 0,
+                "score_avg": 0.0,
+                "completion_avg": 0.0,
+                "required_completion_avg": 0.0,
+                "recent": [],
+            }
+
+        scores: List[float] = []
+        completion_rates: List[float] = []
+        required_rates: List[float] = []
+        for row in records:
+            try:
+                scores.append(float(row.get("score", 0.0)))
+            except (TypeError, ValueError):
+                pass
+            try:
+                completion_rates.append(float(row.get("completion_rate", 0.0)))
+            except (TypeError, ValueError):
+                pass
+
+            required_items = int(row.get("required_items") or 0)
+            completed_required = int(row.get("completed_required") or 0)
+            if required_items > 0:
+                required_rates.append(completed_required / required_items)
+            else:
+                required_rates.append(1.0)
+
+        return {
+            "count": len(records),
+            "score_avg": round(sum(scores) / len(scores), 2) if scores else 0.0,
+            "completion_avg": round(sum(completion_rates) / len(completion_rates), 4) if completion_rates else 0.0,
+            "required_completion_avg": round(sum(required_rates) / len(required_rates), 4) if required_rates else 0.0,
+            "recent": [
+                {
+                    "chapter": row.get("chapter"),
+                    "score": row.get("score"),
+                    "completion_rate": row.get("completion_rate"),
+                }
+                for row in records
+            ],
+        }
+

+ 37 - 0
.claude/scripts/data_modules/observability.py

@@ -0,0 +1,37 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Shared observability helpers for data modules.
+"""
+
+from __future__ import annotations
+
+import sys
+from typing import Optional
+
+
+def safe_log_tool_call(
+    logger,
+    *,
+    tool_name: str,
+    success: bool,
+    retry_count: int = 0,
+    error_code: Optional[str] = None,
+    error_message: Optional[str] = None,
+    chapter: Optional[int] = None,
+) -> None:
+    try:
+        logger.log_tool_call(
+            tool_name,
+            success,
+            retry_count=retry_count,
+            error_code=error_code,
+            error_message=error_message,
+            chapter=chapter,
+        )
+    except Exception as exc:
+        print(
+            f"[observability] failed to log tool call {tool_name}: {exc}",
+            file=sys.stderr,
+        )
+

+ 16 - 13
.claude/scripts/data_modules/rag_adapter.py

@@ -15,6 +15,8 @@ import sqlite3
 import json
 import math
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from typing import Dict, List, Optional, Any, Tuple
 from dataclasses import dataclass
 from collections import Counter
@@ -26,6 +28,7 @@ import time
 from .config import get_config
 from .api_client import get_client
 from .index_manager import IndexManager
+from .observability import safe_log_tool_call
 
 
 @dataclass
@@ -356,8 +359,10 @@ class RAGAdapter:
                 latency_ms=latency_ms,
                 chapter=chapter,
             )
-        except Exception:
-            pass
+        except Exception as exc:
+            import sys
+
+            print(f"[rag_adapter] failed to log rag query: {exc}", file=sys.stderr)
 
     # ==================== BM25 索引 ====================
 
@@ -843,17 +848,17 @@ def main():
 
     def emit_success(data=None, message: str = "ok"):
         print_success(data, message=message)
-        try:
-            adapter.index_manager.log_tool_call(tool_name, True)
-        except Exception:
-            pass
+        safe_log_tool_call(adapter.index_manager, tool_name=tool_name, success=True)
 
     def emit_error(code: str, message: str, suggestion: str | None = None):
         print_error(code, message, suggestion=suggestion)
-        try:
-            adapter.index_manager.log_tool_call(tool_name, False, error_code=code, error_message=message)
-        except Exception:
-            pass
+        safe_log_tool_call(
+            adapter.index_manager,
+            tool_name=tool_name,
+            success=False,
+            error_code=code,
+            error_message=message,
+        )
 
     if args.command == "stats":
         stats = adapter.get_stats()
@@ -927,7 +932,5 @@ def main():
 if __name__ == "__main__":
     import sys
     if sys.platform == "win32":
-        import io
-        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
-        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+        enable_windows_utf8_stdio()
     main()

+ 9 - 8
.claude/scripts/data_modules/sql_state_manager.py

@@ -24,6 +24,7 @@ from .index_manager import (
     RelationshipMeta
 )
 from .config import get_config
+from .observability import safe_log_tool_call
 
 
 @dataclass
@@ -512,17 +513,17 @@ def main():
 
     def emit_success(data=None, message: str = "ok"):
         print_success(data, message=message)
-        try:
-            logger.log_tool_call(tool_name, True)
-        except Exception:
-            pass
+        safe_log_tool_call(logger, tool_name=tool_name, success=True)
 
     def emit_error(code: str, message: str, suggestion: str | None = None):
         print_error(code, message, suggestion=suggestion)
-        try:
-            logger.log_tool_call(tool_name, False, error_code=code, error_message=message)
-        except Exception:
-            pass
+        safe_log_tool_call(
+            logger,
+            tool_name=tool_name,
+            success=False,
+            error_code=code,
+            error_message=message,
+        )
 
     if args.command == "stats":
         stats = manager.get_stats()

+ 12 - 11
.claude/scripts/data_modules/state_manager.py

@@ -17,12 +17,15 @@ import json
 import sys
 from copy import deepcopy
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from typing import Dict, List, Optional, Any
 from dataclasses import dataclass, field, asdict
 from datetime import datetime
 import filelock
 
 from .config import get_config
+from .observability import safe_log_tool_call
 
 try:
     # 当 scripts 目录在 sys.path 中(常见:从 scripts/ 运行)
@@ -1240,17 +1243,17 @@ def main():
 
     def emit_success(data=None, message: str = "ok"):
         print_success(data, message=message)
-        try:
-            logger.log_tool_call(tool_name, True)
-        except Exception:
-            pass
+        safe_log_tool_call(logger, tool_name=tool_name, success=True)
 
     def emit_error(code: str, message: str, suggestion: str | None = None):
         print_error(code, message, suggestion=suggestion)
-        try:
-            logger.log_tool_call(tool_name, False, error_code=code, error_message=message)
-        except Exception:
-            pass
+        safe_log_tool_call(
+            logger,
+            tool_name=tool_name,
+            success=False,
+            error_code=code,
+            error_message=message,
+        )
 
     if args.command == "get-progress":
         emit_success(manager._state.get("progress", {}), message="progress")
@@ -1304,7 +1307,5 @@ def main():
 
 if __name__ == "__main__":
     if sys.platform == "win32":
-        import io
-        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
-        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+        enable_windows_utf8_stdio()
     main()

+ 9 - 8
.claude/scripts/data_modules/style_sampler.py

@@ -19,6 +19,7 @@ from enum import Enum
 from contextlib import contextmanager
 
 from .config import get_config
+from .observability import safe_log_tool_call
 
 
 class SceneType(Enum):
@@ -349,17 +350,17 @@ def main():
 
     def emit_success(data=None, message: str = "ok"):
         print_success(data, message=message)
-        try:
-            logger.log_tool_call(tool_name, True)
-        except Exception:
-            pass
+        safe_log_tool_call(logger, tool_name=tool_name, success=True)
 
     def emit_error(code: str, message: str, suggestion: str | None = None):
         print_error(code, message, suggestion=suggestion)
-        try:
-            logger.log_tool_call(tool_name, False, error_code=code, error_message=message)
-        except Exception:
-            pass
+        safe_log_tool_call(
+            logger,
+            tool_name=tool_name,
+            success=False,
+            error_code=code,
+            error_message=message,
+        )
 
     if args.command == "stats":
         stats = sampler.get_stats()

+ 74 - 0
.claude/scripts/data_modules/tests/test_archive_manager.py

@@ -0,0 +1,74 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from pathlib import Path
+
+import pytest
+
+
+def _load_archive_module():
+    import sys
+
+    scripts_dir = Path(__file__).resolve().parents[2]
+    if str(scripts_dir) not in sys.path:
+        sys.path.insert(0, str(scripts_dir))
+
+    import archive_manager
+
+    return archive_manager
+
+
+@pytest.fixture
+def archive_env(tmp_path):
+    webnovel = tmp_path / ".webnovel"
+    webnovel.mkdir(parents=True, exist_ok=True)
+    state_path = webnovel / "state.json"
+    state_path.write_text(
+        '{"progress":{"current_chapter":10},"plot_threads":{},"review_checkpoints":[]}',
+        encoding="utf-8",
+    )
+    return tmp_path
+
+
+def test_archive_remove_from_state_missing_sections(archive_env):
+    module = _load_archive_module()
+    manager = module.ArchiveManager(project_root=archive_env)
+
+    state = {
+        "progress": {"current_chapter": 50},
+    }
+
+    updated = manager.remove_from_state(state, inactive_chars=[], resolved_threads=[], old_reviews=[])
+    assert updated.get("progress", {}).get("current_chapter") == 50
+
+
+def test_archive_check_trigger_conditions_edges(archive_env):
+    module = _load_archive_module()
+    manager = module.ArchiveManager(project_root=archive_env)
+
+    manager.config["chapter_trigger"] = 10
+    manager.config["file_size_trigger_mb"] = 9999.0
+
+    trigger = manager.check_trigger_conditions({"progress": {"current_chapter": 20}})
+    assert trigger["chapter_trigger"] is True
+    assert trigger["should_archive"] is True
+
+
+def test_archive_identify_old_reviews_handles_mixed_formats(archive_env):
+    module = _load_archive_module()
+    manager = module.ArchiveManager(project_root=archive_env)
+    manager.config["review_old_threshold"] = 5
+
+    state = {
+        "progress": {"current_chapter": 30},
+        "review_checkpoints": [
+            {"chapters": "20-22", "report": "r1.md"},
+            {"chapter_range": [10, 12], "date": "2026-01-01"},
+            {"report": "Review_Ch5-6.md"},
+        ],
+    }
+
+    results = manager.identify_old_reviews(state)
+    assert len(results) == 3
+    assert all(row["chapters_since_review"] >= 5 for row in results)
+

+ 14 - 0
.claude/scripts/data_modules/tests/test_rag_adapter.py

@@ -185,3 +185,17 @@ def test_rag_adapter_cli(temp_project, monkeypatch, capsys):
     run_cli(["--project-root", root, "search", "--query", "内容", "--mode", "hybrid", "--top-k", "5"])
 
     capsys.readouterr()
+
+
+def test_rag_adapter_log_query_failure_is_reported(temp_project, monkeypatch, capsys):
+    adapter = RAGAdapter(temp_project)
+
+    def _raise_log_error(*args, **kwargs):
+        raise RuntimeError("log write failed")
+
+    monkeypatch.setattr(adapter.index_manager, "log_rag_query", _raise_log_error)
+
+    adapter._log_query("q", "vector", [], 1)
+
+    captured = capsys.readouterr()
+    assert "failed to log rag query" in captured.err

+ 19 - 0
.claude/scripts/data_modules/tests/test_workflow_manager.py

@@ -113,3 +113,22 @@ def test_safe_append_call_trace_logs_failure(monkeypatch, capsys):
     captured = capsys.readouterr()
     assert "failed to append call trace" in captured.err
     assert "unit_test_event" in captured.err
+
+
+def test_workflow_reentry_does_not_duplicate_history(tmp_path, monkeypatch):
+    module = _load_module()
+    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
+
+    webnovel_dir = tmp_path / ".webnovel"
+    webnovel_dir.mkdir(parents=True, exist_ok=True)
+
+    module.start_task("webnovel-write", {"chapter_num": 20})
+    module.start_task("webnovel-write", {"chapter_num": 20})
+    module.start_task("webnovel-write", {"chapter_num": 20})
+
+    state = module.load_state()
+    assert isinstance(state.get("history"), list)
+    assert len(state.get("history")) == 0
+
+    task = state.get("current_task") or {}
+    assert int(task.get("retry_count", 0)) >= 2

+ 4 - 4
.claude/scripts/golden_three_checker.py

@@ -26,6 +26,8 @@ import re
 import json
 import argparse
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from typing import Dict, List, Optional, Any
 
 # 导入项目定位和章节路径模块
@@ -33,10 +35,8 @@ from project_locator import resolve_project_root
 from chapter_paths import find_chapter_file
 
 # Windows UTF-8 输出修复
-if sys.platform == 'win32':
-    import io
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
-    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
+if sys.platform == "win32":
+    enable_windows_utf8_stdio()
 
 
 # ============================================================================

+ 3 - 4
.claude/scripts/init_project.py

@@ -21,6 +21,8 @@ import subprocess
 import sys
 from datetime import datetime
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from typing import Any, Dict, List
 import re
 
@@ -30,10 +32,7 @@ from security_utils import sanitize_commit_message, atomic_write_json, is_git_av
 
 # Windows 编码兼容性修复
 if sys.platform == "win32":
-    import io
-
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
-    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+    enable_windows_utf8_stdio()
 
 
 def _read_text_if_exists(path: Path) -> str:

+ 39 - 0
.claude/scripts/runtime_compat.py

@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Runtime compatibility helpers.
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+
+
+def enable_windows_utf8_stdio(*, skip_in_pytest: bool = False) -> bool:
+    """Enable UTF-8 stdio wrappers on Windows.
+
+    Returns:
+        True if wrapping was applied, False otherwise.
+    """
+    if sys.platform != "win32":
+        return False
+    if skip_in_pytest and os.environ.get("PYTEST_CURRENT_TEST"):
+        return False
+
+    stdout_encoding = str(getattr(sys.stdout, "encoding", "") or "").lower()
+    stderr_encoding = str(getattr(sys.stderr, "encoding", "") or "").lower()
+    if stdout_encoding == "utf-8" and stderr_encoding == "utf-8":
+        return False
+
+    try:
+        import io
+
+        if hasattr(sys.stdout, "buffer"):
+            sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
+        if hasattr(sys.stderr, "buffer"):
+            sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+        return True
+    except Exception:
+        return False
+

+ 4 - 4
.claude/scripts/security_utils.py

@@ -14,6 +14,8 @@ import re
 import sys
 import tempfile
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from typing import Any, Dict, Optional, Union
 
 # 尝试导入 filelock(可选依赖)
@@ -576,10 +578,8 @@ def _run_self_tests():
 
 if __name__ == "__main__":
     # Windows UTF-8 编码修复(必须在打印前执行)
-    if sys.platform == 'win32':
-        import io
-        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
-        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
+    if sys.platform == "win32":
+        enable_windows_utf8_stdio()
 
     # 运行自检测试
     _run_self_tests()

+ 2 - 16
.claude/scripts/status_reporter.py

@@ -88,6 +88,7 @@ from datetime import datetime
 from collections import defaultdict
 from project_locator import resolve_project_root
 from chapter_paths import extract_chapter_num_from_filename
+from runtime_compat import enable_windows_utf8_stdio
 
 # 导入配置
 try:
@@ -119,22 +120,7 @@ def _is_resolved_foreshadowing_status(raw_status: Any) -> bool:
 
 def _enable_windows_utf8_stdio() -> None:
     """在 Windows 下启用 UTF-8 输出;pytest 环境跳过以避免捕获冲突。"""
-    if sys.platform != "win32":
-        return
-    if os.environ.get("PYTEST_CURRENT_TEST"):
-        return
-
-    try:
-        import io
-
-        stdout_buffer = getattr(sys.stdout, "buffer", None)
-        stderr_buffer = getattr(sys.stderr, "buffer", None)
-        if stdout_buffer is not None:
-            sys.stdout = io.TextIOWrapper(stdout_buffer, encoding="utf-8")
-        if stderr_buffer is not None:
-            sys.stderr = io.TextIOWrapper(stderr_buffer, encoding="utf-8")
-    except Exception:
-        pass
+    enable_windows_utf8_stdio(skip_in_pytest=True)
 
 
 class StatusReporter:

+ 4 - 4
.claude/scripts/update_state.py

@@ -49,6 +49,8 @@ import sys
 import argparse
 import shutil
 from pathlib import Path
+
+from runtime_compat import enable_windows_utf8_stdio
 from datetime import datetime
 from typing import Dict, Any, Optional
 
@@ -63,10 +65,8 @@ from data_modules.state_validator import (
 )
 
 # Windows 编码兼容性修复
-if sys.platform == 'win32':
-    import io
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
-    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
+if sys.platform == "win32":
+    enable_windows_utf8_stdio()
 
 class StateUpdater:
     """state.json 安全更新器"""

+ 2 - 4
.claude/scripts/workflow_manager.py

@@ -19,15 +19,13 @@ from typing import Any, Dict, Optional
 
 from chapter_paths import default_chapter_draft_path, find_chapter_file
 from project_locator import resolve_project_root
+from runtime_compat import enable_windows_utf8_stdio
 from security_utils import atomic_write_json, create_secure_directory
 
 
 # UTF-8 output for Windows console (CLI run only, avoid pytest capture issues)
 if sys.platform == "win32" and __name__ == "__main__" and not os.environ.get("PYTEST_CURRENT_TEST"):
-    import io
-
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
-    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+    enable_windows_utf8_stdio(skip_in_pytest=True)
 
 
 TASK_STATUS_RUNNING = "running"

Alguns arquivos não foram mostrados porque muitos arquivos mudaram nesse diff