Просмотр исходного кода

feat(v5.4): 上下文工程基础设施(invalid_facts/日志表/CLI统一)

- index_manager: 新增 invalid_facts, rag_query_log, tool_call_stats 表
- index_manager: 新增 mark-invalid/resolve-invalid/list-invalid CLI 命令
- cli_output: 统一 CLI 输出格式 (CLIResponse)
- schemas: Pydantic schema 定义 (DataAgentOutput 等)
- consistency-checker: 新增 Step 5 自动标记 invalid_facts
- requirements: 添加 pydantic>=2.0.0

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
lingfengQAQ 4 месяцев назад
Родитель
Сommit
616068cf18

+ 16 - 0
.claude/agents/consistency-checker.md

@@ -163,6 +163,22 @@ Chapters {N} - {M}
 **Minor Issues**: {count} (Recommend fixing)
 ```
 
+### Step 5: 标记无效事实(新增)
+
+对于发现的 **CRITICAL** 级别问题,自动标记到 invalid_facts(pending):
+
+```bash
+python -m data_modules.index_manager mark-invalid \
+  --source-type entity \
+  --source-id {entity_id} \
+  --reason "{问题描述}" \
+  --marked-by consistency-checker \
+  --chapter {current_chapter} \
+  --project-root "."
+```
+
+> 注意:自动标记仅为 `pending`,需用户确认后才生效。
+
 ## Anti-Patterns (Forbidden)
 
 ❌ Approving chapters with POWER_CONFLICT (战力崩坏)

+ 69 - 0
.claude/scripts/data_modules/cli_output.py

@@ -0,0 +1,69 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+CLI output helpers for data_modules.
+
+All CLI tools should emit JSON payloads via these helpers.
+"""
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+
+@dataclass
+class ErrorPayload:
+    code: str
+    message: str
+    suggestion: Optional[str] = None
+    details: Optional[Dict[str, Any]] = None
+
+
+def build_success(data: Any = None, message: str = "ok", warnings: Optional[list] = None) -> Dict[str, Any]:
+    payload: Dict[str, Any] = {
+        "status": "success",
+        "message": message,
+    }
+    if data is not None:
+        payload["data"] = data
+    if warnings:
+        payload["warnings"] = warnings
+    return payload
+
+
+def build_error(
+    code: str,
+    message: str,
+    suggestion: Optional[str] = None,
+    details: Optional[Dict[str, Any]] = None,
+) -> Dict[str, Any]:
+    error: Dict[str, Any] = {
+        "code": code,
+        "message": message,
+    }
+    if suggestion:
+        error["suggestion"] = suggestion
+    if details:
+        error["details"] = details
+    return {
+        "status": "error",
+        "error": error,
+    }
+
+
+def print_json(payload: Dict[str, Any]) -> None:
+    print(json.dumps(payload, ensure_ascii=False))
+
+
+def print_success(data: Any = None, message: str = "ok", warnings: Optional[list] = None) -> None:
+    print_json(build_success(data=data, message=message, warnings=warnings))
+
+
+def print_error(
+    code: str,
+    message: str,
+    suggestion: Optional[str] = None,
+    details: Optional[Dict[str, Any]] = None,
+) -> None:
+    print_json(build_error(code=code, message=message, suggestion=suggestion, details=details))

+ 280 - 64
.claude/scripts/data_modules/index_manager.py

@@ -412,6 +412,70 @@ class IndexManager:
                 "CREATE INDEX IF NOT EXISTS idx_debt_events_chapter ON debt_events(chapter)"
             )
 
+            # ==================== v5.4 新增表:无效事实与日志 ====================
+
+            # 无效事实表
+            cursor.execute("""
+                CREATE TABLE IF NOT EXISTS invalid_facts (
+                    id INTEGER PRIMARY KEY,
+                    source_type TEXT NOT NULL,
+                    source_id TEXT NOT NULL,
+                    reason TEXT NOT NULL,
+                    status TEXT DEFAULT 'pending',
+                    marked_by TEXT NOT NULL,
+                    marked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    confirmed_at TIMESTAMP,
+                    chapter_discovered INTEGER
+                )
+            """)
+
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS idx_invalid_status ON invalid_facts(status)"
+            )
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS idx_invalid_source ON invalid_facts(source_type, source_id)"
+            )
+
+            # RAG 查询日志
+            cursor.execute("""
+                CREATE TABLE IF NOT EXISTS rag_query_log (
+                    id INTEGER PRIMARY KEY,
+                    query TEXT,
+                    query_type TEXT,
+                    results_count INTEGER,
+                    hit_sources TEXT,
+                    latency_ms INTEGER,
+                    chapter INTEGER,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                )
+            """)
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS idx_rag_query_type ON rag_query_log(query_type)"
+            )
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS idx_rag_query_chapter ON rag_query_log(chapter)"
+            )
+
+            # 工具调用统计
+            cursor.execute("""
+                CREATE TABLE IF NOT EXISTS tool_call_stats (
+                    id INTEGER PRIMARY KEY,
+                    tool_name TEXT,
+                    success BOOLEAN,
+                    retry_count INTEGER DEFAULT 0,
+                    error_code TEXT,
+                    error_message TEXT,
+                    chapter INTEGER,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+                )
+            """)
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS idx_tool_stats_name ON tool_call_stats(tool_name)"
+            )
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS idx_tool_stats_chapter ON tool_call_stats(chapter)"
+            )
+
             conn.commit()
 
     @contextmanager
@@ -1816,6 +1880,118 @@ class IndexManager:
                         pass
         return d
 
+    # ==================== 无效事实管理 ====================
+
+    def mark_invalid_fact(
+        self,
+        source_type: str,
+        source_id: str,
+        reason: str,
+        marked_by: str = "user",
+        chapter_discovered: Optional[int] = None,
+    ) -> int:
+        """标记无效事实(pending)"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO invalid_facts
+                (source_type, source_id, reason, status, marked_by, chapter_discovered)
+                VALUES (?, ?, ?, 'pending', ?, ?)
+            """,
+                (source_type, str(source_id), reason, marked_by, chapter_discovered),
+            )
+            conn.commit()
+            return int(cursor.lastrowid)
+
+    def resolve_invalid_fact(self, invalid_id: int, action: str) -> bool:
+        """确认或撤销无效标记"""
+        action = action.lower()
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            if action == "confirm":
+                cursor.execute(
+                    """
+                    UPDATE invalid_facts
+                    SET status = 'confirmed', confirmed_at = CURRENT_TIMESTAMP
+                    WHERE id = ?
+                """,
+                    (invalid_id,),
+                )
+            elif action == "dismiss":
+                cursor.execute("DELETE FROM invalid_facts WHERE id = ?", (invalid_id,))
+            else:
+                return False
+            conn.commit()
+            return cursor.rowcount > 0
+
+    def list_invalid_facts(self, status: Optional[str] = None) -> List[Dict]:
+        """列出无效事实"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            if status:
+                cursor.execute(
+                    "SELECT * FROM invalid_facts WHERE status = ? ORDER BY id DESC",
+                    (status,),
+                )
+            else:
+                cursor.execute("SELECT * FROM invalid_facts ORDER BY id DESC")
+            return [dict(r) for r in cursor.fetchall()]
+
+    def get_invalid_ids(self, source_type: str, status: str = "confirmed") -> set[str]:
+        """获取无效事实 ID 集合"""
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                "SELECT source_id FROM invalid_facts WHERE source_type = ? AND status = ?",
+                (source_type, status),
+            )
+            return {str(r[0]) for r in cursor.fetchall() if r and r[0] is not None}
+
+    # ==================== 日志记录 ====================
+
+    def log_rag_query(
+        self,
+        query: str,
+        query_type: str,
+        results_count: int,
+        hit_sources: Optional[str] = None,
+        latency_ms: Optional[int] = None,
+        chapter: Optional[int] = None,
+    ) -> None:
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO rag_query_log
+                (query, query_type, results_count, hit_sources, latency_ms, chapter)
+                VALUES (?, ?, ?, ?, ?, ?)
+            """,
+                (query, query_type, results_count, hit_sources, latency_ms, chapter),
+            )
+            conn.commit()
+
+    def log_tool_call(
+        self,
+        tool_name: str,
+        success: bool,
+        retry_count: int = 0,
+        error_code: Optional[str] = None,
+        error_message: Optional[str] = None,
+        chapter: Optional[int] = None,
+    ) -> None:
+        with self._get_conn() as conn:
+            cursor = conn.cursor()
+            cursor.execute(
+                """
+                INSERT INTO tool_call_stats
+                (tool_name, success, retry_count, error_code, error_message, chapter)
+                VALUES (?, ?, ?, ?, ?, ?)
+            """,
+                (tool_name, int(bool(success)), retry_count, error_code, error_message, chapter),
+            )
+            conn.commit()
+
     def get_stats(self) -> Dict[str, int]:
         """获取索引统计"""
         with self._get_conn() as conn:
@@ -1894,6 +2070,7 @@ class IndexManager:
 
 def main():
     import argparse
+    from .cli_output import print_success, print_error
 
     parser = argparse.ArgumentParser(description="Index Manager CLI (v5.3)")
     parser.add_argument("--project-root", type=str, help="项目根目录")
@@ -1991,6 +2168,21 @@ def main():
         "--data", required=True, help="JSON 格式的状态变化数据"
     )
 
+    # ==================== v5.4 新增命令 ====================
+    invalid_parser = subparsers.add_parser("mark-invalid")
+    invalid_parser.add_argument("--source-type", required=True)
+    invalid_parser.add_argument("--source-id", required=True)
+    invalid_parser.add_argument("--reason", required=True)
+    invalid_parser.add_argument("--marked-by", default="user")
+    invalid_parser.add_argument("--chapter", type=int, default=None)
+
+    resolve_parser = subparsers.add_parser("resolve-invalid")
+    resolve_parser.add_argument("--id", type=int, required=True)
+    resolve_parser.add_argument("--action", choices=["confirm", "dismiss"], required=True)
+
+    list_invalid_parser = subparsers.add_parser("list-invalid")
+    list_invalid_parser.add_argument("--status", choices=["pending", "confirmed"], default=None)
+
     # ==================== v5.3 新增命令 ====================
 
     # 获取债务汇总
@@ -2067,36 +2259,43 @@ def main():
         config = DataModulesConfig.from_project_root(args.project_root)
 
     manager = IndexManager(config)
+    tool_name = f"index_manager:{args.command or 'unknown'}"
+
+    def emit_success(data=None, message: str = "ok", chapter: Optional[int] = None):
+        print_success(data, message=message)
+        try:
+            manager.log_tool_call(tool_name, True, chapter=chapter)
+        except Exception:
+            pass
+
+    def emit_error(code: str, message: str, suggestion: Optional[str] = None, chapter: Optional[int] = None):
+        print_error(code, message, suggestion=suggestion)
+        try:
+            manager.log_tool_call(tool_name, False, error_code=code, error_message=message, chapter=chapter)
+        except Exception:
+            pass
 
     if args.command == "stats":
-        stats = manager.get_stats()
-        print(json.dumps(stats, ensure_ascii=False, indent=2))
+        emit_success(manager.get_stats(), message="stats")
 
     elif args.command == "get-chapter":
         chapter = manager.get_chapter(args.chapter)
         if chapter:
-            print(json.dumps(chapter, ensure_ascii=False, indent=2))
+            emit_success(chapter, message="chapter")
         else:
-            print(f"未找到章节: {args.chapter}")
+            emit_error("NOT_FOUND", f"未找到章节: {args.chapter}")
 
     elif args.command == "recent-appearances":
         appearances = manager.get_recent_appearances(args.limit)
-        for a in appearances:
-            print(
-                f"{a['entity_id']}: 最后出场第 {a['last_chapter']} 章, 共 {a['total']} 次"
-            )
+        emit_success(appearances, message="recent_appearances")
 
     elif args.command == "entity-appearances":
         appearances = manager.get_entity_appearances(args.entity, args.limit)
-        print(f"{args.entity} 出场记录:")
-        for a in appearances:
-            print(f"  第 {a['chapter']} 章: {a['mentions']}")
+        emit_success({"entity": args.entity, "appearances": appearances}, message="entity_appearances")
 
     elif args.command == "search-scenes":
         scenes = manager.search_scenes_by_location(args.location, args.limit)
-        for s in scenes:
-            print(f"第 {s['chapter']} 章 场景 {s['scene_index']}: {s['location']}")
-            print(f"  {s['summary'][:50]}...")
+        emit_success(scenes, message="scenes")
 
     elif args.command == "process-chapter":
         entities = json.loads(args.entities)
@@ -2109,63 +2308,63 @@ def main():
             entities=entities,
             scenes=scenes,
         )
-        print(f"✓ 已处理第 {args.chapter} 章")
-        print(
-            f"  章节: {stats['chapters']}, 场景: {stats['scenes']}, 出场记录: {stats['appearances']}"
-        )
+        emit_success(stats, message="chapter_processed", chapter=args.chapter)
 
     # ==================== v5.1 新增命令处理 ====================
 
     elif args.command == "get-entity":
         entity = manager.get_entity(args.id)
         if entity:
-            print(json.dumps(entity, ensure_ascii=False, indent=2))
+            emit_success(entity, message="entity")
         else:
-            print(f"未找到实体: {args.id}")
+            emit_error("NOT_FOUND", f"未找到实体: {args.id}")
 
     elif args.command == "get-core-entities":
         entities = manager.get_core_entities()
-        print(json.dumps(entities, ensure_ascii=False, indent=2))
+        emit_success(entities, message="core_entities")
 
     elif args.command == "get-protagonist":
         protagonist = manager.get_protagonist()
         if protagonist:
-            print(json.dumps(protagonist, ensure_ascii=False, indent=2))
+            emit_success(protagonist, message="protagonist")
         else:
-            print("未设置主角")
+            emit_error("NOT_FOUND", "未设置主角")
 
     elif args.command == "get-entities-by-type":
         entities = manager.get_entities_by_type(args.type, args.include_archived)
-        print(json.dumps(entities, ensure_ascii=False, indent=2))
+        emit_success(entities, message="entities_by_type")
 
     elif args.command == "get-by-alias":
         entities = manager.get_entities_by_alias(args.alias)
         if entities:
-            print(json.dumps(entities, ensure_ascii=False, indent=2))
+            emit_success(entities, message="entities_by_alias")
         else:
-            print(f"未找到别名: {args.alias}")
+            emit_error("NOT_FOUND", f"未找到别名: {args.alias}")
 
     elif args.command == "get-aliases":
         aliases = manager.get_entity_aliases(args.entity)
         if aliases:
-            print(f"{args.entity} 的别名: {', '.join(aliases)}")
+            emit_success({"entity": args.entity, "aliases": aliases}, message="aliases")
         else:
-            print(f"{args.entity} 没有别名")
+            emit_error("NOT_FOUND", f"{args.entity} 没有别名")
 
     elif args.command == "register-alias":
         success = manager.register_alias(args.alias, args.entity, args.type)
         if success:
-            print(f"✓ 已注册别名: {args.alias} → {args.entity} ({args.type})")
+            emit_success(
+                {"alias": args.alias, "entity": args.entity, "type": args.type},
+                message="alias_registered",
+            )
         else:
-            print(f"别名已存在或注册失败: {args.alias}")
+            emit_error("ALIAS_EXISTS", f"别名已存在或注册失败: {args.alias}")
 
     elif args.command == "get-relationships":
         rels = manager.get_entity_relationships(args.entity, args.direction)
-        print(json.dumps(rels, ensure_ascii=False, indent=2))
+        emit_success(rels, message="relationships")
 
     elif args.command == "get-state-changes":
         changes = manager.get_entity_state_changes(args.entity, args.limit)
-        print(json.dumps(changes, ensure_ascii=False, indent=2))
+        emit_success(changes, message="state_changes")
 
     elif args.command == "upsert-entity":
         data = json.loads(args.data)
@@ -2182,7 +2381,7 @@ def main():
             is_archived=data.get("is_archived", False),
         )
         is_new = manager.upsert_entity(entity)
-        print(f"✓ {'新建' if is_new else '更新'}实体: {entity.id}")
+        emit_success({"id": entity.id, "created": is_new}, message="entity_upserted")
 
     elif args.command == "upsert-relationship":
         data = json.loads(args.data)
@@ -2194,8 +2393,9 @@ def main():
             chapter=data["chapter"],
         )
         is_new = manager.upsert_relationship(rel)
-        print(
-            f"✓ {'新建' if is_new else '更新'}关系: {rel.from_entity} → {rel.to_entity} ({rel.type})"
+        emit_success(
+            {"from": rel.from_entity, "to": rel.to_entity, "type": rel.type, "created": is_new},
+            message="relationship_upserted",
         )
 
     elif args.command == "record-state-change":
@@ -2209,69 +2409,82 @@ def main():
             chapter=data["chapter"],
         )
         record_id = manager.record_state_change(change)
-        print(f"✓ 已记录状态变化 #{record_id}: {change.entity_id}.{change.field}")
+        emit_success({"id": record_id, "entity": change.entity_id, "field": change.field}, message="state_change_recorded")
+
+    # ==================== v5.4 无效事实命令处理 ====================
+
+    elif args.command == "mark-invalid":
+        invalid_id = manager.mark_invalid_fact(
+            args.source_type,
+            args.source_id,
+            args.reason,
+            marked_by=args.marked_by,
+            chapter_discovered=args.chapter,
+        )
+        emit_success({"id": invalid_id}, message="invalid_marked")
+
+    elif args.command == "resolve-invalid":
+        ok = manager.resolve_invalid_fact(args.id, args.action)
+        if ok:
+            emit_success({"id": args.id, "action": args.action}, message="invalid_resolved")
+        else:
+            emit_error("INVALID_ACTION", f"无法处理 action: {args.action}")
+
+    elif args.command == "list-invalid":
+        rows = manager.list_invalid_facts(args.status)
+        emit_success(rows, message="invalid_list")
 
     # ==================== v5.3 新增命令处理 ====================
 
     elif args.command == "get-debt-summary":
         summary = manager.get_debt_summary()
-        print(json.dumps(summary, ensure_ascii=False, indent=2))
+        emit_success(summary, message="debt_summary")
 
     elif args.command == "get-recent-reading-power":
         records = manager.get_recent_reading_power(args.limit)
-        print(json.dumps(records, ensure_ascii=False, indent=2))
+        emit_success(records, message="recent_reading_power")
 
     elif args.command == "get-chapter-reading-power":
         record = manager.get_chapter_reading_power(args.chapter)
         if record:
-            print(json.dumps(record, ensure_ascii=False, indent=2))
+            emit_success(record, message="chapter_reading_power")
         else:
-            print(f"未找到第 {args.chapter} 章的追读力元数据")
+            emit_error("NOT_FOUND", f"未找到第 {args.chapter} 章的追读力元数据")
 
     elif args.command == "get-pattern-usage-stats":
         stats = manager.get_pattern_usage_stats(args.last_n)
-        print(json.dumps(stats, ensure_ascii=False, indent=2))
+        emit_success(stats, message="pattern_usage_stats")
 
     elif args.command == "get-hook-type-stats":
         stats = manager.get_hook_type_stats(args.last_n)
-        print(json.dumps(stats, ensure_ascii=False, indent=2))
+        emit_success(stats, message="hook_type_stats")
 
     elif args.command == "get-pending-overrides":
         overrides = manager.get_pending_overrides(args.before_chapter)
-        print(json.dumps(overrides, ensure_ascii=False, indent=2))
+        emit_success(overrides, message="pending_overrides")
 
     elif args.command == "get-overdue-overrides":
         overrides = manager.get_overdue_overrides(args.current_chapter)
-        print(json.dumps(overrides, ensure_ascii=False, indent=2))
+        emit_success(overrides, message="overdue_overrides")
 
     elif args.command == "get-active-debts":
         debts = manager.get_active_debts()
-        print(json.dumps(debts, ensure_ascii=False, indent=2))
+        emit_success(debts, message="active_debts")
 
     elif args.command == "get-overdue-debts":
         debts = manager.get_overdue_debts(args.current_chapter)
-        print(json.dumps(debts, ensure_ascii=False, indent=2))
+        emit_success(debts, message="overdue_debts")
 
     elif args.command == "accrue-interest":
         result = manager.accrue_interest(args.current_chapter)
-        print(f"✓ 利息计算完成")
-        print(f"  处理债务: {result['debts_processed']} 笔")
-        print(f"  总利息: {result['total_interest']:.2f}")
-        print(f"  新逾期: {result['new_overdues']} 笔")
-        if result.get("skipped_already_processed", 0) > 0:
-            print(f"  跳过(已计息): {result['skipped_already_processed']} 笔")
+        emit_success(result, message="interest_accrued", chapter=args.current_chapter)
 
     elif args.command == "pay-debt":
         result = manager.pay_debt(args.debt_id, args.amount, args.chapter)
         if "error" in result:
-            print(f"✗ {result['error']}")
-        elif result["fully_paid"]:
-            msg = f"✓ 债务 #{args.debt_id} 已完全偿还"
-            if result.get("override_fulfilled"):
-                msg += " (关联Override已标记fulfilled)"
-            print(msg)
+            emit_error("PAY_DEBT_FAILED", result["error"], chapter=args.chapter)
         else:
-            print(f"✓ 债务 #{args.debt_id} 部分偿还,剩余: {result['remaining']:.2f}")
+            emit_success(result, message="debt_payment", chapter=args.chapter)
 
     elif args.command == "create-override-contract":
         data = json.loads(args.data)
@@ -2286,7 +2499,7 @@ def main():
             status=data.get("status", "pending"),
         )
         contract_id = manager.create_override_contract(contract)
-        print(f"✓ 已创建Override Contract #{contract_id}")
+        emit_success({"id": contract_id}, message="override_contract_created")
 
     elif args.command == "create-debt":
         data = json.loads(args.data)
@@ -2301,14 +2514,14 @@ def main():
             status=data.get("status", "active"),
         )
         debt_id = manager.create_debt(debt)
-        print(f"✓ 已创建债务 #{debt_id}: {debt.debt_type}")
+        emit_success({"id": debt_id, "debt_type": debt.debt_type}, message="debt_created")
 
     elif args.command == "fulfill-override":
         success = manager.fulfill_override(args.contract_id)
         if success:
-            print(f"✓ Override Contract #{args.contract_id} 已标记为已偿还")
+            emit_success({"id": args.contract_id}, message="override_fulfilled")
         else:
-            print(f"✗ 未找到 Override Contract #{args.contract_id}")
+            emit_error("NOT_FOUND", f"未找到 Override Contract #{args.contract_id}")
 
     elif args.command == "save-chapter-reading-power":
         data = json.loads(args.data)
@@ -2325,7 +2538,10 @@ def main():
             debt_balance=data.get("debt_balance", 0.0),
         )
         manager.save_chapter_reading_power(meta)
-        print(f"✓ 已保存第 {meta.chapter} 章追读力元数据")
+        emit_success({"chapter": meta.chapter}, message="reading_power_saved")
+
+    else:
+        emit_error("UNKNOWN_COMMAND", "未指定有效命令", suggestion="请查看 --help")
 
 
 if __name__ == "__main__":

+ 125 - 0
.claude/scripts/data_modules/schemas.py

@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Pydantic schemas for data_modules outputs.
+"""
+from __future__ import annotations
+
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field, ValidationError, ConfigDict
+
+
+class EntityAppeared(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    id: str
+    type: str
+    mentions: List[str] = Field(default_factory=list)
+    confidence: float = 1.0
+
+
+class EntityNew(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    suggested_id: str
+    name: str
+    type: str
+    tier: str = "装饰"
+
+
+class StateChange(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    entity_id: str
+    field: str
+    old: Optional[str] = None
+    new: str
+    reason: Optional[str] = None
+
+
+class RelationshipNew(BaseModel):
+    model_config = ConfigDict(extra="allow", populate_by_name=True)
+
+    from_entity: str = Field(alias="from")
+    to_entity: str = Field(alias="to")
+    type: str
+    description: Optional[str] = None
+    chapter: Optional[int] = None
+
+
+class UncertainCandidate(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    type: str
+    id: str
+
+
+class UncertainMention(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    mention: str
+    candidates: List[UncertainCandidate] = Field(default_factory=list)
+    confidence: float = 0.0
+    adopted: Optional[str] = None
+
+
+class DataAgentOutput(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    entities_appeared: List[EntityAppeared] = Field(default_factory=list)
+    entities_new: List[EntityNew] = Field(default_factory=list)
+    state_changes: List[StateChange] = Field(default_factory=list)
+    relationships_new: List[RelationshipNew] = Field(default_factory=list)
+    scenes_chunked: int = 0
+    uncertain: List[UncertainMention] = Field(default_factory=list)
+    warnings: List[str] = Field(default_factory=list)
+
+
+class ErrorSchema(BaseModel):
+    model_config = ConfigDict(extra="allow")
+
+    code: str
+    message: str
+    suggestion: Optional[str] = None
+    details: Optional[Dict[str, Any]] = None
+
+
+def validate_data_agent_output(payload: Dict[str, Any]) -> DataAgentOutput:
+    return DataAgentOutput.model_validate(payload)
+
+
+def format_validation_error(exc: ValidationError) -> Dict[str, Any]:
+    return {
+        "code": "SCHEMA_VALIDATION_FAILED",
+        "message": "数据结构校验失败",
+        "details": {"errors": exc.errors()},
+        "suggestion": "请检查 data-agent 输出字段是否完整且类型正确",
+    }
+
+
+def normalize_data_agent_output(payload: Dict[str, Any]) -> Dict[str, Any]:
+    if not isinstance(payload, dict):
+        return {}
+
+    def _ensure_list(key: str):
+        value = payload.get(key)
+        if value is None:
+            payload[key] = []
+        elif isinstance(value, list):
+            return
+        else:
+            payload[key] = [value]
+
+    for key in [
+        "entities_appeared",
+        "entities_new",
+        "state_changes",
+        "relationships_new",
+        "uncertain",
+        "warnings",
+    ]:
+        _ensure_list(key)
+
+    payload.setdefault("scenes_chunked", 0)
+    return payload

+ 1 - 0
.claude/scripts/requirements.txt

@@ -4,6 +4,7 @@
 # 核心依赖
 aiohttp>=3.8.0          # 异步 HTTP 客户端(API 调用)
 filelock>=3.0.0         # 文件锁(状态文件并发控制)
+pydantic>=2.0.0         # Schema 校验
 
 # 可选依赖(开发/测试)
 pytest>=7.0.0           # 单元测试