workflow_manager.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. #!/usr/bin/env python3
  2. """
  3. Workflow state manager
  4. - Track write/review task execution status
  5. - Detect interruption points
  6. - Provide recovery options
  7. - Emit call traces for observability
  8. """
  9. from __future__ import annotations
  10. import json
  11. import os
  12. import subprocess
  13. import sys
  14. from datetime import datetime
  15. from pathlib import Path
  16. from typing import Any, Dict, Optional
  17. from chapter_paths import default_chapter_draft_path, find_chapter_file
  18. from project_locator import resolve_project_root
  19. from security_utils import atomic_write_json, create_secure_directory
  20. # UTF-8 output for Windows console (CLI run only, avoid pytest capture issues)
  21. if sys.platform == "win32" and __name__ == "__main__" and not os.environ.get("PYTEST_CURRENT_TEST"):
  22. import io
  23. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
  24. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
  25. TASK_STATUS_RUNNING = "running"
  26. TASK_STATUS_COMPLETED = "completed"
  27. TASK_STATUS_FAILED = "failed"
  28. STEP_STATUS_STARTED = "started"
  29. STEP_STATUS_RUNNING = "running"
  30. STEP_STATUS_COMPLETED = "completed"
  31. STEP_STATUS_FAILED = "failed"
  32. def now_iso() -> str:
  33. return datetime.now().isoformat()
  34. def find_project_root() -> Path:
  35. """Resolve project root (containing .webnovel/state.json)."""
  36. return resolve_project_root()
  37. def get_workflow_state_path() -> Path:
  38. """Absolute path to workflow_state.json."""
  39. project_root = find_project_root()
  40. return project_root / ".webnovel" / "workflow_state.json"
  41. def get_call_trace_path() -> Path:
  42. project_root = find_project_root()
  43. return project_root / ".webnovel" / "observability" / "call_trace.jsonl"
  44. def append_call_trace(event: str, payload: Optional[Dict[str, Any]] = None):
  45. """Append workflow call trace event (best effort)."""
  46. payload = payload or {}
  47. trace_path = get_call_trace_path()
  48. create_secure_directory(str(trace_path.parent))
  49. row = {
  50. "timestamp": now_iso(),
  51. "event": event,
  52. "payload": payload,
  53. }
  54. with open(trace_path, "a", encoding="utf-8") as f:
  55. f.write(json.dumps(row, ensure_ascii=False) + "\n")
  56. def safe_append_call_trace(event: str, payload: Optional[Dict[str, Any]] = None):
  57. try:
  58. append_call_trace(event, payload)
  59. except Exception:
  60. pass
  61. def _new_task(command: str, args: Dict[str, Any]) -> Dict[str, Any]:
  62. started_at = now_iso()
  63. return {
  64. "command": command,
  65. "args": args,
  66. "started_at": started_at,
  67. "last_heartbeat": started_at,
  68. "status": TASK_STATUS_RUNNING,
  69. "current_step": None,
  70. "completed_steps": [],
  71. "failed_steps": [],
  72. "pending_steps": get_pending_steps(command),
  73. "retry_count": 0,
  74. "artifacts": {
  75. "chapter_file": {},
  76. "git_status": {},
  77. "state_json_modified": False,
  78. "entities_appeared": False,
  79. "review_completed": False,
  80. },
  81. }
  82. def _finalize_current_step_as_failed(task: Dict[str, Any], reason: str):
  83. current_step = task.get("current_step")
  84. if not current_step:
  85. return
  86. if current_step.get("status") in {STEP_STATUS_COMPLETED, STEP_STATUS_FAILED}:
  87. return
  88. current_step = dict(current_step)
  89. current_step["status"] = STEP_STATUS_FAILED
  90. current_step["failed_at"] = now_iso()
  91. current_step["failure_reason"] = reason
  92. task.setdefault("failed_steps", []).append(current_step)
  93. task["current_step"] = None
  94. def _mark_task_failed(state: Dict[str, Any], reason: str):
  95. task = state.get("current_task")
  96. if not task:
  97. return
  98. _finalize_current_step_as_failed(task, reason=reason)
  99. task["status"] = TASK_STATUS_FAILED
  100. task["failed_at"] = now_iso()
  101. task["failure_reason"] = reason
  102. def start_task(command, args):
  103. """Start a new task."""
  104. state = load_state()
  105. current = state.get("current_task")
  106. if current and current.get("status") == TASK_STATUS_RUNNING:
  107. current["retry_count"] = int(current.get("retry_count", 0)) + 1
  108. current["last_heartbeat"] = now_iso()
  109. state["current_task"] = current
  110. save_state(state)
  111. safe_append_call_trace(
  112. "task_reentered",
  113. {
  114. "command": current.get("command"),
  115. "chapter": current.get("args", {}).get("chapter_num"),
  116. "retry_count": current["retry_count"],
  117. },
  118. )
  119. print(f"ℹ️ 任务已在运行,执行重入标记: {current.get('command')}")
  120. return
  121. state["current_task"] = _new_task(command, args)
  122. save_state(state)
  123. safe_append_call_trace("task_started", {"command": command, "args": args})
  124. print(f"✅ 任务已启动: {command} {json.dumps(args, ensure_ascii=False)}")
  125. def start_step(step_id, step_name, progress_note=None):
  126. """Mark step started."""
  127. state = load_state()
  128. task = state.get("current_task")
  129. if not task:
  130. print("⚠️ 无活动任务,请先使用 start-task")
  131. return
  132. _finalize_current_step_as_failed(task, reason="step_replaced_before_completion")
  133. started_at = now_iso()
  134. task["current_step"] = {
  135. "id": step_id,
  136. "name": step_name,
  137. "status": STEP_STATUS_STARTED,
  138. "started_at": started_at,
  139. "running_at": started_at,
  140. "attempt": int(task.get("retry_count", 0)) + 1,
  141. "progress_note": progress_note,
  142. }
  143. task["current_step"]["status"] = STEP_STATUS_RUNNING
  144. task["status"] = TASK_STATUS_RUNNING
  145. task["last_heartbeat"] = now_iso()
  146. save_state(state)
  147. safe_append_call_trace(
  148. "step_started",
  149. {
  150. "step_id": step_id,
  151. "step_name": step_name,
  152. "command": task.get("command"),
  153. "chapter": task.get("args", {}).get("chapter_num"),
  154. "progress_note": progress_note,
  155. },
  156. )
  157. print(f"▶️ {step_id} 开始: {step_name}")
  158. def complete_step(step_id, artifacts_json=None):
  159. """Mark step completed."""
  160. state = load_state()
  161. task = state.get("current_task")
  162. if not task or not task.get("current_step"):
  163. print("⚠️ 无活动 Step")
  164. return
  165. current_step = task["current_step"]
  166. if current_step.get("id") != step_id:
  167. print(f"⚠️ 当前 Step 为 {current_step.get('id')},与 {step_id} 不一致,拒绝完成")
  168. safe_append_call_trace(
  169. "step_complete_rejected",
  170. {
  171. "requested_step_id": step_id,
  172. "active_step_id": current_step.get("id"),
  173. "command": task.get("command"),
  174. },
  175. )
  176. return
  177. current_step["status"] = STEP_STATUS_COMPLETED
  178. current_step["completed_at"] = now_iso()
  179. if artifacts_json:
  180. try:
  181. artifacts = json.loads(artifacts_json)
  182. current_step["artifacts"] = artifacts
  183. task["artifacts"].update(artifacts)
  184. except json.JSONDecodeError as exc:
  185. print(f"⚠️ Artifacts JSON 解析失败: {exc}")
  186. task["completed_steps"].append(current_step)
  187. task["current_step"] = None
  188. task["last_heartbeat"] = now_iso()
  189. save_state(state)
  190. safe_append_call_trace(
  191. "step_completed",
  192. {
  193. "step_id": step_id,
  194. "command": task.get("command"),
  195. "chapter": task.get("args", {}).get("chapter_num"),
  196. },
  197. )
  198. print(f"✅ {step_id} 完成")
  199. def complete_task(final_artifacts_json=None):
  200. """Mark task completed."""
  201. state = load_state()
  202. task = state.get("current_task")
  203. if not task:
  204. print("⚠️ 无活动任务")
  205. return
  206. _finalize_current_step_as_failed(task, reason="task_completed_with_active_step")
  207. task["status"] = TASK_STATUS_COMPLETED
  208. task["completed_at"] = now_iso()
  209. if final_artifacts_json:
  210. try:
  211. final_artifacts = json.loads(final_artifacts_json)
  212. task["artifacts"].update(final_artifacts)
  213. except json.JSONDecodeError as exc:
  214. print(f"⚠️ Final artifacts JSON 解析失败: {exc}")
  215. state["last_stable_state"] = extract_stable_state(task)
  216. if "history" not in state:
  217. state["history"] = []
  218. state["history"].append(
  219. {
  220. "task_id": f"task_{len(state['history']) + 1:03d}",
  221. "command": task["command"],
  222. "chapter": task["args"].get("chapter_num"),
  223. "status": TASK_STATUS_COMPLETED,
  224. "completed_at": task["completed_at"],
  225. }
  226. )
  227. state["current_task"] = None
  228. save_state(state)
  229. safe_append_call_trace(
  230. "task_completed",
  231. {
  232. "command": task.get("command"),
  233. "chapter": task.get("args", {}).get("chapter_num"),
  234. "completed_steps": len(task.get("completed_steps", [])),
  235. "failed_steps": len(task.get("failed_steps", [])),
  236. },
  237. )
  238. print("🎀 任务完成")
  239. def detect_interruption():
  240. """Detect interruption state."""
  241. state = load_state()
  242. if not state or "current_task" not in state or state["current_task"] is None:
  243. return None
  244. task = state["current_task"]
  245. if task.get("status") == TASK_STATUS_COMPLETED:
  246. return None
  247. last_heartbeat = datetime.fromisoformat(task["last_heartbeat"])
  248. elapsed = (datetime.now() - last_heartbeat).total_seconds()
  249. interrupt_info = {
  250. "command": task["command"],
  251. "args": task["args"],
  252. "task_status": task.get("status"),
  253. "current_step": task.get("current_step"),
  254. "completed_steps": task.get("completed_steps", []),
  255. "failed_steps": task.get("failed_steps", []),
  256. "elapsed_seconds": elapsed,
  257. "artifacts": task.get("artifacts", {}),
  258. "started_at": task.get("started_at"),
  259. "retry_count": int(task.get("retry_count", 0)),
  260. }
  261. safe_append_call_trace(
  262. "interruption_detected",
  263. {
  264. "command": task.get("command"),
  265. "chapter": task.get("args", {}).get("chapter_num"),
  266. "task_status": task.get("status"),
  267. "current_step": (task.get("current_step") or {}).get("id"),
  268. "elapsed_seconds": elapsed,
  269. },
  270. )
  271. return interrupt_info
  272. def analyze_recovery_options(interrupt_info):
  273. """Analyze recovery options based on interruption point."""
  274. current_step = interrupt_info["current_step"]
  275. command = interrupt_info["command"]
  276. chapter_num = interrupt_info["args"].get("chapter_num", "?")
  277. if not current_step:
  278. return [
  279. {
  280. "option": "A",
  281. "label": "从头开始",
  282. "risk": "low",
  283. "description": "重新执行完整流程",
  284. "actions": [
  285. "删除 workflow_state.json 当前任务",
  286. f"执行 /{command} {chapter_num}",
  287. ],
  288. }
  289. ]
  290. step_id = current_step["id"]
  291. if step_id in {"Step 1", "Step 1.5"}:
  292. return [
  293. {
  294. "option": "A",
  295. "label": "从 Step 1 重新开始",
  296. "risk": "low",
  297. "description": "重新加载上下文",
  298. "actions": [
  299. "清理中断状态",
  300. f"执行 /{command} {chapter_num}",
  301. ],
  302. }
  303. ]
  304. if step_id in {"Step 2", "Step 2A", "Step 2B"}:
  305. project_root = find_project_root()
  306. existing_chapter = find_chapter_file(project_root, chapter_num)
  307. draft_path = None
  308. if existing_chapter:
  309. chapter_path = str(existing_chapter.relative_to(project_root))
  310. else:
  311. draft_path = default_chapter_draft_path(project_root, chapter_num)
  312. chapter_path = str(draft_path.relative_to(project_root))
  313. options = [
  314. {
  315. "option": "A",
  316. "label": "删除半成品,从 Step 1 重启",
  317. "risk": "low",
  318. "description": f"清理 {chapter_path},重新生成章节",
  319. "actions": [
  320. f"删除 {chapter_path}(如存在)",
  321. "清理 Git 暂存区",
  322. "清理中断状态",
  323. f"执行 /{command} {chapter_num}",
  324. ],
  325. }
  326. ]
  327. candidate = existing_chapter or draft_path
  328. if candidate and candidate.exists():
  329. options.append(
  330. {
  331. "option": "B",
  332. "label": "回滚到上一章",
  333. "risk": "medium",
  334. "description": "丢弃当前章节进度",
  335. "actions": [
  336. f"git reset --hard ch{(chapter_num - 1):04d}",
  337. "清理中断状态",
  338. f"重新决定是否继续 Ch{chapter_num}",
  339. ],
  340. }
  341. )
  342. return options
  343. if step_id == "Step 3":
  344. return [
  345. {
  346. "option": "A",
  347. "label": "重新执行审查",
  348. "risk": "medium",
  349. "description": "重新调用审查员并生成报告",
  350. "actions": ["重新执行审查", "生成审查报告", "继续 Step 4 润色"],
  351. },
  352. {
  353. "option": "B",
  354. "label": "跳过审查直接润色",
  355. "risk": "low",
  356. "description": "后续可用 /webnovel-review 补审",
  357. "actions": ["标记审查已跳过", "继续 Step 4 润色"],
  358. },
  359. ]
  360. if step_id == "Step 4":
  361. project_root = find_project_root()
  362. existing_chapter = find_chapter_file(project_root, chapter_num)
  363. draft_path = None
  364. if existing_chapter:
  365. chapter_path = str(existing_chapter.relative_to(project_root))
  366. else:
  367. draft_path = default_chapter_draft_path(project_root, chapter_num)
  368. chapter_path = str(draft_path.relative_to(project_root))
  369. return [
  370. {
  371. "option": "A",
  372. "label": "继续润色",
  373. "risk": "low",
  374. "description": f"继续润色 {chapter_path},完成后进入 Step 5",
  375. "actions": [f"打开并继续润色 {chapter_path}", "保存文件", "继续 Step 5(Data Agent)"],
  376. },
  377. {
  378. "option": "B",
  379. "label": "删除润色稿,从 Step 2A 重写",
  380. "risk": "medium",
  381. "description": f"删除 {chapter_path} 并重新生成章节内容",
  382. "actions": [f"删除 {chapter_path}", "清理 Git 暂存区", "清理中断状态", f"执行 /{command} {chapter_num}"],
  383. },
  384. ]
  385. if step_id == "Step 5":
  386. return [
  387. {
  388. "option": "A",
  389. "label": "从 Step 5 重新开始",
  390. "risk": "low",
  391. "description": "重新运行 Data Agent(幂等)",
  392. "actions": ["重新调用 Data Agent", "继续 Step 6(Git 备份)"],
  393. }
  394. ]
  395. if step_id == "Step 6":
  396. return [
  397. {
  398. "option": "A",
  399. "label": "继续 Git 提交",
  400. "risk": "low",
  401. "description": "完成未完成的 Git commit + tag",
  402. "actions": ["检查 Git 暂存区", "重新执行 backup_manager.py", "继续 complete-task"],
  403. },
  404. {
  405. "option": "B",
  406. "label": "回滚 Git 改动",
  407. "risk": "medium",
  408. "description": "丢弃暂存区所有改动",
  409. "actions": ["git reset HEAD .", f"删除第{chapter_num}章文件", "清理中断状态"],
  410. },
  411. ]
  412. return [
  413. {
  414. "option": "A",
  415. "label": "从头开始",
  416. "risk": "low",
  417. "description": "重新执行完整流程",
  418. "actions": ["清理所有中断 artifacts", f"执行 /{command} {chapter_num}"],
  419. }
  420. ]
  421. def cleanup_artifacts(chapter_num):
  422. """Cleanup partial artifacts."""
  423. artifacts_cleaned = []
  424. project_root = find_project_root()
  425. chapter_path = find_chapter_file(project_root, chapter_num)
  426. if chapter_path is None:
  427. draft_path = default_chapter_draft_path(project_root, chapter_num)
  428. if draft_path.exists():
  429. chapter_path = draft_path
  430. if chapter_path and chapter_path.exists():
  431. chapter_path.unlink()
  432. artifacts_cleaned.append(str(chapter_path.relative_to(project_root)))
  433. result = subprocess.run(["git", "reset", "HEAD", "."], cwd=project_root, capture_output=True, text=True)
  434. if result.returncode == 0:
  435. artifacts_cleaned.append("Git 暂存区已清理(project)")
  436. safe_append_call_trace(
  437. "artifacts_cleaned",
  438. {
  439. "chapter": chapter_num,
  440. "items": artifacts_cleaned,
  441. "git_reset_ok": result.returncode == 0,
  442. },
  443. )
  444. return artifacts_cleaned
  445. def clear_current_task():
  446. """Clear interrupted current task."""
  447. state = load_state()
  448. task = state.get("current_task")
  449. if task:
  450. safe_append_call_trace(
  451. "task_cleared",
  452. {
  453. "command": task.get("command"),
  454. "chapter": task.get("args", {}).get("chapter_num"),
  455. "status": task.get("status"),
  456. },
  457. )
  458. state["current_task"] = None
  459. save_state(state)
  460. print("✅ 中断任务已清除")
  461. else:
  462. print("⚠️ 无中断任务")
  463. def fail_current_task(reason: str = "manual_fail"):
  464. """Mark current task as failed and keep state for diagnostics."""
  465. state = load_state()
  466. task = state.get("current_task")
  467. if not task:
  468. print("⚠️ 无活动任务")
  469. return
  470. _mark_task_failed(state, reason=reason)
  471. save_state(state)
  472. safe_append_call_trace(
  473. "task_failed",
  474. {
  475. "command": task.get("command"),
  476. "chapter": task.get("args", {}).get("chapter_num"),
  477. "reason": reason,
  478. },
  479. )
  480. print(f"⚠️ 任务已标记失败: {reason}")
  481. def load_state():
  482. """Load workflow state."""
  483. state_file = get_workflow_state_path()
  484. if not state_file.exists():
  485. return {"current_task": None, "last_stable_state": None, "history": []}
  486. with open(state_file, "r", encoding="utf-8") as f:
  487. state = json.load(f)
  488. state.setdefault("current_task", None)
  489. state.setdefault("last_stable_state", None)
  490. state.setdefault("history", [])
  491. if state.get("current_task"):
  492. state["current_task"].setdefault("failed_steps", [])
  493. state["current_task"].setdefault("retry_count", 0)
  494. return state
  495. def save_state(state):
  496. """Save workflow state atomically."""
  497. state_file = get_workflow_state_path()
  498. create_secure_directory(str(state_file.parent))
  499. atomic_write_json(state_file, state, use_lock=True, backup=False)
  500. def get_pending_steps(command):
  501. """Get command pending step list."""
  502. if command == "webnovel-write":
  503. return ["Step 1", "Step 1.5", "Step 2A", "Step 2B", "Step 3", "Step 4", "Step 5", "Step 6"]
  504. if command == "webnovel-review":
  505. return ["Step 1", "Step 2", "Step 3", "Step 4", "Step 5", "Step 6", "Step 7", "Step 8"]
  506. return []
  507. def extract_stable_state(task):
  508. """Extract stable state snapshot."""
  509. return {
  510. "command": task["command"],
  511. "chapter_num": task["args"].get("chapter_num"),
  512. "completed_at": task.get("completed_at"),
  513. "artifacts": task.get("artifacts", {}),
  514. }
  515. if __name__ == "__main__":
  516. import argparse
  517. parser = argparse.ArgumentParser(description="工作流状态管理")
  518. subparsers = parser.add_subparsers(dest="action", help="操作类型")
  519. p_start_task = subparsers.add_parser("start-task", help="开始新任务")
  520. p_start_task.add_argument("--command", required=True, help="命令名称")
  521. p_start_task.add_argument("--chapter", type=int, help="章节号")
  522. p_start_step = subparsers.add_parser("start-step", help="开始 Step")
  523. p_start_step.add_argument("--step-id", required=True, help="Step ID")
  524. p_start_step.add_argument("--step-name", required=True, help="Step 名称")
  525. p_start_step.add_argument("--note", help="进度备注")
  526. p_complete_step = subparsers.add_parser("complete-step", help="完成 Step")
  527. p_complete_step.add_argument("--step-id", required=True, help="Step ID")
  528. p_complete_step.add_argument("--artifacts", help="Artifacts JSON")
  529. p_complete_task = subparsers.add_parser("complete-task", help="完成任务")
  530. p_complete_task.add_argument("--artifacts", help="Final artifacts JSON")
  531. p_fail_task = subparsers.add_parser("fail-task", help="标记任务失败")
  532. p_fail_task.add_argument("--reason", default="manual_fail", help="失败原因")
  533. subparsers.add_parser("detect", help="检测中断")
  534. p_cleanup = subparsers.add_parser("cleanup", help="清理 artifacts")
  535. p_cleanup.add_argument("--chapter", type=int, required=True, help="章节号")
  536. subparsers.add_parser("clear", help="清除中断任务")
  537. args = parser.parse_args()
  538. if args.action == "start-task":
  539. start_task(args.command, {"chapter_num": args.chapter})
  540. elif args.action == "start-step":
  541. start_step(args.step_id, args.step_name, args.note)
  542. elif args.action == "complete-step":
  543. complete_step(args.step_id, args.artifacts)
  544. elif args.action == "complete-task":
  545. complete_task(args.artifacts)
  546. elif args.action == "fail-task":
  547. fail_current_task(args.reason)
  548. elif args.action == "detect":
  549. interrupt = detect_interruption()
  550. if interrupt:
  551. print("\n🔶 检测到中断任务:")
  552. print(json.dumps(interrupt, ensure_ascii=False, indent=2))
  553. print("\n📕 恢复选项:")
  554. options = analyze_recovery_options(interrupt)
  555. print(json.dumps(options, ensure_ascii=False, indent=2))
  556. else:
  557. print("✅ 无中断任务")
  558. elif args.action == "cleanup":
  559. cleaned = cleanup_artifacts(args.chapter)
  560. print(f"✅ 已清理: {', '.join(cleaned)}")
  561. elif args.action == "clear":
  562. clear_current_task()
  563. else:
  564. parser.print_help()