Преглед изворни кода

feat: implement phase-a context ranking and workflow observability

lingfengQAQ пре 4 месеци
родитељ
комит
0cdf046259

+ 46 - 0
.claude/references/context-contract-v2.md

@@ -0,0 +1,46 @@
+# Context Contract v2
+
+## 目的
+- 为 `Context Agent`、`Writer`、`Review` 提供统一、可排序、可追踪的上下文契约。
+- 在不破坏旧调用方的前提下,增强上下文稳定性与命中率。
+
+## 输出结构
+- 根字段保持兼容:`meta`、`sections`、`template`、`weights`。
+- `meta` 新增:
+  - `context_contract_version`: 固定为 `v2`
+  - `ranker`: 当前排序器配置快照(用于复现)
+
+## Section 排序规则
+- `core.recent_summaries`
+  - 主要按章节新近度排序(越近越高)
+  - 含“钩子/悬念/反转/冲突”提示时额外加分
+- `core.recent_meta`
+  - 主要按章节新近度排序
+  - 有 `hook` 的条目优先
+- `scene.appearing_characters`
+  - 综合新近度 + 出场频次排序
+  - 带 `warning`(如 pending invalid)降权
+- `story_skeleton`
+  - 按新近度优先,兼顾摘要信息密度
+- `alerts`
+  - 优先 `critical/high` 或包含关键风险词的项
+
+## 兼容性约束
+- 不改变既有 key 名和字段语义。
+- 仅重排列表顺序;内容不删改(除已有过滤逻辑)。
+- 调用方若忽略 `meta.context_contract_version`,行为与 v1 等价。
+
+## 推荐调用时机
+- `Context Agent` 在 Step 1 聚合上下文时调用。
+- `webnovel-write`、`webnovel-review` 开始阶段调用。
+- 恢复流程(`webnovel-resume`)在 `detect` 后重建上下文时调用。
+
+## 配置项(DataModulesConfig)
+- `context_ranker_enabled`
+- `context_ranker_recency_weight`
+- `context_ranker_frequency_weight`
+- `context_ranker_hook_bonus`
+- `context_ranker_length_bonus_cap`
+- `context_ranker_alert_critical_keywords`
+- `context_ranker_debug`
+

+ 2 - 0
.claude/scripts/data_modules/__init__.py

@@ -19,6 +19,7 @@ from .state_manager import StateManager, EntityState, Relationship, StateChange
 from .index_manager import IndexManager, ChapterMeta, SceneMeta, ReviewMetrics
 from .index_manager import IndexManager, ChapterMeta, SceneMeta, ReviewMetrics
 from .rag_adapter import RAGAdapter, SearchResult
 from .rag_adapter import RAGAdapter, SearchResult
 from .context_manager import ContextManager
 from .context_manager import ContextManager
+from .context_ranker import ContextRanker
 from .snapshot_manager import SnapshotManager
 from .snapshot_manager import SnapshotManager
 from .query_router import QueryRouter
 from .query_router import QueryRouter
 from .style_sampler import StyleSampler, StyleSample, SceneType
 from .style_sampler import StyleSampler, StyleSample, SceneType
@@ -48,6 +49,7 @@ __all__ = [
     "RAGAdapter",
     "RAGAdapter",
     "SearchResult",
     "SearchResult",
     "ContextManager",
     "ContextManager",
+    "ContextRanker",
     "SnapshotManager",
     "SnapshotManager",
     "QueryRouter",
     "QueryRouter",
     # Style Sampler
     # Style Sampler

+ 14 - 0
.claude/scripts/data_modules/config.py

@@ -134,6 +134,20 @@ class DataModulesConfig:
     context_story_skeleton_max_samples: int = 5
     context_story_skeleton_max_samples: int = 5
     context_story_skeleton_snippet_chars: int = 400
     context_story_skeleton_snippet_chars: int = 400
     context_extra_section_budget: int = 800
     context_extra_section_budget: int = 800
+    context_ranker_enabled: bool = True
+    context_ranker_recency_weight: float = 0.7
+    context_ranker_frequency_weight: float = 0.3
+    context_ranker_hook_bonus: float = 0.2
+    context_ranker_length_bonus_cap: float = 0.2
+    context_ranker_alert_critical_keywords: tuple[str, ...] = (
+        "冲突",
+        "矛盾",
+        "critical",
+        "break",
+        "违规",
+        "断裂",
+    )
+    context_ranker_debug: bool = False
 
 
     export_recent_changes_slice: int = 20
     export_recent_changes_slice: int = 20
     export_disambiguation_slice: int = 20
     export_disambiguation_slice: int = 20

+ 4 - 0
.claude/scripts/data_modules/context_manager.py

@@ -12,6 +12,7 @@ from typing import Any, Dict, List, Optional
 
 
 from .config import get_config
 from .config import get_config
 from .index_manager import IndexManager
 from .index_manager import IndexManager
+from .context_ranker import ContextRanker
 from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch
 from .snapshot_manager import SnapshotManager, SnapshotVersionMismatch
 
 
 
 
@@ -31,6 +32,7 @@ class ContextManager:
         self.config = config or get_config()
         self.config = config or get_config()
         self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
         self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
         self.index_manager = IndexManager(self.config)
         self.index_manager = IndexManager(self.config)
+        self.context_ranker = ContextRanker(self.config)
 
 
     def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool:
     def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool:
         """判断快照是否可用于当前模板。"""
         """判断快照是否可用于当前模板。"""
@@ -70,6 +72,8 @@ class ContextManager:
                 pass
                 pass
 
 
         pack = self._build_pack(chapter)
         pack = self._build_pack(chapter)
+        if getattr(self.config, "context_ranker_enabled", True):
+            pack = self.context_ranker.rank_pack(pack, chapter)
         assembled = self.assemble_context(pack, template=template, max_chars=max_chars)
         assembled = self.assemble_context(pack, template=template, max_chars=max_chars)
 
 
         if save_snapshot:
         if save_snapshot:

+ 210 - 0
.claude/scripts/data_modules/context_ranker.py

@@ -0,0 +1,210 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Context ranker for Context Contract v2.
+
+Goals:
+- Prefer recency while keeping frequent entities stable.
+- Prioritize high-signal hook/alert items.
+- Keep output shape backward compatible (same keys, re-ordered lists).
+"""
+
+from __future__ import annotations
+
+import math
+from typing import Any, Dict, List, Optional
+
+from .config import get_config
+
+
+class ContextRanker:
+    """Rank context-pack sections with lightweight deterministic heuristics."""
+
+    SUMMARY_HOOK_HINTS = ("?", "?", "悬念", "钩子", "反转", "冲突")
+
+    def __init__(self, config=None):
+        self.config = config or get_config()
+
+    def rank_pack(self, pack: Dict[str, Any], chapter: int) -> Dict[str, Any]:
+        ranked = dict(pack)
+
+        core = dict(ranked.get("core") or {})
+        core["recent_summaries"] = self.rank_recent_summaries(core.get("recent_summaries") or [], chapter)
+        core["recent_meta"] = self.rank_recent_meta(core.get("recent_meta") or [], chapter)
+        ranked["core"] = core
+
+        scene = dict(ranked.get("scene") or {})
+        scene["appearing_characters"] = self.rank_appearances(scene.get("appearing_characters") or [], chapter)
+        ranked["scene"] = scene
+
+        ranked["story_skeleton"] = self.rank_story_skeleton(ranked.get("story_skeleton") or [], chapter)
+
+        alerts = dict(ranked.get("alerts") or {})
+        alerts["disambiguation_warnings"] = self.rank_alerts(alerts.get("disambiguation_warnings") or [], chapter)
+        alerts["disambiguation_pending"] = self.rank_alerts(alerts.get("disambiguation_pending") or [], chapter)
+        ranked["alerts"] = alerts
+
+        meta = dict(ranked.get("meta") or {})
+        meta.setdefault("context_contract_version", "v2")
+        meta["ranker"] = {
+            "enabled": True,
+            "recency_weight": float(self.config.context_ranker_recency_weight),
+            "frequency_weight": float(self.config.context_ranker_frequency_weight),
+            "hook_bonus": float(self.config.context_ranker_hook_bonus),
+        }
+        ranked["meta"] = meta
+        return ranked
+
+    def rank_recent_summaries(self, items: List[Dict[str, Any]], current_chapter: int) -> List[Dict[str, Any]]:
+        scored = []
+        for raw in items:
+            item = dict(raw)
+            chapter = self._as_int(item.get("chapter"))
+            summary = str(item.get("summary") or "")
+
+            recency = self._recency_score(chapter, current_chapter)
+            frequency = self._length_score(summary)
+            hook_bonus = float(self.config.context_ranker_hook_bonus) if self._has_hook_hint(summary) else 0.0
+            score = self._combine_score(recency, frequency, hook_bonus)
+            scored.append(self._with_debug_score(item, score, recency, frequency, hook_bonus))
+
+        scored.sort(key=lambda row: row[0], reverse=True)
+        return [row[1] for row in scored]
+
+    def rank_recent_meta(self, items: List[Dict[str, Any]], current_chapter: int) -> List[Dict[str, Any]]:
+        scored = []
+        for raw in items:
+            item = dict(raw)
+            chapter = self._as_int(item.get("chapter"))
+            hook = str(item.get("hook") or "")
+            hook_bonus = float(self.config.context_ranker_hook_bonus) if hook else 0.0
+            recency = self._recency_score(chapter, current_chapter)
+            frequency = self._length_score(hook)
+            score = self._combine_score(recency, frequency, hook_bonus)
+            scored.append(self._with_debug_score(item, score, recency, frequency, hook_bonus))
+
+        scored.sort(key=lambda row: row[0], reverse=True)
+        return [row[1] for row in scored]
+
+    def rank_appearances(self, items: List[Dict[str, Any]], current_chapter: int) -> List[Dict[str, Any]]:
+        scored = []
+        for raw in items:
+            item = dict(raw)
+            last_chapter = self._as_int(item.get("last_chapter") or item.get("chapter"))
+            total = self._as_int(item.get("total")) or 0
+            warning_penalty = 0.15 if item.get("warning") else 0.0
+
+            recency = self._recency_score(last_chapter, current_chapter)
+            frequency = self._frequency_score(total)
+            score = self._combine_score(recency, frequency, 0.0) - warning_penalty
+            scored.append(self._with_debug_score(item, score, recency, frequency, -warning_penalty))
+
+        scored.sort(key=lambda row: row[0], reverse=True)
+        return [row[1] for row in scored]
+
+    def rank_story_skeleton(self, items: List[Dict[str, Any]], current_chapter: int) -> List[Dict[str, Any]]:
+        scored = []
+        for raw in items:
+            item = dict(raw)
+            chapter = self._as_int(item.get("chapter"))
+            summary = str(item.get("summary") or "")
+            recency = self._recency_score(chapter, current_chapter)
+            frequency = self._length_score(summary)
+            score = self._combine_score(recency, frequency, 0.0)
+            scored.append(self._with_debug_score(item, score, recency, frequency, 0.0))
+
+        scored.sort(key=lambda row: row[0], reverse=True)
+        return [row[1] for row in scored]
+
+    def rank_alerts(self, alerts: List[Any], current_chapter: int) -> List[Any]:
+        scored = []
+        keywords = tuple(self.config.context_ranker_alert_critical_keywords)
+
+        for raw in alerts:
+            if isinstance(raw, dict):
+                item: Any = dict(raw)
+                chapter = self._as_int(item.get("chapter"))
+                text = str(item.get("message") or item.get("content") or json_safe(item))
+                severity = str(item.get("severity") or "").lower()
+                critical_bonus = 0.3 if severity in {"critical", "high"} else 0.0
+            else:
+                item = raw
+                chapter = None
+                text = str(raw)
+                critical_bonus = 0.0
+
+            recency = self._recency_score(chapter, current_chapter)
+            keyword_bonus = 0.3 if any(word and word in text for word in keywords) else 0.0
+            score = recency + critical_bonus + keyword_bonus
+
+            if isinstance(item, dict):
+                scored.append(self._with_debug_score(item, score, recency, critical_bonus, keyword_bonus))
+            else:
+                scored.append((score, item))
+
+        scored.sort(key=lambda row: row[0], reverse=True)
+        return [row[1] for row in scored]
+
+    def _combine_score(self, recency: float, frequency: float, bonus: float) -> float:
+        return (
+            recency * float(self.config.context_ranker_recency_weight)
+            + frequency * float(self.config.context_ranker_frequency_weight)
+            + bonus
+        )
+
+    def _recency_score(self, source_chapter: Optional[int], current_chapter: int) -> float:
+        if source_chapter is None:
+            return 0.0
+        gap = max(0, int(current_chapter) - int(source_chapter))
+        return 1.0 / (1.0 + gap)
+
+    def _frequency_score(self, total: int) -> float:
+        if total <= 0:
+            return 0.0
+        # log scale to avoid over-favoring very frequent entities
+        return min(1.0, math.log(1.0 + float(total)) / math.log(11.0))
+
+    def _length_score(self, text: str) -> float:
+        if not text:
+            return 0.0
+        ratio = min(len(text) / 1200.0, 1.0)
+        cap = float(self.config.context_ranker_length_bonus_cap)
+        return ratio * cap
+
+    def _has_hook_hint(self, text: str) -> bool:
+        return any(token in text for token in self.SUMMARY_HOOK_HINTS)
+
+    def _as_int(self, value: Any) -> Optional[int]:
+        if value is None:
+            return None
+        try:
+            return int(value)
+        except (TypeError, ValueError):
+            return None
+
+    def _with_debug_score(
+        self,
+        item: Dict[str, Any],
+        score: float,
+        recency: float,
+        frequency: float,
+        bonus: float,
+    ) -> tuple[float, Dict[str, Any]]:
+        if getattr(self.config, "context_ranker_debug", False):
+            item["_context_score"] = round(score, 6)
+            item["_context_score_detail"] = {
+                "recency": round(recency, 6),
+                "frequency": round(frequency, 6),
+                "bonus": round(bonus, 6),
+            }
+        return score, item
+
+
+def json_safe(value: Any) -> str:
+    try:
+        import json
+
+        return json.dumps(value, ensure_ascii=False)
+    except Exception:
+        return str(value)
+

+ 28 - 0
.claude/scripts/data_modules/tests/test_context_manager.py

@@ -106,3 +106,31 @@ def test_context_snapshot_respects_template(temp_project):
 
 
     assert plot_payload.get("template") == "plot"
     assert plot_payload.get("template") == "plot"
     assert battle_payload.get("template") == "battle"
     assert battle_payload.get("template") == "battle"
+
+
+def test_context_manager_applies_ranker_and_contract_meta(temp_project):
+    state = {
+        "protagonist_state": {"name": "萧炎"},
+        "chapter_meta": {
+            "0002": {"hook": "平稳"},
+            "0003": {"hook": "留下悬念"},
+        },
+        "disambiguation_warnings": [
+            {"chapter": 1, "message": "普通告警"},
+            {"chapter": 3, "message": "critical 冲突告警", "severity": "high"},
+        ],
+        "disambiguation_pending": [],
+    }
+    temp_project.state_file.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
+
+    manager = ContextManager(temp_project)
+    payload = manager.build_context(4, use_snapshot=False, save_snapshot=False)
+
+    assert payload["meta"].get("context_contract_version") == "v2"
+    recent_meta = payload["sections"]["core"]["content"]["recent_meta"]
+    if recent_meta:
+        assert recent_meta[0]["chapter"] == 3
+
+    warnings = payload["sections"]["alerts"]["content"]["disambiguation_warnings"]
+    if warnings and isinstance(warnings[0], dict):
+        assert "critical" in str(warnings[0].get("message", "")) or warnings[0].get("severity") == "high"

+ 55 - 0
.claude/scripts/data_modules/tests/test_context_ranker.py

@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from data_modules.config import DataModulesConfig
+from data_modules.context_ranker import ContextRanker
+
+
+def test_rank_recent_summaries_prefers_recency_and_hook(tmp_path):
+    cfg = DataModulesConfig.from_project_root(tmp_path)
+    ranker = ContextRanker(cfg)
+
+    items = [
+        {"chapter": 8, "summary": "平稳推进"},
+        {"chapter": 9, "summary": "最后留下悬念?"},
+        {"chapter": 7, "summary": "老信息"},
+    ]
+
+    ranked = ranker.rank_recent_summaries(items, current_chapter=10)
+    assert ranked[0]["chapter"] == 9
+    assert ranked[-1]["chapter"] == 7
+
+
+def test_rank_appearances_uses_recency_and_frequency(tmp_path):
+    cfg = DataModulesConfig.from_project_root(tmp_path)
+    ranker = ContextRanker(cfg)
+
+    items = [
+        {"entity_id": "a", "last_chapter": 9, "total": 1},
+        {"entity_id": "b", "last_chapter": 8, "total": 8},
+        {"entity_id": "c", "last_chapter": 9, "total": 3},
+    ]
+
+    ranked = ranker.rank_appearances(items, current_chapter=10)
+    ids = [item["entity_id"] for item in ranked]
+    assert ids[0] == "c"
+    assert ids[-1] in {"a", "b"}
+
+
+def test_rank_pack_adds_context_contract_meta(tmp_path):
+    cfg = DataModulesConfig.from_project_root(tmp_path)
+    ranker = ContextRanker(cfg)
+
+    pack = {
+        "meta": {"chapter": 12},
+        "core": {"recent_summaries": [{"chapter": 11, "summary": "x"}], "recent_meta": []},
+        "scene": {"appearing_characters": []},
+        "global": {},
+        "story_skeleton": [],
+        "alerts": {"disambiguation_warnings": [], "disambiguation_pending": []},
+    }
+
+    ranked = ranker.rank_pack(pack, chapter=12)
+    assert ranked["meta"]["context_contract_version"] == "v2"
+    assert ranked["meta"]["ranker"]["enabled"] is True
+

+ 78 - 0
.claude/scripts/data_modules/tests/test_workflow_manager.py

@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import json
+import sys
+from pathlib import Path
+
+
+def _load_module():
+    scripts_dir = Path(__file__).resolve().parents[2]
+    if str(scripts_dir) not in sys.path:
+        sys.path.insert(0, str(scripts_dir))
+    import workflow_manager
+
+    return workflow_manager
+
+
+def test_workflow_lifecycle_and_trace(tmp_path, monkeypatch):
+    module = _load_module()
+    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
+
+    webnovel_dir = tmp_path / ".webnovel"
+    webnovel_dir.mkdir(parents=True, exist_ok=True)
+
+    module.start_task("webnovel-write", {"chapter_num": 7})
+    module.start_step("Step 1", "Context")
+    module.complete_step("Step 1", json.dumps({"state_json_modified": True}, ensure_ascii=False))
+    module.complete_task(json.dumps({"review_completed": True}, ensure_ascii=False))
+
+    state = module.load_state()
+    assert state["current_task"] is None
+    assert state["history"][-1]["status"] == module.TASK_STATUS_COMPLETED
+    assert state["last_stable_state"]["artifacts"]["review_completed"] is True
+
+    trace_path = module.get_call_trace_path()
+    assert trace_path.exists()
+    lines = trace_path.read_text(encoding="utf-8").strip().splitlines()
+    events = [json.loads(line)["event"] for line in lines if line.strip()]
+    assert "task_started" in events
+    assert "step_started" in events
+    assert "step_completed" in events
+    assert "task_completed" in events
+
+
+def test_start_task_reentry_increments_retry(tmp_path, monkeypatch):
+    module = _load_module()
+    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
+
+    webnovel_dir = tmp_path / ".webnovel"
+    webnovel_dir.mkdir(parents=True, exist_ok=True)
+
+    module.start_task("webnovel-write", {"chapter_num": 8})
+    module.start_task("webnovel-write", {"chapter_num": 8})
+
+    state = module.load_state()
+    task = state["current_task"]
+    assert task is not None
+    assert task["status"] == module.TASK_STATUS_RUNNING
+    assert int(task.get("retry_count", 0)) >= 1
+
+
+def test_complete_step_rejects_mismatch_step_id(tmp_path, monkeypatch):
+    module = _load_module()
+    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
+
+    webnovel_dir = tmp_path / ".webnovel"
+    webnovel_dir.mkdir(parents=True, exist_ok=True)
+
+    module.start_task("webnovel-write", {"chapter_num": 9})
+    module.start_step("Step 2A", "Draft")
+    module.complete_step("Step 2B")
+
+    state = module.load_state()
+    current_step = state["current_task"]["current_step"]
+    assert current_step is not None
+    assert current_step["id"] == "Step 2A"
+    assert current_step["status"] == module.STEP_STATUS_RUNNING
+

+ 494 - 330
.claude/scripts/workflow_manager.py

@@ -1,275 +1,431 @@
 #!/usr/bin/env python3
 #!/usr/bin/env python3
 """
 """
-工作流状态管理器
-- 追踪命令执行状态
-- 检测中断点
-- 提供恢复策略
+Workflow state manager
+- Track write/review task execution status
+- Detect interruption points
+- Provide recovery options
+- Emit call traces for observability
 """
 """
 
 
+from __future__ import annotations
+
 import json
 import json
 import os
 import os
-import sys
 import subprocess
 import subprocess
+import sys
 from datetime import datetime
 from datetime import datetime
 from pathlib import Path
 from pathlib import Path
+from typing import Any, Dict, Optional
 
 
-# ============================================================================
-# 安全修复:导入安全工具函数(P1 MEDIUM)
-# ============================================================================
-from security_utils import create_secure_directory, atomic_write_json
-from project_locator import resolve_project_root
 from chapter_paths import default_chapter_draft_path, find_chapter_file
 from chapter_paths import default_chapter_draft_path, find_chapter_file
+from project_locator import resolve_project_root
+from security_utils import atomic_write_json, create_secure_directory
+
 
 
-# UTF-8 编码修复(Windows兼容)
-if sys.platform == 'win32':
+# UTF-8 output for Windows console (CLI run only, avoid pytest capture issues)
+if sys.platform == "win32" and __name__ == "__main__" and not os.environ.get("PYTEST_CURRENT_TEST"):
     import io
     import io
-    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
-    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
 
 
-def find_project_root():
-    """解析项目根目录(包含 .webnovel/state.json)"""
+    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
+    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
+
+
+TASK_STATUS_RUNNING = "running"
+TASK_STATUS_COMPLETED = "completed"
+TASK_STATUS_FAILED = "failed"
+
+STEP_STATUS_STARTED = "started"
+STEP_STATUS_RUNNING = "running"
+STEP_STATUS_COMPLETED = "completed"
+STEP_STATUS_FAILED = "failed"
+
+
+def now_iso() -> str:
+    return datetime.now().isoformat()
+
+
+def find_project_root() -> Path:
+    """Resolve project root (containing .webnovel/state.json)."""
     return resolve_project_root()
     return resolve_project_root()
 
 
-def get_workflow_state_path():
-    """获取 workflow_state.json 的完整路径"""
+
+def get_workflow_state_path() -> Path:
+    """Absolute path to workflow_state.json."""
+    project_root = find_project_root()
+    return project_root / ".webnovel" / "workflow_state.json"
+
+
+def get_call_trace_path() -> Path:
     project_root = find_project_root()
     project_root = find_project_root()
-    return project_root / '.webnovel' / 'workflow_state.json'
+    return project_root / ".webnovel" / "observability" / "call_trace.jsonl"
+
+
+def append_call_trace(event: str, payload: Optional[Dict[str, Any]] = None):
+    """Append workflow call trace event (best effort)."""
+    payload = payload or {}
+    trace_path = get_call_trace_path()
+    create_secure_directory(str(trace_path.parent))
+    row = {
+        "timestamp": now_iso(),
+        "event": event,
+        "payload": payload,
+    }
+    with open(trace_path, "a", encoding="utf-8") as f:
+        f.write(json.dumps(row, ensure_ascii=False) + "\n")
+
+
+def safe_append_call_trace(event: str, payload: Optional[Dict[str, Any]] = None):
+    try:
+        append_call_trace(event, payload)
+    except Exception:
+        pass
+
+
+def _new_task(command: str, args: Dict[str, Any]) -> Dict[str, Any]:
+    started_at = now_iso()
+    return {
+        "command": command,
+        "args": args,
+        "started_at": started_at,
+        "last_heartbeat": started_at,
+        "status": TASK_STATUS_RUNNING,
+        "current_step": None,
+        "completed_steps": [],
+        "failed_steps": [],
+        "pending_steps": get_pending_steps(command),
+        "retry_count": 0,
+        "artifacts": {
+            "chapter_file": {},
+            "git_status": {},
+            "state_json_modified": False,
+            "entities_appeared": False,
+            "review_completed": False,
+        },
+    }
+
+
+def _finalize_current_step_as_failed(task: Dict[str, Any], reason: str):
+    current_step = task.get("current_step")
+    if not current_step:
+        return
+    if current_step.get("status") in {STEP_STATUS_COMPLETED, STEP_STATUS_FAILED}:
+        return
+
+    current_step = dict(current_step)
+    current_step["status"] = STEP_STATUS_FAILED
+    current_step["failed_at"] = now_iso()
+    current_step["failure_reason"] = reason
+    task.setdefault("failed_steps", []).append(current_step)
+    task["current_step"] = None
+
+
+def _mark_task_failed(state: Dict[str, Any], reason: str):
+    task = state.get("current_task")
+    if not task:
+        return
+
+    _finalize_current_step_as_failed(task, reason=reason)
+    task["status"] = TASK_STATUS_FAILED
+    task["failed_at"] = now_iso()
+    task["failure_reason"] = reason
+
 
 
 def start_task(command, args):
 def start_task(command, args):
-    """开始新任务"""
+    """Start a new task."""
     state = load_state()
     state = load_state()
-    state['current_task'] = {
-        'command': command,
-        'args': args,
-        'started_at': datetime.now().isoformat(),
-        'last_heartbeat': datetime.now().isoformat(),
-        'status': 'running',
-        'current_step': None,
-        'completed_steps': [],
-        'pending_steps': get_pending_steps(command),
-        'artifacts': {
-            'chapter_file': {},
-            'git_status': {},
-            'state_json_modified': False,
-            'entities_appeared': False,
-            'review_completed': False
-        }
-    }
+    current = state.get("current_task")
+
+    if current and current.get("status") == TASK_STATUS_RUNNING:
+        current["retry_count"] = int(current.get("retry_count", 0)) + 1
+        current["last_heartbeat"] = now_iso()
+        state["current_task"] = current
+        save_state(state)
+        safe_append_call_trace(
+            "task_reentered",
+            {
+                "command": current.get("command"),
+                "chapter": current.get("args", {}).get("chapter_num"),
+                "retry_count": current["retry_count"],
+            },
+        )
+        print(f"ℹ️ 任务已在运行,执行重入标记: {current.get('command')}")
+        return
+
+    state["current_task"] = _new_task(command, args)
     save_state(state)
     save_state(state)
+    safe_append_call_trace("task_started", {"command": command, "args": args})
     print(f"✅ 任务已启动: {command} {json.dumps(args, ensure_ascii=False)}")
     print(f"✅ 任务已启动: {command} {json.dumps(args, ensure_ascii=False)}")
 
 
+
 def start_step(step_id, step_name, progress_note=None):
 def start_step(step_id, step_name, progress_note=None):
-    """标记Step开始"""
+    """Mark step started."""
     state = load_state()
     state = load_state()
-    if not state.get('current_task'):
+    task = state.get("current_task")
+    if not task:
         print("⚠️ 无活动任务,请先使用 start-task")
         print("⚠️ 无活动任务,请先使用 start-task")
         return
         return
 
 
-    state['current_task']['current_step'] = {
-        'id': step_id,
-        'name': step_name,
-        'status': 'in_progress',
-        'started_at': datetime.now().isoformat(),
-        'progress_note': progress_note
+    _finalize_current_step_as_failed(task, reason="step_replaced_before_completion")
+
+    started_at = now_iso()
+    task["current_step"] = {
+        "id": step_id,
+        "name": step_name,
+        "status": STEP_STATUS_STARTED,
+        "started_at": started_at,
+        "running_at": started_at,
+        "attempt": int(task.get("retry_count", 0)) + 1,
+        "progress_note": progress_note,
     }
     }
-    state['current_task']['last_heartbeat'] = datetime.now().isoformat()
+    task["current_step"]["status"] = STEP_STATUS_RUNNING
+    task["status"] = TASK_STATUS_RUNNING
+    task["last_heartbeat"] = now_iso()
+
     save_state(state)
     save_state(state)
-    print(f"▶️  {step_id} 开始: {step_name}")
+    safe_append_call_trace(
+        "step_started",
+        {
+            "step_id": step_id,
+            "step_name": step_name,
+            "command": task.get("command"),
+            "chapter": task.get("args", {}).get("chapter_num"),
+            "progress_note": progress_note,
+        },
+    )
+    print(f"▶️ {step_id} 开始: {step_name}")
+
 
 
 def complete_step(step_id, artifacts_json=None):
 def complete_step(step_id, artifacts_json=None):
-    """标记Step完成"""
+    """Mark step completed."""
     state = load_state()
     state = load_state()
-    if not state.get('current_task') or not state['current_task'].get('current_step'):
-        print("⚠️ 无活动Step")
+    task = state.get("current_task")
+    if not task or not task.get("current_step"):
+        print("⚠️ 无活动 Step")
         return
         return
 
 
-    current_step = state['current_task']['current_step']
-    current_step['status'] = 'completed'
-    current_step['completed_at'] = datetime.now().isoformat()
+    current_step = task["current_step"]
+    if current_step.get("id") != step_id:
+        print(f"⚠️ 当前 Step 为 {current_step.get('id')},与 {step_id} 不一致,拒绝完成")
+        safe_append_call_trace(
+            "step_complete_rejected",
+            {
+                "requested_step_id": step_id,
+                "active_step_id": current_step.get("id"),
+                "command": task.get("command"),
+            },
+        )
+        return
+
+    current_step["status"] = STEP_STATUS_COMPLETED
+    current_step["completed_at"] = now_iso()
 
 
     if artifacts_json:
     if artifacts_json:
         try:
         try:
             artifacts = json.loads(artifacts_json)
             artifacts = json.loads(artifacts_json)
-            current_step['artifacts'] = artifacts
-            # 更新task级别的artifacts
-            state['current_task']['artifacts'].update(artifacts)
-        except json.JSONDecodeError as e:
-            print(f"⚠️ Artifacts JSON解析失败: {e}")
-
-    state['current_task']['completed_steps'].append(current_step)
-    state['current_task']['current_step'] = None
-    state['current_task']['last_heartbeat'] = datetime.now().isoformat()
+            current_step["artifacts"] = artifacts
+            task["artifacts"].update(artifacts)
+        except json.JSONDecodeError as exc:
+            print(f"⚠️ Artifacts JSON 解析失败: {exc}")
+
+    task["completed_steps"].append(current_step)
+    task["current_step"] = None
+    task["last_heartbeat"] = now_iso()
+
     save_state(state)
     save_state(state)
+    safe_append_call_trace(
+        "step_completed",
+        {
+            "step_id": step_id,
+            "command": task.get("command"),
+            "chapter": task.get("args", {}).get("chapter_num"),
+        },
+    )
     print(f"✅ {step_id} 完成")
     print(f"✅ {step_id} 完成")
 
 
+
 def complete_task(final_artifacts_json=None):
 def complete_task(final_artifacts_json=None):
-    """标记任务完成"""
+    """Mark task completed."""
     state = load_state()
     state = load_state()
-    if not state.get('current_task'):
+    task = state.get("current_task")
+    if not task:
         print("⚠️ 无活动任务")
         print("⚠️ 无活动任务")
         return
         return
 
 
-    state['current_task']['status'] = 'completed'
-    state['current_task']['completed_at'] = datetime.now().isoformat()
+    _finalize_current_step_as_failed(task, reason="task_completed_with_active_step")
+
+    task["status"] = TASK_STATUS_COMPLETED
+    task["completed_at"] = now_iso()
 
 
     if final_artifacts_json:
     if final_artifacts_json:
         try:
         try:
             final_artifacts = json.loads(final_artifacts_json)
             final_artifacts = json.loads(final_artifacts_json)
-            state['current_task']['artifacts'].update(final_artifacts)
-        except json.JSONDecodeError as e:
-            print(f"⚠️ Final artifacts JSON解析失败: {e}")
-
-    # 保存到历史记录
-    state['last_stable_state'] = extract_stable_state(state['current_task'])
-    if 'history' not in state:
-        state['history'] = []
-    state['history'].append({
-        'task_id': f"task_{len(state['history']) + 1:03d}",
-        'command': state['current_task']['command'],
-        'chapter': state['current_task']['args'].get('chapter_num'),
-        'status': 'completed',
-        'completed_at': state['current_task']['completed_at']
-    })
-
-    # 清除当前任务
-    state['current_task'] = None
+            task["artifacts"].update(final_artifacts)
+        except json.JSONDecodeError as exc:
+            print(f"⚠️ Final artifacts JSON 解析失败: {exc}")
+
+    state["last_stable_state"] = extract_stable_state(task)
+    if "history" not in state:
+        state["history"] = []
+    state["history"].append(
+        {
+            "task_id": f"task_{len(state['history']) + 1:03d}",
+            "command": task["command"],
+            "chapter": task["args"].get("chapter_num"),
+            "status": TASK_STATUS_COMPLETED,
+            "completed_at": task["completed_at"],
+        }
+    )
+
+    state["current_task"] = None
     save_state(state)
     save_state(state)
-    print(f"🎉 任务完成")
+    safe_append_call_trace(
+        "task_completed",
+        {
+            "command": task.get("command"),
+            "chapter": task.get("args", {}).get("chapter_num"),
+            "completed_steps": len(task.get("completed_steps", [])),
+            "failed_steps": len(task.get("failed_steps", [])),
+        },
+    )
+    print("🎀 任务完成")
+
 
 
 def detect_interruption():
 def detect_interruption():
-    """检测中断状态"""
+    """Detect interruption state."""
     state = load_state()
     state = load_state()
-    if not state or 'current_task' not in state or state['current_task'] is None:
-        return None  # 无中断任务
+    if not state or "current_task" not in state or state["current_task"] is None:
+        return None
 
 
-    task = state['current_task']
-    if task['status'] == 'completed':
-        return None  # 任务已完成
+    task = state["current_task"]
+    if task.get("status") == TASK_STATUS_COMPLETED:
+        return None
 
 
-    # 判断中断原因
-    last_heartbeat = datetime.fromisoformat(task['last_heartbeat'])
+    last_heartbeat = datetime.fromisoformat(task["last_heartbeat"])
     elapsed = (datetime.now() - last_heartbeat).total_seconds()
     elapsed = (datetime.now() - last_heartbeat).total_seconds()
 
 
     interrupt_info = {
     interrupt_info = {
-        'command': task['command'],
-        'args': task['args'],
-        'current_step': task['current_step'],
-        'completed_steps': task['completed_steps'],
-        'elapsed_seconds': elapsed,
-        'artifacts': task['artifacts'],
-        'started_at': task['started_at']
+        "command": task["command"],
+        "args": task["args"],
+        "task_status": task.get("status"),
+        "current_step": task.get("current_step"),
+        "completed_steps": task.get("completed_steps", []),
+        "failed_steps": task.get("failed_steps", []),
+        "elapsed_seconds": elapsed,
+        "artifacts": task.get("artifacts", {}),
+        "started_at": task.get("started_at"),
+        "retry_count": int(task.get("retry_count", 0)),
     }
     }
 
 
+    safe_append_call_trace(
+        "interruption_detected",
+        {
+            "command": task.get("command"),
+            "chapter": task.get("args", {}).get("chapter_num"),
+            "task_status": task.get("status"),
+            "current_step": (task.get("current_step") or {}).get("id"),
+            "elapsed_seconds": elapsed,
+        },
+    )
     return interrupt_info
     return interrupt_info
 
 
+
 def analyze_recovery_options(interrupt_info):
 def analyze_recovery_options(interrupt_info):
-    """分析恢复选项(基于中断点)"""
-    current_step = interrupt_info['current_step']
-    command = interrupt_info['command']
-    chapter_num = interrupt_info['args'].get('chapter_num', '?')
+    """Analyze recovery options based on interruption point."""
+    current_step = interrupt_info["current_step"]
+    command = interrupt_info["command"]
+    chapter_num = interrupt_info["args"].get("chapter_num", "?")
 
 
     if not current_step:
     if not current_step:
-        # 任务刚开始就中断
-        return [{
-            'option': 'A',
-            'label': '从头开始',
-            'risk': 'low',
-            'description': '重新执行完整流程',
-            'actions': [
-                f"删除 workflow_state.json 当前任务",
-                f"执行 /{command} {chapter_num}"
-            ]
-        }]
-
-    step_id = current_step['id']
-
-    # 基于Step ID的恢复策略
-    if step_id in {'Step 1', 'Step 1.5'}:
-        # Step 1中断:无副作用
-        return [{
-            'option': 'A',
-            'label': '从Step 1重新开始',
-            'risk': 'low',
-            'description': '重新加载上下文',
-            'actions': [
-                f"清理中断状态",
-                f"执行 /{command} {chapter_num}"
-            ]
-        }]
-
-    elif step_id in {'Step 2', 'Step 2A', 'Step 2B'}:
-        # Step 2A/2B 中断:可能有半成品文件
-        chapter_file = interrupt_info['artifacts'].get('chapter_file', {})
-
-        # 使用 chapter_paths 模块定位章节文件(兼容新旧目录结构)
+        return [
+            {
+                "option": "A",
+                "label": "从头开始",
+                "risk": "low",
+                "description": "重新执行完整流程",
+                "actions": [
+                    "删除 workflow_state.json 当前任务",
+                    f"执行 /{command} {chapter_num}",
+                ],
+            }
+        ]
+
+    step_id = current_step["id"]
+
+    if step_id in {"Step 1", "Step 1.5"}:
+        return [
+            {
+                "option": "A",
+                "label": "从 Step 1 重新开始",
+                "risk": "low",
+                "description": "重新加载上下文",
+                "actions": [
+                    "清理中断状态",
+                    f"执行 /{command} {chapter_num}",
+                ],
+            }
+        ]
+
+    if step_id in {"Step 2", "Step 2A", "Step 2B"}:
         project_root = find_project_root()
         project_root = find_project_root()
         existing_chapter = find_chapter_file(project_root, chapter_num)
         existing_chapter = find_chapter_file(project_root, chapter_num)
         draft_path = None
         draft_path = None
         if existing_chapter:
         if existing_chapter:
             chapter_path = str(existing_chapter.relative_to(project_root))
             chapter_path = str(existing_chapter.relative_to(project_root))
         else:
         else:
-            # 如果不存在,使用新格式的默认路径
             draft_path = default_chapter_draft_path(project_root, chapter_num)
             draft_path = default_chapter_draft_path(project_root, chapter_num)
             chapter_path = str(draft_path.relative_to(project_root))
             chapter_path = str(draft_path.relative_to(project_root))
 
 
-        options = [{
-            'option': 'A',
-            'label': '删除半成品,从Step 1重新开始',
-            'risk': 'low',
-            'description': f"清理 {chapter_path},重新生成章节",
-            'actions': [
-                f"删除 {chapter_path}(如存在)",
-                f"清理 Git 暂存区",
-                f"清理中断状态",
-                f"执行 /{command} {chapter_num}"
-            ]
-        }]
-
-        # 检查文件是否存在
+        options = [
+            {
+                "option": "A",
+                "label": "删除半成品,从 Step 1 重启",
+                "risk": "low",
+                "description": f"清理 {chapter_path},重新生成章节",
+                "actions": [
+                    f"删除 {chapter_path}(如存在)",
+                    "清理 Git 暂存区",
+                    "清理中断状态",
+                    f"执行 /{command} {chapter_num}",
+                ],
+            }
+        ]
+
         candidate = existing_chapter or draft_path
         candidate = existing_chapter or draft_path
         if candidate and candidate.exists():
         if candidate and candidate.exists():
-            options.append({
-                'option': 'B',
-                'label': '回滚到上一章',
-                'risk': 'medium',
-                'description': '丢弃所有当前章节进度',
-                'actions': [
-                    f"git reset --hard ch{(chapter_num-1):04d}",
-                    f"清理中断状态",
-                    "重新决定是否继续Ch{chapter_num}"
-                ]
-            })
-
+            options.append(
+                {
+                    "option": "B",
+                    "label": "回滚到上一章",
+                    "risk": "medium",
+                    "description": "丢弃当前章节进度",
+                    "actions": [
+                        f"git reset --hard ch{(chapter_num - 1):04d}",
+                        "清理中断状态",
+                        f"重新决定是否继续 Ch{chapter_num}",
+                    ],
+                }
+            )
         return options
         return options
 
 
-    elif step_id == 'Step 3':
-        # Step 3 中断:审查未完成
+    if step_id == "Step 3":
         return [
         return [
             {
             {
-                'option': 'A',
-                'label': '重新执行审查',
-                'risk': 'medium',
-                'description': '重新调用6个审查员(并行)',
-                'actions': [
-                    "重新调用6个审查员(并行)",
-                    "生成审查报告",
-                    "继续 Step 4 润色"
-                ]
+                "option": "A",
+                "label": "重新执行审查",
+                "risk": "medium",
+                "description": "重新调用审查员并生成报告",
+                "actions": ["重新执行审查", "生成审查报告", "继续 Step 4 润色"],
             },
             },
             {
             {
-                'option': 'B',
-                'label': '跳过审查,直接润色',
-                'risk': 'low',
-                'description': '不进行审查,可后续用 /webnovel-review 补审',
-                'actions': [
-                    "标记审查为已跳过",
-                    "继续 Step 4 润色"
-                ]
-            }
+                "option": "B",
+                "label": "跳过审查直接润色",
+                "risk": "low",
+                "description": "后续可用 /webnovel-review 补审",
+                "actions": ["标记审查已跳过", "继续 Step 4 润色"],
+            },
         ]
         ]
 
 
-    elif step_id == 'Step 4':
-        # Step 4 中断:润色中
+    if step_id == "Step 4":
         project_root = find_project_root()
         project_root = find_project_root()
         existing_chapter = find_chapter_file(project_root, chapter_num)
         existing_chapter = find_chapter_file(project_root, chapter_num)
         draft_path = None
         draft_path = None
@@ -281,92 +437,69 @@ def analyze_recovery_options(interrupt_info):
 
 
         return [
         return [
             {
             {
-                'option': 'A',
-                'label': '继续润色',
-                'risk': 'low',
-                'description': f"继续润色 {chapter_path},完成后进入 Step 5",
-                'actions': [
-                    f"打开并继续润色 {chapter_path}",
-                    "保存文件",
-                    "继续 Step 5(Data Agent)"
-                ]
+                "option": "A",
+                "label": "继续润色",
+                "risk": "low",
+                "description": f"继续润色 {chapter_path},完成后进入 Step 5",
+                "actions": [f"打开并继续润色 {chapter_path}", "保存文件", "继续 Step 5(Data Agent)"],
             },
             },
             {
             {
-                'option': 'B',
-                'label': '删除润色稿,从 Step 2A 重写',
-                'risk': 'medium',
-                'description': f"删除 {chapter_path},重新生成章节内容",
-                'actions': [
-                    f"删除 {chapter_path}",
-                    "清理 Git 暂存区",
-                    "清理中断状态",
-                    f"执行 /{command} {chapter_num}"
-                ]
+                "option": "B",
+                "label": "删除润色稿,从 Step 2A 重写",
+                "risk": "medium",
+                "description": f"删除 {chapter_path} 并重新生成章节内容",
+                "actions": [f"删除 {chapter_path}", "清理 Git 暂存区", "清理中断状态", f"执行 /{command} {chapter_num}"],
+            },
+        ]
+
+    if step_id == "Step 5":
+        return [
+            {
+                "option": "A",
+                "label": "从 Step 5 重新开始",
+                "risk": "low",
+                "description": "重新运行 Data Agent(幂等)",
+                "actions": ["重新调用 Data Agent", "继续 Step 6(Git 备份)"],
             }
             }
         ]
         ]
 
 
-    elif step_id == 'Step 5':
-        # Step 5 中断:Data Agent 处理中
-        return [{
-            'option': 'A',
-            'label': '从 Step 5 重新开始',
-            'risk': 'low',
-            'description': '重新运行 Data Agent(幂等操作)',
-            'actions': [
-                "重新调用 Data Agent",
-                "继续 Step 6(Git 备份)"
-            ]
-        }]
-
-    elif step_id == 'Step 6':
-        # Step 6 中断:Git 未提交
+    if step_id == "Step 6":
         return [
         return [
             {
             {
-                'option': 'A',
-                'label': '继续 Git 提交',
-                'risk': 'low',
-                'description': '完成未完成的 Git commit + tag',
-                'actions': [
-                    "检查 Git 暂存区",
-                    "重新执行 backup_manager.py",
-                    "继续完成工作流追踪(complete-task)"
-                ]
+                "option": "A",
+                "label": "继续 Git 提交",
+                "risk": "low",
+                "description": "完成未完成的 Git commit + tag",
+                "actions": ["检查 Git 暂存区", "重新执行 backup_manager.py", "继续 complete-task"],
             },
             },
             {
             {
-                'option': 'B',
-                'label': '回滚 Git 改动',
-                'risk': 'medium',
-                'description': '丢弃暂存区所有改动',
-                'actions': [
-                    "git reset HEAD .",
-                    f"删除第{chapter_num}章文件",
-                    "清理中断状态"
-                ]
-            }
+                "option": "B",
+                "label": "回滚 Git 改动",
+                "risk": "medium",
+                "description": "丢弃暂存区所有改动",
+                "actions": ["git reset HEAD .", f"删除第{chapter_num}章文件", "清理中断状态"],
+            },
         ]
         ]
 
 
-    # 默认选项
-    return [{
-        'option': 'A',
-        'label': '从头开始',
-        'risk': 'low',
-        'description': '重新执行完整流程',
-        'actions': [
-            f"清理所有中断artifacts",
-            f"执行 /{command} {chapter_num}"
-        ]
-    }]
+    return [
+        {
+            "option": "A",
+            "label": "从头开始",
+            "risk": "low",
+            "description": "重新执行完整流程",
+            "actions": ["清理所有中断 artifacts", f"执行 /{command} {chapter_num}"],
+        }
+    ]
+
 
 
 def cleanup_artifacts(chapter_num):
 def cleanup_artifacts(chapter_num):
-    """清理半成品artifacts"""
+    """Cleanup partial artifacts."""
     artifacts_cleaned = []
     artifacts_cleaned = []
 
 
     project_root = find_project_root()
     project_root = find_project_root()
 
 
-    # 删除章节文件(兼容多种命名/目录结构)
     chapter_path = find_chapter_file(project_root, chapter_num)
     chapter_path = find_chapter_file(project_root, chapter_num)
     if chapter_path is None:
     if chapter_path is None:
-        # 可能是“草稿路径”但尚未重命名
         draft_path = default_chapter_draft_path(project_root, chapter_num)
         draft_path = default_chapter_draft_path(project_root, chapter_num)
         if draft_path.exists():
         if draft_path.exists():
             chapter_path = draft_path
             chapter_path = draft_path
@@ -375,132 +508,163 @@ def cleanup_artifacts(chapter_num):
         chapter_path.unlink()
         chapter_path.unlink()
         artifacts_cleaned.append(str(chapter_path.relative_to(project_root)))
         artifacts_cleaned.append(str(chapter_path.relative_to(project_root)))
 
 
-    # 清理Git暂存区
-    result = subprocess.run(
-        ['git', 'reset', 'HEAD', '.'],
-        cwd=project_root,
-        capture_output=True,
-        text=True
-    )
+    result = subprocess.run(["git", "reset", "HEAD", "."], cwd=project_root, capture_output=True, text=True)
     if result.returncode == 0:
     if result.returncode == 0:
-        artifacts_cleaned.append("Git暂存区已清理(project)")
-
+        artifacts_cleaned.append("Git 暂存区已清理(project)")
+
+    safe_append_call_trace(
+        "artifacts_cleaned",
+        {
+            "chapter": chapter_num,
+            "items": artifacts_cleaned,
+            "git_reset_ok": result.returncode == 0,
+        },
+    )
     return artifacts_cleaned
     return artifacts_cleaned
 
 
+
 def clear_current_task():
 def clear_current_task():
-    """清除当前中断任务"""
+    """Clear interrupted current task."""
     state = load_state()
     state = load_state()
-    if state.get('current_task'):
-        state['current_task'] = None
+    task = state.get("current_task")
+    if task:
+        safe_append_call_trace(
+            "task_cleared",
+            {
+                "command": task.get("command"),
+                "chapter": task.get("args", {}).get("chapter_num"),
+                "status": task.get("status"),
+            },
+        )
+        state["current_task"] = None
         save_state(state)
         save_state(state)
         print("✅ 中断任务已清除")
         print("✅ 中断任务已清除")
     else:
     else:
         print("⚠️ 无中断任务")
         print("⚠️ 无中断任务")
 
 
+
+def fail_current_task(reason: str = "manual_fail"):
+    """Mark current task as failed and keep state for diagnostics."""
+    state = load_state()
+    task = state.get("current_task")
+    if not task:
+        print("⚠️ 无活动任务")
+        return
+
+    _mark_task_failed(state, reason=reason)
+    save_state(state)
+    safe_append_call_trace(
+        "task_failed",
+        {
+            "command": task.get("command"),
+            "chapter": task.get("args", {}).get("chapter_num"),
+            "reason": reason,
+        },
+    )
+    print(f"⚠️ 任务已标记失败: {reason}")
+
+
 def load_state():
 def load_state():
-    """加载workflow状态"""
+    """Load workflow state."""
     state_file = get_workflow_state_path()
     state_file = get_workflow_state_path()
     if not state_file.exists():
     if not state_file.exists():
-        return {'current_task': None, 'last_stable_state': None, 'history': []}
-    with open(state_file, 'r', encoding='utf-8') as f:
-        return json.load(f)
+        return {"current_task": None, "last_stable_state": None, "history": []}
+    with open(state_file, "r", encoding="utf-8") as f:
+        state = json.load(f)
+
+    state.setdefault("current_task", None)
+    state.setdefault("last_stable_state", None)
+    state.setdefault("history", [])
+    if state.get("current_task"):
+        state["current_task"].setdefault("failed_steps", [])
+        state["current_task"].setdefault("retry_count", 0)
+    return state
+
 
 
 def save_state(state):
 def save_state(state):
-    """保存workflow状态(原子化写入)"""
+    """Save workflow state atomically."""
     state_file = get_workflow_state_path()
     state_file = get_workflow_state_path()
-    # ============================================================================
-    # 安全修复:使用原子化写入(P1 MEDIUM)
-    # ============================================================================
     create_secure_directory(str(state_file.parent))
     create_secure_directory(str(state_file.parent))
     atomic_write_json(state_file, state, use_lock=True, backup=False)
     atomic_write_json(state_file, state, use_lock=True, backup=False)
 
 
+
 def get_pending_steps(command):
 def get_pending_steps(command):
-    """获取待执行步骤列表(v5.2 引入,v5.4 沿用)"""
-    if command == 'webnovel-write':
-        # v5.2 引入的 8 步工作流(v5.4 沿用)
-        # Step 1: Context Agent 搜集上下文
-        # Step 1.5: 章节设计(开头/钩子/爽点模式)
-        # Step 2A: 生成粗稿
-        # Step 2B: 风格适配(可选)
-        # Step 3: 审查 (6个Agent并行,只报告)
-        # Step 4: 网文化润色
-        # Step 5: Data Agent 处理数据链
-        # Step 6: Git 备份
-        return ['Step 1', 'Step 1.5', 'Step 2A', 'Step 2B', 'Step 3', 'Step 4', 'Step 5', 'Step 6']
-    elif command == 'webnovel-review':
-        return ['Step 1', 'Step 2', 'Step 3', 'Step 4', 'Step 5', 'Step 6', 'Step 7', 'Step 8']
-    # 其他命令...
+    """Get command pending step list."""
+    if command == "webnovel-write":
+        return ["Step 1", "Step 1.5", "Step 2A", "Step 2B", "Step 3", "Step 4", "Step 5", "Step 6"]
+    if command == "webnovel-review":
+        return ["Step 1", "Step 2", "Step 3", "Step 4", "Step 5", "Step 6", "Step 7", "Step 8"]
     return []
     return []
 
 
+
 def extract_stable_state(task):
 def extract_stable_state(task):
-    """提取稳定状态快照"""
+    """Extract stable state snapshot."""
     return {
     return {
-        'command': task['command'],
-        'chapter_num': task['args'].get('chapter_num'),
-        'completed_at': task.get('completed_at'),
-        'artifacts': task.get('artifacts', {})
+        "command": task["command"],
+        "chapter_num": task["args"].get("chapter_num"),
+        "completed_at": task.get("completed_at"),
+        "artifacts": task.get("artifacts", {}),
     }
     }
 
 
-# CLI接口
-if __name__ == '__main__':
+
+if __name__ == "__main__":
     import argparse
     import argparse
-    parser = argparse.ArgumentParser(description='工作流状态管理')
-    subparsers = parser.add_subparsers(dest='action', help='操作类型')
 
 
-    # start-task
-    p_start_task = subparsers.add_parser('start-task', help='开始新任务')
-    p_start_task.add_argument('--command', required=True, help='命令名称')
-    p_start_task.add_argument('--chapter', type=int, help='章节号')
+    parser = argparse.ArgumentParser(description="工作流状态管理")
+    subparsers = parser.add_subparsers(dest="action", help="操作类型")
+
+    p_start_task = subparsers.add_parser("start-task", help="开始新任务")
+    p_start_task.add_argument("--command", required=True, help="命令名称")
+    p_start_task.add_argument("--chapter", type=int, help="章节号")
+
+    p_start_step = subparsers.add_parser("start-step", help="开始 Step")
+    p_start_step.add_argument("--step-id", required=True, help="Step ID")
+    p_start_step.add_argument("--step-name", required=True, help="Step 名称")
+    p_start_step.add_argument("--note", help="进度备注")
 
 
-    # start-step
-    p_start_step = subparsers.add_parser('start-step', help='开始Step')
-    p_start_step.add_argument('--step-id', required=True, help='Step ID')
-    p_start_step.add_argument('--step-name', required=True, help='Step名称')
-    p_start_step.add_argument('--note', help='进度备注')
+    p_complete_step = subparsers.add_parser("complete-step", help="完成 Step")
+    p_complete_step.add_argument("--step-id", required=True, help="Step ID")
+    p_complete_step.add_argument("--artifacts", help="Artifacts JSON")
 
 
-    # complete-step
-    p_complete_step = subparsers.add_parser('complete-step', help='完成Step')
-    p_complete_step.add_argument('--step-id', required=True, help='Step ID')
-    p_complete_step.add_argument('--artifacts', help='Artifacts JSON')
+    p_complete_task = subparsers.add_parser("complete-task", help="完成任务")
+    p_complete_task.add_argument("--artifacts", help="Final artifacts JSON")
 
 
-    # complete-task
-    p_complete_task = subparsers.add_parser('complete-task', help='完成任务')
-    p_complete_task.add_argument('--artifacts', help='Final artifacts JSON')
+    p_fail_task = subparsers.add_parser("fail-task", help="标记任务失败")
+    p_fail_task.add_argument("--reason", default="manual_fail", help="失败原因")
 
 
-    # detect
-    subparsers.add_parser('detect', help='检测中断')
+    subparsers.add_parser("detect", help="检测中断")
 
 
-    # cleanup
-    p_cleanup = subparsers.add_parser('cleanup', help='清理artifacts')
-    p_cleanup.add_argument('--chapter', type=int, required=True, help='章节号')
+    p_cleanup = subparsers.add_parser("cleanup", help="清理 artifacts")
+    p_cleanup.add_argument("--chapter", type=int, required=True, help="章节号")
 
 
-    # clear
-    subparsers.add_parser('clear', help='清除中断任务')
+    subparsers.add_parser("clear", help="清除中断任务")
 
 
     args = parser.parse_args()
     args = parser.parse_args()
 
 
-    if args.action == 'start-task':
-        start_task(args.command, {'chapter_num': args.chapter})
-    elif args.action == 'start-step':
+    if args.action == "start-task":
+        start_task(args.command, {"chapter_num": args.chapter})
+    elif args.action == "start-step":
         start_step(args.step_id, args.step_name, args.note)
         start_step(args.step_id, args.step_name, args.note)
-    elif args.action == 'complete-step':
+    elif args.action == "complete-step":
         complete_step(args.step_id, args.artifacts)
         complete_step(args.step_id, args.artifacts)
-    elif args.action == 'complete-task':
+    elif args.action == "complete-task":
         complete_task(args.artifacts)
         complete_task(args.artifacts)
-    elif args.action == 'detect':
+    elif args.action == "fail-task":
+        fail_current_task(args.reason)
+    elif args.action == "detect":
         interrupt = detect_interruption()
         interrupt = detect_interruption()
         if interrupt:
         if interrupt:
-            print("\n🔴 检测到中断任务:")
+            print("\n🔶 检测到中断任务:")
             print(json.dumps(interrupt, ensure_ascii=False, indent=2))
             print(json.dumps(interrupt, ensure_ascii=False, indent=2))
-            print("\n💡 恢复选项:")
+            print("\n📕 恢复选项:")
             options = analyze_recovery_options(interrupt)
             options = analyze_recovery_options(interrupt)
             print(json.dumps(options, ensure_ascii=False, indent=2))
             print(json.dumps(options, ensure_ascii=False, indent=2))
         else:
         else:
             print("✅ 无中断任务")
             print("✅ 无中断任务")
-    elif args.action == 'cleanup':
+    elif args.action == "cleanup":
         cleaned = cleanup_artifacts(args.chapter)
         cleaned = cleanup_artifacts(args.chapter)
         print(f"✅ 已清理: {', '.join(cleaned)}")
         print(f"✅ 已清理: {', '.join(cleaned)}")
-    elif args.action == 'clear':
+    elif args.action == "clear":
         clear_current_task()
         clear_current_task()
     else:
     else:
         parser.print_help()
         parser.print_help()

+ 9 - 0
README.md

@@ -758,6 +758,15 @@ git checkout ch0045
 - **审查趋势统计**:get-review-trend-stats 查询近期审查均值和短板
 - **审查趋势统计**:get-review-trend-stats 查询近期审查均值和短板
 - **故事骨架采样**:context_manager 每 N 章采样历史摘要,构建长篇感知
 - **故事骨架采样**:context_manager 每 N 章采样历史摘要,构建长篇感知
 - **上下文工程升级**:基于 Context Engineering Guide 优化
 - **上下文工程升级**:基于 Context Engineering Guide 优化
+
+### Context Contract v2(阶段 A)
+
+- 上下文契约升级为 v2:新增 `meta.context_contract_version = "v2"`
+- 新增上下文排序器:`data_modules/context_ranker.py`
+- 排序策略:近期优先 + 频次稳定 + 钩子/风险信号加权
+- 工作流可观测性:`workflow_manager.py` 会写入 `.webnovel/observability/call_trace.jsonl`
+
+参考文档:`.claude/references/context-contract-v2.md`
 - **invalid_facts 表**:追踪无效事实,支持 pending/confirmed 状态
 - **invalid_facts 表**:追踪无效事实,支持 pending/confirmed 状态
 - **父子向量索引**:parent_chunk_id 支持摘要-场景层级检索
 - **父子向量索引**:parent_chunk_id 支持摘要-场景层级检索
 - **Token 预算管理**:ContextManager 实现 40%/35%/25% 优先级分配
 - **Token 预算管理**:ContextManager 实现 40%/35%/25% 优先级分配