workflow_manager.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. #!/usr/bin/env python3
  2. """
  3. 工作流状态管理器
  4. - 追踪命令执行状态
  5. - 检测中断点
  6. - 提供恢复策略
  7. """
  8. import json
  9. import os
  10. import sys
  11. import subprocess
  12. from datetime import datetime
  13. from pathlib import Path
  14. # ============================================================================
  15. # 安全修复:导入安全工具函数(P1 MEDIUM)
  16. # ============================================================================
  17. from security_utils import create_secure_directory, atomic_write_json
  18. from project_locator import resolve_project_root
  19. from chapter_paths import default_chapter_draft_path, find_chapter_file
  20. # UTF-8 编码修复(Windows兼容)
  21. if sys.platform == 'win32':
  22. import io
  23. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
  24. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
  25. def find_project_root():
  26. """解析项目根目录(包含 .webnovel/state.json)"""
  27. return resolve_project_root()
  28. def get_workflow_state_path():
  29. """获取 workflow_state.json 的完整路径"""
  30. project_root = find_project_root()
  31. return project_root / '.webnovel' / 'workflow_state.json'
  32. def start_task(command, args):
  33. """开始新任务"""
  34. state = load_state()
  35. state['current_task'] = {
  36. 'command': command,
  37. 'args': args,
  38. 'started_at': datetime.now().isoformat(),
  39. 'last_heartbeat': datetime.now().isoformat(),
  40. 'status': 'running',
  41. 'current_step': None,
  42. 'completed_steps': [],
  43. 'pending_steps': get_pending_steps(command),
  44. 'artifacts': {
  45. 'chapter_file': {},
  46. 'git_status': {},
  47. 'state_json_modified': False,
  48. 'entities_appeared': False,
  49. 'review_completed': False
  50. }
  51. }
  52. save_state(state)
  53. print(f"✅ 任务已启动: {command} {json.dumps(args, ensure_ascii=False)}")
  54. def start_step(step_id, step_name, progress_note=None):
  55. """标记Step开始"""
  56. state = load_state()
  57. if not state.get('current_task'):
  58. print("⚠️ 无活动任务,请先使用 start-task")
  59. return
  60. state['current_task']['current_step'] = {
  61. 'id': step_id,
  62. 'name': step_name,
  63. 'status': 'in_progress',
  64. 'started_at': datetime.now().isoformat(),
  65. 'progress_note': progress_note
  66. }
  67. state['current_task']['last_heartbeat'] = datetime.now().isoformat()
  68. save_state(state)
  69. print(f"▶️ {step_id} 开始: {step_name}")
  70. def complete_step(step_id, artifacts_json=None):
  71. """标记Step完成"""
  72. state = load_state()
  73. if not state.get('current_task') or not state['current_task'].get('current_step'):
  74. print("⚠️ 无活动Step")
  75. return
  76. current_step = state['current_task']['current_step']
  77. current_step['status'] = 'completed'
  78. current_step['completed_at'] = datetime.now().isoformat()
  79. if artifacts_json:
  80. try:
  81. artifacts = json.loads(artifacts_json)
  82. current_step['artifacts'] = artifacts
  83. # 更新task级别的artifacts
  84. state['current_task']['artifacts'].update(artifacts)
  85. except json.JSONDecodeError as e:
  86. print(f"⚠️ Artifacts JSON解析失败: {e}")
  87. state['current_task']['completed_steps'].append(current_step)
  88. state['current_task']['current_step'] = None
  89. state['current_task']['last_heartbeat'] = datetime.now().isoformat()
  90. save_state(state)
  91. print(f"✅ {step_id} 完成")
  92. def complete_task(final_artifacts_json=None):
  93. """标记任务完成"""
  94. state = load_state()
  95. if not state.get('current_task'):
  96. print("⚠️ 无活动任务")
  97. return
  98. state['current_task']['status'] = 'completed'
  99. state['current_task']['completed_at'] = datetime.now().isoformat()
  100. if final_artifacts_json:
  101. try:
  102. final_artifacts = json.loads(final_artifacts_json)
  103. state['current_task']['artifacts'].update(final_artifacts)
  104. except json.JSONDecodeError as e:
  105. print(f"⚠️ Final artifacts JSON解析失败: {e}")
  106. # 保存到历史记录
  107. state['last_stable_state'] = extract_stable_state(state['current_task'])
  108. if 'history' not in state:
  109. state['history'] = []
  110. state['history'].append({
  111. 'task_id': f"task_{len(state['history']) + 1:03d}",
  112. 'command': state['current_task']['command'],
  113. 'chapter': state['current_task']['args'].get('chapter_num'),
  114. 'status': 'completed',
  115. 'completed_at': state['current_task']['completed_at']
  116. })
  117. # 清除当前任务
  118. state['current_task'] = None
  119. save_state(state)
  120. print(f"🎉 任务完成")
  121. def detect_interruption():
  122. """检测中断状态"""
  123. state = load_state()
  124. if not state or 'current_task' not in state or state['current_task'] is None:
  125. return None # 无中断任务
  126. task = state['current_task']
  127. if task['status'] == 'completed':
  128. return None # 任务已完成
  129. # 判断中断原因
  130. last_heartbeat = datetime.fromisoformat(task['last_heartbeat'])
  131. elapsed = (datetime.now() - last_heartbeat).total_seconds()
  132. interrupt_info = {
  133. 'command': task['command'],
  134. 'args': task['args'],
  135. 'current_step': task['current_step'],
  136. 'completed_steps': task['completed_steps'],
  137. 'elapsed_seconds': elapsed,
  138. 'artifacts': task['artifacts'],
  139. 'started_at': task['started_at']
  140. }
  141. return interrupt_info
  142. def analyze_recovery_options(interrupt_info):
  143. """分析恢复选项(基于中断点)"""
  144. current_step = interrupt_info['current_step']
  145. command = interrupt_info['command']
  146. chapter_num = interrupt_info['args'].get('chapter_num', '?')
  147. if not current_step:
  148. # 任务刚开始就中断
  149. return [{
  150. 'option': 'A',
  151. 'label': '从头开始',
  152. 'risk': 'low',
  153. 'description': '重新执行完整流程',
  154. 'actions': [
  155. f"删除 workflow_state.json 当前任务",
  156. f"执行 /{command} {chapter_num}"
  157. ]
  158. }]
  159. step_id = current_step['id']
  160. # 基于Step ID的恢复策略
  161. if step_id == 'Step 1':
  162. # Step 1中断:无副作用
  163. return [{
  164. 'option': 'A',
  165. 'label': '从Step 1重新开始',
  166. 'risk': 'low',
  167. 'description': '重新加载上下文',
  168. 'actions': [
  169. f"清理中断状态",
  170. f"执行 /{command} {chapter_num}"
  171. ]
  172. }]
  173. elif step_id == 'Step 2':
  174. # Step 2中断:可能有半成品文件
  175. chapter_file = interrupt_info['artifacts'].get('chapter_file', {})
  176. # 使用 chapter_paths 模块定位章节文件(兼容新旧目录结构)
  177. project_root = find_project_root()
  178. existing_chapter = find_chapter_file(project_root, chapter_num)
  179. draft_path = None
  180. if existing_chapter:
  181. chapter_path = str(existing_chapter.relative_to(project_root))
  182. else:
  183. # 如果不存在,使用新格式的默认路径
  184. draft_path = default_chapter_draft_path(project_root, chapter_num)
  185. chapter_path = str(draft_path.relative_to(project_root))
  186. options = [{
  187. 'option': 'A',
  188. 'label': '删除半成品,从Step 1重新开始',
  189. 'risk': 'low',
  190. 'description': f"清理 {chapter_path},重新生成章节",
  191. 'actions': [
  192. f"删除 {chapter_path}(如存在)",
  193. f"清理 Git 暂存区",
  194. f"清理中断状态",
  195. f"执行 /{command} {chapter_num}"
  196. ]
  197. }]
  198. # 检查文件是否存在
  199. candidate = existing_chapter or draft_path
  200. if candidate and candidate.exists():
  201. options.append({
  202. 'option': 'B',
  203. 'label': '回滚到上一章',
  204. 'risk': 'medium',
  205. 'description': '丢弃所有当前章节进度',
  206. 'actions': [
  207. f"git reset --hard ch{(chapter_num-1):04d}",
  208. f"清理中断状态",
  209. "重新决定是否继续Ch{chapter_num}"
  210. ]
  211. })
  212. return options
  213. elif step_id == 'Step 3':
  214. # Step 3 中断:审查未完成
  215. return [
  216. {
  217. 'option': 'A',
  218. 'label': '重新执行审查',
  219. 'risk': 'medium',
  220. 'description': '重新调用5个审查员(并行)',
  221. 'actions': [
  222. "重新调用5个审查员(并行)",
  223. "生成审查报告",
  224. "继续 Step 4 润色"
  225. ]
  226. },
  227. {
  228. 'option': 'B',
  229. 'label': '跳过审查,直接润色',
  230. 'risk': 'low',
  231. 'description': '不进行审查,可后续用 /webnovel-review 补审',
  232. 'actions': [
  233. "标记审查为已跳过",
  234. "继续 Step 4 润色"
  235. ]
  236. }
  237. ]
  238. elif step_id == 'Step 4':
  239. # Step 4 中断:润色中
  240. project_root = find_project_root()
  241. existing_chapter = find_chapter_file(project_root, chapter_num)
  242. draft_path = None
  243. if existing_chapter:
  244. chapter_path = str(existing_chapter.relative_to(project_root))
  245. else:
  246. draft_path = default_chapter_draft_path(project_root, chapter_num)
  247. chapter_path = str(draft_path.relative_to(project_root))
  248. return [
  249. {
  250. 'option': 'A',
  251. 'label': '继续润色',
  252. 'risk': 'low',
  253. 'description': f"继续润色 {chapter_path},完成后进入 Step 5",
  254. 'actions': [
  255. f"打开并继续润色 {chapter_path}",
  256. "保存文件",
  257. "继续 Step 5(Data Agent)"
  258. ]
  259. },
  260. {
  261. 'option': 'B',
  262. 'label': '删除润色稿,从 Step 2 重写',
  263. 'risk': 'medium',
  264. 'description': f"删除 {chapter_path},重新生成章节内容",
  265. 'actions': [
  266. f"删除 {chapter_path}",
  267. "清理 Git 暂存区",
  268. "清理中断状态",
  269. f"执行 /{command} {chapter_num}"
  270. ]
  271. }
  272. ]
  273. elif step_id == 'Step 5':
  274. # Step 5 中断:Data Agent 处理中
  275. return [{
  276. 'option': 'A',
  277. 'label': '从 Step 5 重新开始',
  278. 'risk': 'low',
  279. 'description': '重新运行 Data Agent(幂等操作)',
  280. 'actions': [
  281. "重新调用 Data Agent",
  282. "继续 Step 6(Git 备份)"
  283. ]
  284. }]
  285. elif step_id == 'Step 6':
  286. # Step 6 中断:Git 未提交
  287. return [
  288. {
  289. 'option': 'A',
  290. 'label': '继续 Git 提交',
  291. 'risk': 'low',
  292. 'description': '完成未完成的 Git commit + tag',
  293. 'actions': [
  294. "检查 Git 暂存区",
  295. "重新执行 backup_manager.py",
  296. "继续完成工作流追踪(complete-task)"
  297. ]
  298. },
  299. {
  300. 'option': 'B',
  301. 'label': '回滚 Git 改动',
  302. 'risk': 'medium',
  303. 'description': '丢弃暂存区所有改动',
  304. 'actions': [
  305. "git reset HEAD .",
  306. f"删除第{chapter_num}章文件",
  307. "清理中断状态"
  308. ]
  309. }
  310. ]
  311. # 默认选项
  312. return [{
  313. 'option': 'A',
  314. 'label': '从头开始',
  315. 'risk': 'low',
  316. 'description': '重新执行完整流程',
  317. 'actions': [
  318. f"清理所有中断artifacts",
  319. f"执行 /{command} {chapter_num}"
  320. ]
  321. }]
  322. def cleanup_artifacts(chapter_num):
  323. """清理半成品artifacts"""
  324. artifacts_cleaned = []
  325. project_root = find_project_root()
  326. # 删除章节文件(兼容多种命名/目录结构)
  327. chapter_path = find_chapter_file(project_root, chapter_num)
  328. if chapter_path is None:
  329. # 可能是“草稿路径”但尚未重命名
  330. draft_path = default_chapter_draft_path(project_root, chapter_num)
  331. if draft_path.exists():
  332. chapter_path = draft_path
  333. if chapter_path and chapter_path.exists():
  334. chapter_path.unlink()
  335. artifacts_cleaned.append(str(chapter_path.relative_to(project_root)))
  336. # 清理Git暂存区
  337. result = subprocess.run(
  338. ['git', 'reset', 'HEAD', '.'],
  339. cwd=project_root,
  340. capture_output=True,
  341. text=True
  342. )
  343. if result.returncode == 0:
  344. artifacts_cleaned.append("Git暂存区已清理(project)")
  345. return artifacts_cleaned
  346. def clear_current_task():
  347. """清除当前中断任务"""
  348. state = load_state()
  349. if state.get('current_task'):
  350. state['current_task'] = None
  351. save_state(state)
  352. print("✅ 中断任务已清除")
  353. else:
  354. print("⚠️ 无中断任务")
  355. def load_state():
  356. """加载workflow状态"""
  357. state_file = get_workflow_state_path()
  358. if not state_file.exists():
  359. return {'current_task': None, 'last_stable_state': None, 'history': []}
  360. with open(state_file, 'r', encoding='utf-8') as f:
  361. return json.load(f)
  362. def save_state(state):
  363. """保存workflow状态(原子化写入)"""
  364. state_file = get_workflow_state_path()
  365. # ============================================================================
  366. # 安全修复:使用原子化写入(P1 MEDIUM)
  367. # ============================================================================
  368. create_secure_directory(str(state_file.parent))
  369. atomic_write_json(state_file, state, use_lock=True, backup=False)
  370. def get_pending_steps(command):
  371. """获取待执行步骤列表 (v5.0)"""
  372. if command == 'webnovel-write':
  373. # v5.0 工作流:6 步
  374. # Step 1: Context Agent 搜集上下文
  375. # Step 2: 生成章节内容 (纯正文,3000-5000字)
  376. # Step 3: 审查 (5个Agent并行,只报告)
  377. # Step 4: 润色 (基于审查报告修复 + 去AI痕迹)
  378. # Step 5: Data Agent 处理数据链
  379. # Step 6: Git 备份
  380. return ['Step 1', 'Step 2', 'Step 3', 'Step 4', 'Step 5', 'Step 6']
  381. elif command == 'webnovel-review':
  382. return ['Step 1', 'Step 2', 'Step 3', 'Step 4', 'Step 5', 'Step 6', 'Step 7', 'Step 8']
  383. # 其他命令...
  384. return []
  385. def extract_stable_state(task):
  386. """提取稳定状态快照"""
  387. return {
  388. 'command': task['command'],
  389. 'chapter_num': task['args'].get('chapter_num'),
  390. 'completed_at': task.get('completed_at'),
  391. 'artifacts': task.get('artifacts', {})
  392. }
  393. # CLI接口
  394. if __name__ == '__main__':
  395. import argparse
  396. parser = argparse.ArgumentParser(description='工作流状态管理')
  397. subparsers = parser.add_subparsers(dest='action', help='操作类型')
  398. # start-task
  399. p_start_task = subparsers.add_parser('start-task', help='开始新任务')
  400. p_start_task.add_argument('--command', required=True, help='命令名称')
  401. p_start_task.add_argument('--chapter', type=int, help='章节号')
  402. # start-step
  403. p_start_step = subparsers.add_parser('start-step', help='开始Step')
  404. p_start_step.add_argument('--step-id', required=True, help='Step ID')
  405. p_start_step.add_argument('--step-name', required=True, help='Step名称')
  406. p_start_step.add_argument('--note', help='进度备注')
  407. # complete-step
  408. p_complete_step = subparsers.add_parser('complete-step', help='完成Step')
  409. p_complete_step.add_argument('--step-id', required=True, help='Step ID')
  410. p_complete_step.add_argument('--artifacts', help='Artifacts JSON')
  411. # complete-task
  412. p_complete_task = subparsers.add_parser('complete-task', help='完成任务')
  413. p_complete_task.add_argument('--artifacts', help='Final artifacts JSON')
  414. # detect
  415. subparsers.add_parser('detect', help='检测中断')
  416. # cleanup
  417. p_cleanup = subparsers.add_parser('cleanup', help='清理artifacts')
  418. p_cleanup.add_argument('--chapter', type=int, required=True, help='章节号')
  419. # clear
  420. subparsers.add_parser('clear', help='清除中断任务')
  421. args = parser.parse_args()
  422. if args.action == 'start-task':
  423. start_task(args.command, {'chapter_num': args.chapter})
  424. elif args.action == 'start-step':
  425. start_step(args.step_id, args.step_name, args.note)
  426. elif args.action == 'complete-step':
  427. complete_step(args.step_id, args.artifacts)
  428. elif args.action == 'complete-task':
  429. complete_task(args.artifacts)
  430. elif args.action == 'detect':
  431. interrupt = detect_interruption()
  432. if interrupt:
  433. print("\n🔴 检测到中断任务:")
  434. print(json.dumps(interrupt, ensure_ascii=False, indent=2))
  435. print("\n💡 恢复选项:")
  436. options = analyze_recovery_options(interrupt)
  437. print(json.dumps(options, ensure_ascii=False, indent=2))
  438. else:
  439. print("✅ 无中断任务")
  440. elif args.action == 'cleanup':
  441. cleaned = cleanup_artifacts(args.chapter)
  442. print(f"✅ 已清理: {', '.join(cleaned)}")
  443. elif args.action == 'clear':
  444. clear_current_task()
  445. else:
  446. parser.print_help()