Sfoglia il codice sorgente

feat: 新审查 schema(v6)——结构化问题清单替代评分制

lingfengQAQ 2 mesi fa
parent
commit
ce6bf35262

+ 33 - 0
webnovel-writer/references/review-schema.md

@@ -0,0 +1,33 @@
+# 审查输出 Schema(v6)
+
+统一审查 Agent 输出格式。替代原 checker-output-schema.md 的评分制。
+
+## 核心变化
+
+- **无总分**:不再输出 overall_score,改为结构化问题清单
+- **blocking 语义**:替代原 timeline_gate,severity=critical 默认阻断
+- **单 agent**:不再区分 6 个 checker,统一由 reviewer agent 输出
+
+## Issue Schema
+
+| 字段 | 类型 | 必填 | 说明 |
+|------|------|------|------|
+| severity | critical/high/medium/low | ✅ | 严重度 |
+| category | continuity/setting/character/timeline/ai_flavor/logic/pacing/other | ✅ | 问题分类 |
+| location | string | ✅ | 位置(如"第3段") |
+| description | string | ✅ | 问题描述 |
+| evidence | string | ❌ | 原文引用或记忆对比 |
+| fix_hint | string | ❌ | 修复建议 |
+| blocking | bool | ❌ | 是否阻断(critical 默认 true) |
+
+## 阻断规则
+
+- 存在任何 `blocking=true` 的 issue → Step 4 不得开始
+- `severity=critical` 自动 `blocking=true`
+- 其余 severity 由审查 agent 根据上下文判断
+
+## 指标沉淀
+
+每次审查写入 `index.db.review_metrics`:
+- `chapter, issues_count, blocking_count, categories, timestamp`
+- 用于趋势观测,不用于 gate 决策

+ 100 - 0
webnovel-writer/scripts/data_modules/review_schema.py

@@ -0,0 +1,100 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+审查结果 schema(v6)。
+
+替代原 checker-output-schema.md 的评分制,改为结构化问题清单。
+"""
+from __future__ import annotations
+
+from dataclasses import asdict, dataclass, field
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+
+VALID_SEVERITIES = {"critical", "high", "medium", "low"}
+VALID_CATEGORIES = {
+    "continuity", "setting", "character", "timeline",
+    "ai_flavor", "logic", "pacing", "other",
+}
+
+
+@dataclass
+class ReviewIssue:
+    severity: str
+    category: str = "other"
+    location: str = ""
+    description: str = ""
+    evidence: str = ""
+    fix_hint: str = ""
+    blocking: Optional[bool] = None
+
+    def __post_init__(self):
+        if self.severity not in VALID_SEVERITIES:
+            self.severity = "medium"
+        if self.category not in VALID_CATEGORIES:
+            self.category = "other"
+        if self.blocking is None:
+            self.blocking = self.severity == "critical"
+
+    def to_dict(self) -> Dict[str, Any]:
+        return asdict(self)
+
+
+@dataclass
+class ReviewResult:
+    chapter: int
+    issues: List[ReviewIssue] = field(default_factory=list)
+    summary: str = ""
+
+    @property
+    def issues_count(self) -> int:
+        return len(self.issues)
+
+    @property
+    def blocking_count(self) -> int:
+        return sum(1 for i in self.issues if i.blocking)
+
+    @property
+    def has_blocking(self) -> bool:
+        return self.blocking_count > 0
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "chapter": self.chapter,
+            "issues": [i.to_dict() for i in self.issues],
+            "issues_count": self.issues_count,
+            "blocking_count": self.blocking_count,
+            "has_blocking": self.has_blocking,
+            "summary": self.summary,
+        }
+
+    def to_metrics_dict(self) -> Dict[str, Any]:
+        categories = sorted(set(i.category for i in self.issues))
+        return {
+            "chapter": self.chapter,
+            "issues_count": self.issues_count,
+            "blocking_count": self.blocking_count,
+            "categories": categories,
+            "timestamp": datetime.now().isoformat(timespec="seconds"),
+        }
+
+
+def parse_review_output(chapter: int, raw: Dict[str, Any]) -> ReviewResult:
+    issues = []
+    for item in raw.get("issues", []):
+        if not isinstance(item, dict):
+            continue
+        issues.append(ReviewIssue(
+            severity=str(item.get("severity", "medium")),
+            category=str(item.get("category", "other")),
+            location=str(item.get("location", "")),
+            description=str(item.get("description", "")),
+            evidence=str(item.get("evidence", "")),
+            fix_hint=str(item.get("fix_hint", "")),
+            blocking=item.get("blocking"),
+        ))
+    return ReviewResult(
+        chapter=chapter,
+        issues=issues,
+        summary=str(raw.get("summary", "")),
+    )

+ 108 - 0
webnovel-writer/scripts/data_modules/tests/test_review_schema.py

@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""审查 schema 测试"""
+import pytest
+from data_modules.review_schema import ReviewIssue, ReviewResult, parse_review_output
+
+
+def test_review_issue_blocking_defaults():
+    """critical severity 默认 blocking=True"""
+    issue = ReviewIssue(
+        severity="critical",
+        category="continuity",
+        location="第3段",
+        description="主角使用了已失去的能力",
+    )
+    assert issue.blocking is True
+
+
+def test_review_issue_non_critical_not_blocking():
+    """非 critical 默认 blocking=False"""
+    issue = ReviewIssue(
+        severity="high",
+        category="setting",
+        location="第7段",
+        description="时间线矛盾",
+    )
+    assert issue.blocking is False
+
+
+def test_review_result_counts():
+    """blocking_count 自动计算"""
+    result = ReviewResult(
+        chapter=10,
+        issues=[
+            ReviewIssue(severity="critical", category="continuity", location="p1", description="d1"),
+            ReviewIssue(severity="high", category="setting", location="p2", description="d2"),
+            ReviewIssue(severity="high", category="timeline", location="p3", description="d3", blocking=True),
+        ],
+        summary="测试",
+    )
+    assert result.blocking_count == 2
+    assert result.issues_count == 3
+    assert result.has_blocking is True
+
+
+def test_review_result_no_issues():
+    result = ReviewResult(chapter=10, issues=[], summary="无问题")
+    assert result.blocking_count == 0
+    assert result.has_blocking is False
+
+
+def test_review_result_to_dict_roundtrip():
+    result = ReviewResult(
+        chapter=10,
+        issues=[
+            ReviewIssue(severity="medium", category="ai_flavor", location="p5", description="AI味重",
+                        evidence="'稳住心神'出现3次", fix_hint="替换为具体动作描写"),
+        ],
+        summary="1个AI味问题",
+    )
+    d = result.to_dict()
+    assert d["chapter"] == 10
+    assert d["blocking_count"] == 0
+    assert len(d["issues"]) == 1
+    assert d["issues"][0]["category"] == "ai_flavor"
+    assert d["issues"][0]["fix_hint"] == "替换为具体动作描写"
+
+
+def test_parse_review_output_from_dict():
+    raw = {
+        "issues": [
+            {"severity": "critical", "category": "continuity", "location": "p1",
+             "description": "矛盾", "evidence": "证据", "fix_hint": "修复"},
+        ],
+        "summary": "1个严重问题",
+    }
+    result = parse_review_output(chapter=5, raw=raw)
+    assert result.chapter == 5
+    assert result.blocking_count == 1
+
+
+def test_parse_review_output_tolerates_missing_fields():
+    raw = {
+        "issues": [
+            {"severity": "low", "description": "小问题"},
+        ],
+        "summary": "轻微",
+    }
+    result = parse_review_output(chapter=1, raw=raw)
+    assert result.issues[0].category == "other"
+    assert result.issues[0].location == ""
+
+
+def test_review_result_to_metrics_dict():
+    result = ReviewResult(
+        chapter=10,
+        issues=[
+            ReviewIssue(severity="critical", category="continuity", location="p1", description="d1"),
+            ReviewIssue(severity="high", category="ai_flavor", location="p2", description="d2"),
+        ],
+        summary="测试",
+    )
+    metrics = result.to_metrics_dict()
+    assert metrics["chapter"] == 10
+    assert metrics["issues_count"] == 2
+    assert metrics["blocking_count"] == 1
+    assert "continuity" in metrics["categories"]
+    assert "ai_flavor" in metrics["categories"]

+ 0 - 225
webnovel-writer/scripts/data_modules/tests/test_workflow_manager.py

@@ -1,225 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-import json
-import logging
-import sys
-from pathlib import Path
-from types import SimpleNamespace
-
-
-def _load_module():
-    scripts_dir = Path(__file__).resolve().parents[2]
-    if str(scripts_dir) not in sys.path:
-        sys.path.insert(0, str(scripts_dir))
-    import workflow_manager
-
-    return workflow_manager
-
-
-def test_workflow_lifecycle_and_trace(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    module.start_task("webnovel-write", {"chapter_num": 7})
-    module.start_step("Step 1", "Context")
-    module.complete_step("Step 1", json.dumps({"state_json_modified": True}, ensure_ascii=False))
-    module.complete_task(json.dumps({"review_completed": True}, ensure_ascii=False))
-
-    state = module.load_state()
-    assert state["current_task"] is None
-    assert state["history"][-1]["status"] == module.TASK_STATUS_COMPLETED
-    assert state["last_stable_state"]["artifacts"]["review_completed"] is True
-
-    trace_path = module.get_call_trace_path()
-    assert trace_path.exists()
-    lines = trace_path.read_text(encoding="utf-8").strip().splitlines()
-    events = [json.loads(line)["event"] for line in lines if line.strip()]
-    assert "task_started" in events
-    assert "step_started" in events
-    assert "step_completed" in events
-    assert "task_completed" in events
-
-
-def test_start_task_reentry_increments_retry(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    module.start_task("webnovel-write", {"chapter_num": 8})
-    module.start_task("webnovel-write", {"chapter_num": 8})
-
-    state = module.load_state()
-    task = state["current_task"]
-    assert task is not None
-    assert task["status"] == module.TASK_STATUS_RUNNING
-    assert int(task.get("retry_count", 0)) >= 1
-
-
-def test_complete_step_rejects_mismatch_step_id(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    module.start_task("webnovel-write", {"chapter_num": 9})
-    module.start_step("Step 2A", "Draft")
-    module.complete_step("Step 2B")
-
-    state = module.load_state()
-    current_step = state["current_task"]["current_step"]
-    assert current_step is not None
-    assert current_step["id"] == "Step 2A"
-    assert current_step["status"] == module.STEP_STATUS_RUNNING
-
-
-def test_workflow_step_owner_and_order_violation_trace(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    assert module.expected_step_owner("webnovel-write", "Step 1") == "context-agent"
-    assert module.expected_step_owner("webnovel-write", "Step 5") == "data-agent"
-
-    module.start_task("webnovel-write", {"chapter_num": 12})
-    module.start_step("Step 3", "Review")
-
-    trace_path = module.get_call_trace_path()
-    lines = [json.loads(line) for line in trace_path.read_text(encoding="utf-8").splitlines() if line.strip()]
-    events = [row.get("event") for row in lines]
-    assert "step_order_violation" in events
-
-    step_started = [row for row in lines if row.get("event") == "step_started"]
-    assert step_started
-    assert step_started[-1].get("payload", {}).get("expected_owner") == "review-agents"
-
-
-def test_safe_append_call_trace_logs_failure(monkeypatch, caplog):
-    module = _load_module()
-
-    def _raise_trace_error(event, payload=None):
-        raise RuntimeError("trace failure")
-
-    monkeypatch.setattr(module, "append_call_trace", _raise_trace_error)
-
-    with caplog.at_level(logging.WARNING):
-        module.safe_append_call_trace("unit_test_event", {"ok": True})
-
-    message_text = "\n".join(record.getMessage() for record in caplog.records)
-    assert "failed to append call trace" in message_text
-    assert "unit_test_event" in message_text
-
-
-def test_get_workflow_paths_support_zero_arg_find_project_root(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "_cli_project_root", None)
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    assert module.get_workflow_state_path() == tmp_path / ".webnovel" / "workflow_state.json"
-    assert module.get_call_trace_path() == tmp_path / ".webnovel" / "observability" / "call_trace.jsonl"
-
-
-def test_workflow_reentry_does_not_duplicate_history(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    module.start_task("webnovel-write", {"chapter_num": 20})
-    module.start_task("webnovel-write", {"chapter_num": 20})
-    module.start_task("webnovel-write", {"chapter_num": 20})
-
-    state = module.load_state()
-    assert isinstance(state.get("history"), list)
-    assert len(state.get("history")) == 0
-
-    task = state.get("current_task") or {}
-    assert int(task.get("retry_count", 0)) >= 2
-
-
-def test_cleanup_artifacts_requires_confirm(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    draft_path = module.default_chapter_draft_path(tmp_path, 7)
-    draft_path.parent.mkdir(parents=True, exist_ok=True)
-    draft_path.write_text("draft", encoding="utf-8")
-
-    git_called = {"count": 0}
-
-    def _fake_run(*args, **kwargs):
-        git_called["count"] += 1
-        return SimpleNamespace(returncode=0, stderr="", stdout="")
-
-    monkeypatch.setattr(module.subprocess, "run", _fake_run)
-
-    preview = module.cleanup_artifacts(7, confirm=False)
-
-    assert draft_path.exists()
-    assert git_called["count"] == 0
-    assert any(item.startswith("[预览]") for item in preview)
-
-
-def test_cleanup_artifacts_confirm_deletes_with_backup(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    draft_path = module.default_chapter_draft_path(tmp_path, 8)
-    draft_path.parent.mkdir(parents=True, exist_ok=True)
-    draft_path.write_text("draft", encoding="utf-8")
-
-    git_called = {"count": 0, "cmd": None}
-
-    def _fake_run(cmd, **kwargs):
-        git_called["count"] += 1
-        git_called["cmd"] = cmd
-        return SimpleNamespace(returncode=0, stderr="", stdout="")
-
-    monkeypatch.setattr(module.subprocess, "run", _fake_run)
-
-    cleaned = module.cleanup_artifacts(8, confirm=True)
-
-    assert not draft_path.exists()
-    assert git_called["count"] == 1
-    assert git_called["cmd"] == ["git", "reset", "HEAD", "."]
-    assert any("Git 暂存区已清理" in item for item in cleaned)
-
-    backup_dir = tmp_path / ".webnovel" / "recovery_backups"
-    backups = list(backup_dir.glob("ch0008-*"))
-    assert backups
-
-
-def test_workflow_step_zero_point_five_is_registered(tmp_path, monkeypatch):
-    module = _load_module()
-    monkeypatch.setattr(module, "find_project_root", lambda: tmp_path)
-
-    webnovel_dir = tmp_path / ".webnovel"
-    webnovel_dir.mkdir(parents=True, exist_ok=True)
-
-    assert module.expected_step_owner("webnovel-write", "Step 0.5") == "webnovel-write-skill"
-    assert module.get_pending_steps("webnovel-write")[0] == "Step 0.5"
-
-    module.start_task("webnovel-write", {"chapter_num": 15})
-    module.start_step("Step 0.5", "节点预检")
-    module.complete_step("Step 0.5")
-    module.start_step("Step 1", "Context")
-
-    trace_path = module.get_call_trace_path()
-    rows = [json.loads(line) for line in trace_path.read_text(encoding="utf-8").splitlines() if line.strip()]
-    violation_events = [row for row in rows if row.get("event") == "step_order_violation"]
-    assert not violation_events

+ 0 - 833
webnovel-writer/scripts/workflow_manager.py

@@ -1,833 +0,0 @@
-#!/usr/bin/env python3
-"""
-Workflow state manager
-- Track write/review task execution status
-- Detect interruption points
-- Provide recovery options
-- Emit call traces for observability
-"""
-
-from __future__ import annotations
-
-import json
-import logging
-import os
-import shutil
-import subprocess
-import sys
-from datetime import datetime
-from pathlib import Path
-from typing import Any, Dict, Optional
-
-from chapter_paths import default_chapter_draft_path, find_chapter_file
-from project_locator import resolve_project_root
-from runtime_compat import enable_windows_utf8_stdio, normalize_windows_path
-from security_utils import atomic_write_json, create_secure_directory
-
-
-logger = logging.getLogger(__name__)
-
-
-# UTF-8 output for Windows console (CLI run only, avoid pytest capture issues)
-if sys.platform == "win32" and __name__ == "__main__" and not os.environ.get("PYTEST_CURRENT_TEST"):
-    enable_windows_utf8_stdio(skip_in_pytest=True)
-
-
-TASK_STATUS_RUNNING = "running"
-TASK_STATUS_COMPLETED = "completed"
-TASK_STATUS_FAILED = "failed"
-
-STEP_STATUS_STARTED = "started"
-STEP_STATUS_RUNNING = "running"
-STEP_STATUS_COMPLETED = "completed"
-STEP_STATUS_FAILED = "failed"
-
-WRITE_WORKFLOW_STEPS = [
-    "Step 0.5",
-    "Step 1",
-    "Step 2A",
-    "Step 2B",
-    "Step 3",
-    "Step 4",
-    "Step 5",
-    "Step 6",
-]
-
-def now_iso() -> str:
-    return datetime.now().isoformat()
-
-
-def find_project_root(override: Optional[Path] = None) -> Path:
-    """Resolve project root (containing .webnovel/state.json).
-
-    Args:
-        override: If provided, use this path directly instead of auto-detecting.
-    """
-    if override is not None:
-        # 允许传入“工作区根目录”,统一解析到真正的 book project_root(必须包含 .webnovel/state.json)
-        return resolve_project_root(str(override))
-    return resolve_project_root()
-
-
-# Global variable to hold CLI-provided project root
-_cli_project_root: Optional[Path] = None
-
-
-def _get_active_project_root() -> Path:
-    """Resolve workflow paths while兼容测试中无参 monkeypatch。"""
-    if _cli_project_root is not None:
-        return find_project_root(_cli_project_root)
-    return find_project_root()
-
-
-def get_workflow_state_path() -> Path:
-    """Absolute path to workflow_state.json."""
-    project_root = _get_active_project_root()
-    return project_root / ".webnovel" / "workflow_state.json"
-
-
-def get_call_trace_path() -> Path:
-    project_root = _get_active_project_root()
-    return project_root / ".webnovel" / "observability" / "call_trace.jsonl"
-
-
-def append_call_trace(event: str, payload: Optional[Dict[str, Any]] = None):
-    """Append workflow call trace event (best effort)."""
-    payload = payload or {}
-    trace_path = get_call_trace_path()
-    create_secure_directory(str(trace_path.parent))
-    row = {
-        "timestamp": now_iso(),
-        "event": event,
-        "payload": payload,
-    }
-    with open(trace_path, "a", encoding="utf-8") as f:
-        f.write(json.dumps(row, ensure_ascii=False) + "\n")
-
-
-def safe_append_call_trace(event: str, payload: Optional[Dict[str, Any]] = None):
-    try:
-        append_call_trace(event, payload)
-    except Exception as exc:
-        logger.warning("failed to append call trace for event '%s': %s", event, exc)
-
-
-def expected_step_owner(command: str, step_id: str) -> str:
-    """Resolve expected caller owner by command + step id.
-
-    Returns concise owner tags to align with
-    `.claude/references/claude-code-call-matrix.md`.
-    """
-    if command == "webnovel-write":
-        mapping = {
-            "Step 0.5": "webnovel-write-skill",
-            "Step 1": "context-agent",
-            "Step 1.5": "webnovel-write-skill",
-            "Step 2A": "writer-draft",
-            "Step 2B": "style-adapter",
-            "Step 3": "review-agents",
-            "Step 4": "polish-agent",
-            "Step 5": "data-agent",
-            "Step 6": "backup-agent",
-        }
-        return mapping.get(step_id, "webnovel-write-skill")
-
-    if command == "webnovel-review":
-        return "webnovel-review-skill"
-
-    return "unknown"
-
-
-def step_allowed_before(command: str, step_id: str, completed_steps: list[Dict[str, Any]]) -> bool:
-    """Check simple ordering constraints by pending sequence."""
-    sequence = get_pending_steps(command)
-    if step_id not in sequence:
-        return True
-
-    expected_index = sequence.index(step_id)
-    completed_ids = [str(item.get("id")) for item in completed_steps]
-    required_before = sequence[:expected_index]
-    return all(prev in completed_ids for prev in required_before)
-
-
-def _new_task(command: str, args: Dict[str, Any]) -> Dict[str, Any]:
-    started_at = now_iso()
-    return {
-        "command": command,
-        "args": args,
-        "started_at": started_at,
-        "last_heartbeat": started_at,
-        "status": TASK_STATUS_RUNNING,
-        "current_step": None,
-        "completed_steps": [],
-        "failed_steps": [],
-        "pending_steps": get_pending_steps(command),
-        "retry_count": 0,
-        "artifacts": {
-            "chapter_file": {},
-            "git_status": {},
-            "state_json_modified": False,
-            "entities_appeared": False,
-            "review_completed": False,
-        },
-    }
-
-
-def _finalize_current_step_as_failed(task: Dict[str, Any], reason: str):
-    current_step = task.get("current_step")
-    if not current_step:
-        return
-    if current_step.get("status") in {STEP_STATUS_COMPLETED, STEP_STATUS_FAILED}:
-        return
-
-    current_step = dict(current_step)
-    current_step["status"] = STEP_STATUS_FAILED
-    current_step["failed_at"] = now_iso()
-    current_step["failure_reason"] = reason
-    task.setdefault("failed_steps", []).append(current_step)
-    task["current_step"] = None
-
-
-def _mark_task_failed(state: Dict[str, Any], reason: str):
-    task = state.get("current_task")
-    if not task:
-        return
-
-    _finalize_current_step_as_failed(task, reason=reason)
-    task["status"] = TASK_STATUS_FAILED
-    task["failed_at"] = now_iso()
-    task["failure_reason"] = reason
-
-
-def start_task(command, args):
-    """Start a new task."""
-    state = load_state()
-    current = state.get("current_task")
-
-    if current and current.get("status") == TASK_STATUS_RUNNING:
-        current["retry_count"] = int(current.get("retry_count", 0)) + 1
-        current["last_heartbeat"] = now_iso()
-        state["current_task"] = current
-        save_state(state)
-        safe_append_call_trace(
-            "task_reentered",
-            {
-                "command": current.get("command"),
-                "chapter": current.get("args", {}).get("chapter_num"),
-                "retry_count": current["retry_count"],
-            },
-        )
-        print(f"ℹ️ 任务已在运行,执行重入标记: {current.get('command')}")
-        return
-
-    state["current_task"] = _new_task(command, args)
-    save_state(state)
-    safe_append_call_trace("task_started", {"command": command, "args": args})
-    print(f"✅ 任务已启动: {command} {json.dumps(args, ensure_ascii=False)}")
-
-
-def start_step(step_id, step_name, progress_note=None):
-    """Mark step started."""
-    state = load_state()
-    task = state.get("current_task")
-    if not task:
-        print("⚠️ 无活动任务,请先使用 start-task")
-        return
-
-    command = str(task.get("command") or "")
-    if not step_allowed_before(command, step_id, task.get("completed_steps", [])):
-        safe_append_call_trace(
-            "step_order_violation",
-            {
-                "step_id": step_id,
-                "command": command,
-                "completed_steps": [row.get("id") for row in task.get("completed_steps", [])],
-            },
-        )
-
-    owner = expected_step_owner(command, step_id)
-
-    _finalize_current_step_as_failed(task, reason="step_replaced_before_completion")
-
-    started_at = now_iso()
-    task["current_step"] = {
-        "id": step_id,
-        "name": step_name,
-        "status": STEP_STATUS_STARTED,
-        "started_at": started_at,
-        "running_at": started_at,
-        "attempt": int(task.get("retry_count", 0)) + 1,
-        "progress_note": progress_note,
-    }
-    task["current_step"]["status"] = STEP_STATUS_RUNNING
-    task["status"] = TASK_STATUS_RUNNING
-    task["last_heartbeat"] = now_iso()
-
-    save_state(state)
-    safe_append_call_trace(
-        "step_started",
-        {
-            "step_id": step_id,
-            "step_name": step_name,
-            "command": task.get("command"),
-            "chapter": task.get("args", {}).get("chapter_num"),
-            "progress_note": progress_note,
-            "expected_owner": owner,
-        },
-    )
-    print(f"▶️ {step_id} 开始: {step_name}")
-
-
-def complete_step(step_id, artifacts_json=None):
-    """Mark step completed."""
-    state = load_state()
-    task = state.get("current_task")
-    if not task or not task.get("current_step"):
-        print("⚠️ 无活动 Step")
-        return
-
-    current_step = task["current_step"]
-    if current_step.get("id") != step_id:
-        print(f"⚠️ 当前 Step 为 {current_step.get('id')},与 {step_id} 不一致,拒绝完成")
-        safe_append_call_trace(
-            "step_complete_rejected",
-            {
-                "requested_step_id": step_id,
-                "active_step_id": current_step.get("id"),
-                "command": task.get("command"),
-            },
-        )
-        return
-
-    current_step["status"] = STEP_STATUS_COMPLETED
-    current_step["completed_at"] = now_iso()
-
-    if artifacts_json:
-        try:
-            artifacts = json.loads(artifacts_json)
-            current_step["artifacts"] = artifacts
-            task["artifacts"].update(artifacts)
-        except json.JSONDecodeError as exc:
-            print(f"⚠️ Artifacts JSON 解析失败: {exc}")
-
-    task["completed_steps"].append(current_step)
-    task["current_step"] = None
-    task["last_heartbeat"] = now_iso()
-
-    save_state(state)
-    safe_append_call_trace(
-        "step_completed",
-        {
-            "step_id": step_id,
-            "command": task.get("command"),
-            "chapter": task.get("args", {}).get("chapter_num"),
-        },
-    )
-    print(f"✅ {step_id} 完成")
-
-
-def complete_task(final_artifacts_json=None):
-    """Mark task completed."""
-    state = load_state()
-    task = state.get("current_task")
-    if not task:
-        print("⚠️ 无活动任务")
-        return
-
-    _finalize_current_step_as_failed(task, reason="task_completed_with_active_step")
-
-    task["status"] = TASK_STATUS_COMPLETED
-    task["completed_at"] = now_iso()
-
-    if final_artifacts_json:
-        try:
-            final_artifacts = json.loads(final_artifacts_json)
-            task["artifacts"].update(final_artifacts)
-        except json.JSONDecodeError as exc:
-            print(f"⚠️ Final artifacts JSON 解析失败: {exc}")
-
-    state["last_stable_state"] = extract_stable_state(task)
-    if "history" not in state:
-        state["history"] = []
-    state["history"].append(
-        {
-            "task_id": f"task_{len(state['history']) + 1:03d}",
-            "command": task["command"],
-            "chapter": task["args"].get("chapter_num"),
-            "status": TASK_STATUS_COMPLETED,
-            "completed_at": task["completed_at"],
-        }
-    )
-
-    state["current_task"] = None
-    save_state(state)
-    safe_append_call_trace(
-        "task_completed",
-        {
-            "command": task.get("command"),
-            "chapter": task.get("args", {}).get("chapter_num"),
-            "completed_steps": len(task.get("completed_steps", [])),
-            "failed_steps": len(task.get("failed_steps", [])),
-        },
-    )
-    print("🎀 任务完成")
-
-
-def detect_interruption():
-    """Detect interruption state."""
-    state = load_state()
-    if not state or "current_task" not in state or state["current_task"] is None:
-        return None
-
-    task = state["current_task"]
-    if task.get("status") == TASK_STATUS_COMPLETED:
-        return None
-
-    last_heartbeat = datetime.fromisoformat(task["last_heartbeat"])
-    elapsed = (datetime.now() - last_heartbeat).total_seconds()
-
-    interrupt_info = {
-        "command": task["command"],
-        "args": task["args"],
-        "task_status": task.get("status"),
-        "current_step": task.get("current_step"),
-        "completed_steps": task.get("completed_steps", []),
-        "failed_steps": task.get("failed_steps", []),
-        "elapsed_seconds": elapsed,
-        "artifacts": task.get("artifacts", {}),
-        "started_at": task.get("started_at"),
-        "retry_count": int(task.get("retry_count", 0)),
-    }
-
-    safe_append_call_trace(
-        "interruption_detected",
-        {
-            "command": task.get("command"),
-            "chapter": task.get("args", {}).get("chapter_num"),
-            "task_status": task.get("status"),
-            "current_step": (task.get("current_step") or {}).get("id"),
-            "elapsed_seconds": elapsed,
-        },
-    )
-    return interrupt_info
-
-
-def analyze_recovery_options(interrupt_info):
-    """Analyze recovery options based on interruption point."""
-    current_step = interrupt_info["current_step"]
-    command = interrupt_info["command"]
-    chapter_num = interrupt_info["args"].get("chapter_num", "?")
-
-    if not current_step:
-        return [
-            {
-                "option": "A",
-                "label": "从头开始",
-                "risk": "low",
-                "description": "重新执行完整流程",
-                "actions": [
-                    "删除 workflow_state.json 当前任务",
-                    f"执行 /{command} {chapter_num}",
-                ],
-            }
-        ]
-
-    step_id = current_step["id"]
-
-    if step_id in {"Step 0.5", "Step 1", "Step 1.5"}:
-        return [
-            {
-                "option": "A",
-                "label": "从 Step 1 重新开始",
-                "risk": "low",
-                "description": "重新加载上下文",
-                "actions": [
-                    "清理中断状态",
-                    f"执行 /{command} {chapter_num}",
-                ],
-            }
-        ]
-
-    if step_id in {"Step 2", "Step 2A", "Step 2B"}:
-        project_root = find_project_root()
-        existing_chapter = find_chapter_file(project_root, chapter_num)
-        draft_path = None
-        if existing_chapter:
-            chapter_path = str(existing_chapter.relative_to(project_root))
-        else:
-            draft_path = default_chapter_draft_path(project_root, chapter_num)
-            chapter_path = str(draft_path.relative_to(project_root))
-
-        options = [
-            {
-                "option": "A",
-                "label": "删除半成品,从 Step 1 重启",
-                "risk": "low",
-                "description": f"清理 {chapter_path},重新生成章节",
-                "actions": [
-                    f"删除 {chapter_path}(如存在)",
-                    "清理 Git 暂存区",
-                    "清理中断状态",
-                    f"执行 /{command} {chapter_num}",
-                ],
-            }
-        ]
-
-        candidate = existing_chapter or draft_path
-        if candidate and candidate.exists():
-            options.append(
-                {
-                    "option": "B",
-                    "label": "回滚到上一章",
-                    "risk": "medium",
-                    "description": "丢弃当前章节进度",
-                    "actions": [
-                        f"git reset --hard ch{(chapter_num - 1):04d}",
-                        "清理中断状态",
-                        f"重新决定是否继续 Ch{chapter_num}",
-                    ],
-                }
-            )
-        return options
-
-    if step_id == "Step 3":
-        return [
-            {
-                "option": "A",
-                "label": "重新执行审查",
-                "risk": "medium",
-                "description": "重新调用审查员并生成报告",
-                "actions": ["重新执行审查", "生成审查报告", "继续 Step 4 润色"],
-            },
-            {
-                "option": "B",
-                "label": "跳过审查直接润色",
-                "risk": "low",
-                "description": "后续可用 /webnovel-review 补审",
-                "actions": ["标记审查已跳过", "继续 Step 4 润色"],
-            },
-        ]
-
-    if step_id == "Step 4":
-        project_root = find_project_root()
-        existing_chapter = find_chapter_file(project_root, chapter_num)
-        draft_path = None
-        if existing_chapter:
-            chapter_path = str(existing_chapter.relative_to(project_root))
-        else:
-            draft_path = default_chapter_draft_path(project_root, chapter_num)
-            chapter_path = str(draft_path.relative_to(project_root))
-
-        return [
-            {
-                "option": "A",
-                "label": "继续润色",
-                "risk": "low",
-                "description": f"继续润色 {chapter_path},完成后进入 Step 5",
-                "actions": [f"打开并继续润色 {chapter_path}", "保存文件", "继续 Step 5(Data Agent)"],
-            },
-            {
-                "option": "B",
-                "label": "删除润色稿,从 Step 2A 重写",
-                "risk": "medium",
-                "description": f"删除 {chapter_path} 并重新生成章节内容",
-                "actions": [f"删除 {chapter_path}", "清理 Git 暂存区", "清理中断状态", f"执行 /{command} {chapter_num}"],
-            },
-        ]
-
-    if step_id == "Step 5":
-        return [
-            {
-                "option": "A",
-                "label": "从 Step 5 重新开始",
-                "risk": "low",
-                "description": "重新运行 Data Agent(幂等)",
-                "actions": ["重新调用 Data Agent", "继续 Step 6(Git 备份)"],
-            }
-        ]
-
-    if step_id == "Step 6":
-        return [
-            {
-                "option": "A",
-                "label": "继续 Git 提交",
-                "risk": "low",
-                "description": "完成未完成的 Git commit + tag",
-                "actions": ["检查 Git 暂存区", "重新执行 backup_manager.py", "继续 complete-task"],
-            },
-            {
-                "option": "B",
-                "label": "回滚 Git 改动",
-                "risk": "medium",
-                "description": "丢弃暂存区所有改动",
-                "actions": ["git reset HEAD .", f"删除第{chapter_num}章文件", "清理中断状态"],
-            },
-        ]
-
-    return [
-        {
-            "option": "A",
-            "label": "从头开始",
-            "risk": "low",
-            "description": "重新执行完整流程",
-            "actions": ["清理所有中断 artifacts", f"执行 /{command} {chapter_num}"],
-        }
-    ]
-
-
-def _backup_chapter_for_cleanup(project_root: Path, chapter_num: int, chapter_path: Path) -> Path:
-    """Backup chapter file before destructive cleanup."""
-    backup_dir = project_root / ".webnovel" / "recovery_backups"
-    create_secure_directory(str(backup_dir))
-
-    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-    backup_name = f"ch{chapter_num:04d}-{chapter_path.name}.{timestamp}.bak"
-    backup_path = backup_dir / backup_name
-    shutil.copy2(chapter_path, backup_path)
-    return backup_path
-
-
-def cleanup_artifacts(chapter_num, *, confirm: bool = False):
-    """Cleanup partial artifacts."""
-    artifacts_cleaned = []
-    planned_actions = []
-
-    project_root = find_project_root()
-
-    chapter_path = find_chapter_file(project_root, chapter_num)
-    if chapter_path is None:
-        draft_path = default_chapter_draft_path(project_root, chapter_num)
-        if draft_path.exists():
-            chapter_path = draft_path
-
-    if chapter_path and chapter_path.exists():
-        planned_actions.append(f"删除章节文件: {chapter_path.relative_to(project_root)}")
-
-    planned_actions.append("重置 Git 暂存区: git reset HEAD .")
-
-    if not confirm:
-        preview_items = [f"[预览] {action}" for action in planned_actions]
-        safe_append_call_trace(
-            "artifacts_cleanup_preview",
-            {
-                "chapter": chapter_num,
-                "planned_actions": planned_actions,
-                "confirmed": False,
-            },
-        )
-        print("⚠️ 检测到高风险清理操作,当前仅预览。若确认执行,请追加 --confirm。")
-        return preview_items or ["[预览] 无可清理项"]
-
-    if chapter_path and chapter_path.exists():
-        try:
-            backup_path = _backup_chapter_for_cleanup(project_root, chapter_num, chapter_path)
-        except OSError as exc:
-            error_msg = f"❌ 章节备份失败,已取消删除: {exc}"
-            safe_append_call_trace(
-                "artifacts_cleanup_backup_failed",
-                {
-                    "chapter": chapter_num,
-                    "chapter_file": str(chapter_path),
-                    "error": str(exc),
-                },
-            )
-            return [error_msg]
-
-        chapter_path.unlink()
-        artifacts_cleaned.append(str(chapter_path.relative_to(project_root)))
-        artifacts_cleaned.append(f"章节备份已保存: {backup_path.relative_to(project_root)}")
-
-    result = subprocess.run(["git", "reset", "HEAD", "."], cwd=project_root, capture_output=True, text=True)
-    if result.returncode == 0:
-        artifacts_cleaned.append("Git 暂存区已清理(project)")
-    else:
-        git_error = (result.stderr or "").strip() or "unknown error"
-        artifacts_cleaned.append(f"⚠️ Git 暂存区清理失败: {git_error}")
-
-    safe_append_call_trace(
-        "artifacts_cleaned",
-        {
-            "chapter": chapter_num,
-            "items": artifacts_cleaned,
-            "planned_actions": planned_actions,
-            "confirmed": True,
-            "git_reset_ok": result.returncode == 0,
-        },
-    )
-    return artifacts_cleaned or ["无可清理项"]
-
-
-def clear_current_task():
-    """Clear interrupted current task."""
-    state = load_state()
-    task = state.get("current_task")
-    if task:
-        safe_append_call_trace(
-            "task_cleared",
-            {
-                "command": task.get("command"),
-                "chapter": task.get("args", {}).get("chapter_num"),
-                "status": task.get("status"),
-            },
-        )
-        state["current_task"] = None
-        save_state(state)
-        print("✅ 中断任务已清除")
-    else:
-        print("⚠️ 无中断任务")
-
-
-def fail_current_task(reason: str = "manual_fail"):
-    """Mark current task as failed and keep state for diagnostics."""
-    state = load_state()
-    task = state.get("current_task")
-    if not task:
-        print("⚠️ 无活动任务")
-        return
-
-    _mark_task_failed(state, reason=reason)
-    save_state(state)
-    safe_append_call_trace(
-        "task_failed",
-        {
-            "command": task.get("command"),
-            "chapter": task.get("args", {}).get("chapter_num"),
-            "reason": reason,
-        },
-    )
-    print(f"⚠️ 任务已标记失败: {reason}")
-
-
-def load_state():
-    """Load workflow state."""
-    state_file = get_workflow_state_path()
-    if not state_file.exists():
-        return {"current_task": None, "last_stable_state": None, "history": []}
-    with open(state_file, "r", encoding="utf-8") as f:
-        state = json.load(f)
-
-    state.setdefault("current_task", None)
-    state.setdefault("last_stable_state", None)
-    state.setdefault("history", [])
-    if state.get("current_task"):
-        state["current_task"].setdefault("failed_steps", [])
-        state["current_task"].setdefault("retry_count", 0)
-    return state
-
-
-def save_state(state):
-    """Save workflow state atomically."""
-    state_file = get_workflow_state_path()
-    create_secure_directory(str(state_file.parent))
-    atomic_write_json(state_file, state, use_lock=True, backup=False)
-
-
-def get_pending_steps(command):
-    """Get command pending step list."""
-    if command == "webnovel-write":
-        # v2: Step 1 内置 Contract v2,不再单独记录 Step 1.5,避免产生 step_order_violation 噪声。
-        return list(WRITE_WORKFLOW_STEPS)
-    if command == "webnovel-review":
-        return ["Step 1", "Step 2", "Step 3", "Step 4", "Step 5", "Step 6", "Step 7", "Step 8"]
-    return []
-
-
-def extract_stable_state(task):
-    """Extract stable state snapshot."""
-    return {
-        "command": task["command"],
-        "chapter_num": task["args"].get("chapter_num"),
-        "completed_at": task.get("completed_at"),
-        "artifacts": task.get("artifacts", {}),
-    }
-
-
-if __name__ == "__main__":
-    import argparse
-
-    parser = argparse.ArgumentParser(description="工作流状态管理")
-    parser.add_argument(
-        "--project-root",
-        dest="global_project_root",
-        help="项目根目录(可选,默认自动检测)",
-    )
-    subparsers = parser.add_subparsers(dest="action", help="操作类型")
-
-    def add_project_root_arg(subparser):
-        """Allow --project-root after subcommand for compatibility."""
-        subparser.add_argument("--project-root", help="项目根目录(可选,默认自动检测)")
-
-    p_start_task = subparsers.add_parser("start-task", help="开始新任务")
-    add_project_root_arg(p_start_task)
-    p_start_task.add_argument("--command", required=True, help="命令名称")
-    p_start_task.add_argument("--chapter", type=int, help="章节号")
-
-    p_start_step = subparsers.add_parser("start-step", help="开始 Step")
-    add_project_root_arg(p_start_step)
-    p_start_step.add_argument("--step-id", required=True, help="Step ID")
-    p_start_step.add_argument("--step-name", required=True, help="Step 名称")
-    p_start_step.add_argument("--note", help="进度备注")
-
-    p_complete_step = subparsers.add_parser("complete-step", help="完成 Step")
-    add_project_root_arg(p_complete_step)
-    p_complete_step.add_argument("--step-id", required=True, help="Step ID")
-    p_complete_step.add_argument("--artifacts", help="Artifacts JSON")
-
-    p_complete_task = subparsers.add_parser("complete-task", help="完成任务")
-    add_project_root_arg(p_complete_task)
-    p_complete_task.add_argument("--artifacts", help="Final artifacts JSON")
-
-    p_fail_task = subparsers.add_parser("fail-task", help="标记任务失败")
-    add_project_root_arg(p_fail_task)
-    p_fail_task.add_argument("--reason", default="manual_fail", help="失败原因")
-
-    p_detect = subparsers.add_parser("detect", help="检测中断")
-    add_project_root_arg(p_detect)
-
-    p_cleanup = subparsers.add_parser("cleanup", help="清理 artifacts")
-    add_project_root_arg(p_cleanup)
-    p_cleanup.add_argument("--chapter", type=int, required=True, help="章节号")
-    p_cleanup.add_argument("--confirm", action="store_true", help="确认执行删除与 Git 重置(高风险)")
-
-    p_clear = subparsers.add_parser("clear", help="清除中断任务")
-    add_project_root_arg(p_clear)
-
-    args = parser.parse_args()
-
-    # Set global project root if provided (support both before/after subcommand).
-    project_root_arg = getattr(args, "project_root", None) or getattr(args, "global_project_root", None)
-    if project_root_arg:
-        _cli_project_root = normalize_windows_path(project_root_arg)
-
-    if args.action == "start-task":
-        start_task(args.command, {"chapter_num": args.chapter})
-    elif args.action == "start-step":
-        start_step(args.step_id, args.step_name, args.note)
-    elif args.action == "complete-step":
-        complete_step(args.step_id, args.artifacts)
-    elif args.action == "complete-task":
-        complete_task(args.artifacts)
-    elif args.action == "fail-task":
-        fail_current_task(args.reason)
-    elif args.action == "detect":
-        interrupt = detect_interruption()
-        if interrupt:
-            print("\n🔶 检测到中断任务:")
-            print(json.dumps(interrupt, ensure_ascii=False, indent=2))
-            print("\n📕 恢复选项:")
-            options = analyze_recovery_options(interrupt)
-            print(json.dumps(options, ensure_ascii=False, indent=2))
-        else:
-            print("✅ 无中断任务")
-    elif args.action == "cleanup":
-        cleaned = cleanup_artifacts(args.chapter, confirm=args.confirm)
-        if args.confirm:
-            print(f"✅ 已清理: {', '.join(cleaned)}")
-        else:
-            for item in cleaned:
-                print(item)
-            print("⚠️ 以上为预览,未执行实际清理。")
-    elif args.action == "clear":
-        clear_current_task()
-    else:
-        parser.print_help()

+ 0 - 98
webnovel-writer/skills/webnovel-resume/SKILL.md

@@ -1,98 +0,0 @@
----
-name: webnovel-resume
-description: 检测中断点并按安全策略恢复小说工作流。
-allowed-tools: Read Bash AskUserQuestion
----
-
-# Task Resume Skill
-
-## 目标
-
-- 检测真实中断点,禁止凭感觉续写。
-- 让用户基于清晰风险选择恢复策略。
-- 恢复时只做最小清理,不擅自扩写半成品。
-
-## 执行流程
-
-### Step 1:解析项目根目录并加载恢复协议
-
-```bash
-export WORKSPACE_ROOT="${CLAUDE_PROJECT_DIR:-$PWD}"
-export SKILL_ROOT="${CLAUDE_PLUGIN_ROOT}/skills/webnovel-resume"
-export SCRIPTS_DIR="${CLAUDE_PLUGIN_ROOT}/scripts"
-export PROJECT_ROOT="$(python "${SCRIPTS_DIR}/webnovel.py" --project-root "${WORKSPACE_ROOT}" where)"
-cat "${SKILL_ROOT}/references/workflow-resume.md"
-```
-
-核心原则:
-- 禁止智能续写半成品
-- 必须先检测再恢复
-- 必须用户确认后执行
-
-### Step 2:按需加载数据规范
-
-```bash
-cat "${SKILL_ROOT}/references/system-data-flow.md"
-```
-
-要求:
-- 仅在需要核对状态字段、恢复策略或数据一致性时加载
-
-### Step 3:确认上下文充足
-
-必须确认:
-- 已理解恢复协议
-- 已理解状态结构
-- 已明确“删除重来”优先于“智能续写”
-
-### Step 4:检测中断状态
-
-```bash
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow detect
-```
-
-结果处理:
-- 无中断:直接结束并通知用户
-- 有中断:进入 Step 5
-
-### Step 5:展示恢复选项并让用户决策
-
-必须展示:
-- 原任务命令和参数
-- 中断时间与已过时长
-- 已完成步骤
-- 当前中断步骤
-- 剩余步骤
-- 恢复选项与风险说明
-
-### Step 6:执行恢复操作
-
-选项 A:删除半成品并清理工作流状态
-
-```bash
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow cleanup --chapter {N} --confirm
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow clear
-```
-
-选项 B:按既有版本回退,再清理工作流状态
-
-```bash
-git -C "$PROJECT_ROOT" rev-parse --verify "ch{N-1:04d}"
-git -C "$PROJECT_ROOT" switch --detach "ch{N-1:04d}"
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow clear
-```
-
-说明:
-- `workflow cleanup --confirm` 与 Git 回退都属于高风险操作,执行前必须获得用户明确确认。
-- 若用户只是要保留现场排查问题,不应执行上述清理或回退。
-
-### Step 7:按用户意愿继续任务
-
-若用户要求立即继续,则执行原始命令;若未要求,则仅完成恢复并结束。
-
-## 禁止事项
-
-- 禁止智能续写半成品
-- 禁止自动替用户选择恢复策略
-- 禁止跳过中断检测
-- 禁止在未验证前修复 `state.json`

+ 0 - 146
webnovel-writer/skills/webnovel-resume/references/workflow-resume.md

@@ -1,146 +0,0 @@
----
-name: workflow-resume
-purpose: 任务恢复时加载,指导中断恢复流程
----
-
-<context>
-此文件用于中断任务恢复。Claude 已知通用错误处理流程,这里只补充网文创作工作流的步骤难度分级和恢复策略。
-</context>
-
-<instructions>
-
-## Step 中断难度分级
-
-| Step | 名称 | 影响 | 难度 | 默认策略 |
-|------|------|------|------|----------|
-| Step 1 | Context Agent | 无副作用(仅读取) | ⭐ | 直接重新执行 |
-| Step 2A | 生成粗稿 | 半成品章节文件 | ⭐⭐ | 删除半成品,从 Step 1 重新开始 |
-| Step 2B | 风格适配 | 部分改写内容 | ⭐⭐ | 继续适配或回到 Step 2A |
-| Step 3 | 审查 | 审查未完成 | ⭐⭐⭐ | 用户决定:重审或跳过 |
-| Step 4 | 网文化润色 | 部分润色的文件 | ⭐⭐ | 继续润色或删除重写 |
-| Step 5 | Data Agent | 实体、摘要、长期记忆未写完 | ⭐⭐ | 重新运行(幂等) |
-| Step 6 | Git 备份 | 未提交 | ⭐⭐⭐ | 检查暂存区,决定提交或保留现场 |
-
-## 恢复流程
-
-### Step 1:检测中断状态
-
-```bash
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow detect
-```
-
-### Step 2:询问用户
-
-必须展示:
-- 任务命令和参数
-- 中断时间和位置
-- 已完成步骤
-- 恢复选项和风险等级
-
-### Step 3:执行恢复
-
-选项 A:删除半成品后重新开始
-
-```bash
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow cleanup --chapter {N} --confirm
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow clear
-/webnovel-write {N}
-```
-
-选项 B:切到上一稳定标签,再清理中断状态
-
-```bash
-git -C "$PROJECT_ROOT" rev-parse --verify "ch{N-1:04d}"
-git -C "$PROJECT_ROOT" switch --detach "ch{N-1:04d}"
-python "${SCRIPTS_DIR}/webnovel.py" --project-root "$PROJECT_ROOT" workflow clear
-```
-
-硬规则:
-- `workflow cleanup --confirm` 与 Git 切换都属于高风险操作,执行前必须获得用户明确确认。
-- 若用户要保留现场排查问题,禁止执行清理或回退。
-
-## 为什么删除而不续写
-
-1. 半成品可能包含未完成句子、逻辑断裂。
-2. 新会话缺失创作时的隐含上下文。
-3. 直接续写更容易引入前后矛盾。
-4. 重新生成的成本通常低于修半成品加重审。
-
-## 特殊场景
-
-| 场景 | 策略 |
-|------|------|
-| 超过 24 小时 | 推荐重新开始 |
-| 超过 1 小时 | 默认删除半成品后重来 |
-| 多个半成品 | 删除所有半成品,从稳定状态重新开始 |
-| workflow_state.json 不存在 | 启发式检测 `state.json` 与正文文件 |
-
-</instructions>
-
-<examples>
-
-<example>
-<input>Step 2A 中断(最常见)</input>
-<output>
-检测到中断任务:
-
-任务:/webnovel-write 7
-中断位置:Step 2A - 章节内容生成中
-
-已完成:
-  ✅ Step 1: Context Agent 搜集上下文
-
-未完成:
-  ⏸️ Step 2A: 生成章节内容(已写1500字)
-  ⏹️ Step 2B-6: 未开始
-
-恢复选项:
-A) 删除半成品,从 Step 1 重新开始(推荐)
-B) 切到 Ch6 稳定标签,放弃 Ch7 当前进度
-
-请选择(A/B):
-</output>
-</example>
-
-<example>
-<input>Step 3 中断(审查)</input>
-<output>
-恢复选项:
-A) 重新执行审查
-   - 调用审查员
-   - 生成审查报告
-   - 继续 Step 4 润色
-
-B) 跳过审查,直接润色
-   - 不生成审查报告
-   - 可后续用 /webnovel-review 补审
-
-请选择(A/B):
-</output>
-</example>
-
-<example>
-<input>Step 4 中断(润色)</input>
-<output>
-恢复选项:
-A) 继续润色
-   - 打开并继续润色实际章节文件(优先 `正文/第0007章-章节标题.md`)
-   - 保存文件
-   - 继续 Step 5(Data Agent)
-
-B) 删除润色稿,从 Step 2A 重写
-   - 删除实际章节文件(优先 `正文/第0007章-章节标题.md`)
-   - 重新生成章节内容
-
-请选择(A/B):
-</output>
-</example>
-
-</examples>
-
-<errors>
-❌ 智能续写半成品 → ✅ 删除后重新生成
-❌ 自动决定恢复策略 → ✅ 必须用户确认
-❌ 跳过中断检测 → ✅ 先运行 workflow detect
-❌ 不确认就做高风险清理 → ✅ 先获得用户明确确认
-</errors>