workflow_manager.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. #!/usr/bin/env python3
  2. """
Workflow state manager.
- Tracks command execution status
- Detects interruption points
- Provides recovery strategies
  7. """
  8. import json
  9. import os
  10. import sys
  11. import subprocess
  12. from datetime import datetime
  13. from pathlib import Path
  14. # ============================================================================
  15. # 安全修复:导入安全工具函数(P1 MEDIUM)
  16. # ============================================================================
  17. from security_utils import create_secure_directory
  18. from project_locator import resolve_project_root
  19. from chapter_paths import default_chapter_draft_path, find_chapter_file
# UTF-8 encoding fix (Windows compatibility): when running on win32, replace
# sys.stdout / sys.stderr with text wrappers over their underlying byte
# buffers that encode output as UTF-8. Runs once at import time.
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
  25. def find_project_root():
  26. """解析项目根目录(包含 .webnovel/state.json)"""
  27. return resolve_project_root()
  28. def get_workflow_state_path():
  29. """获取 workflow_state.json 的完整路径"""
  30. project_root = find_project_root()
  31. return project_root / '.webnovel' / 'workflow_state.json'
  32. def start_task(command, args):
  33. """开始新任务"""
  34. state = load_state()
  35. state['current_task'] = {
  36. 'command': command,
  37. 'args': args,
  38. 'started_at': datetime.now().isoformat(),
  39. 'last_heartbeat': datetime.now().isoformat(),
  40. 'status': 'running',
  41. 'current_step': None,
  42. 'completed_steps': [],
  43. 'pending_steps': get_pending_steps(command),
  44. 'artifacts': {
  45. 'chapter_file': {},
  46. 'git_status': {},
  47. 'state_json_modified': False,
  48. 'entities_extracted': False,
  49. 'review_completed': False
  50. }
  51. }
  52. save_state(state)
  53. print(f"✅ 任务已启动: {command} {json.dumps(args, ensure_ascii=False)}")
  54. def start_step(step_id, step_name, progress_note=None):
  55. """标记Step开始"""
  56. state = load_state()
  57. if not state.get('current_task'):
  58. print("⚠️ 无活动任务,请先使用 start-task")
  59. return
  60. state['current_task']['current_step'] = {
  61. 'id': step_id,
  62. 'name': step_name,
  63. 'status': 'in_progress',
  64. 'started_at': datetime.now().isoformat(),
  65. 'progress_note': progress_note
  66. }
  67. state['current_task']['last_heartbeat'] = datetime.now().isoformat()
  68. save_state(state)
  69. print(f"▶️ {step_id} 开始: {step_name}")
  70. def complete_step(step_id, artifacts_json=None):
  71. """标记Step完成"""
  72. state = load_state()
  73. if not state.get('current_task') or not state['current_task'].get('current_step'):
  74. print("⚠️ 无活动Step")
  75. return
  76. current_step = state['current_task']['current_step']
  77. current_step['status'] = 'completed'
  78. current_step['completed_at'] = datetime.now().isoformat()
  79. if artifacts_json:
  80. try:
  81. artifacts = json.loads(artifacts_json)
  82. current_step['artifacts'] = artifacts
  83. # 更新task级别的artifacts
  84. state['current_task']['artifacts'].update(artifacts)
  85. except json.JSONDecodeError as e:
  86. print(f"⚠️ Artifacts JSON解析失败: {e}")
  87. state['current_task']['completed_steps'].append(current_step)
  88. state['current_task']['current_step'] = None
  89. state['current_task']['last_heartbeat'] = datetime.now().isoformat()
  90. save_state(state)
  91. print(f"✅ {step_id} 完成")
  92. def complete_task(final_artifacts_json=None):
  93. """标记任务完成"""
  94. state = load_state()
  95. if not state.get('current_task'):
  96. print("⚠️ 无活动任务")
  97. return
  98. state['current_task']['status'] = 'completed'
  99. state['current_task']['completed_at'] = datetime.now().isoformat()
  100. if final_artifacts_json:
  101. try:
  102. final_artifacts = json.loads(final_artifacts_json)
  103. state['current_task']['artifacts'].update(final_artifacts)
  104. except json.JSONDecodeError as e:
  105. print(f"⚠️ Final artifacts JSON解析失败: {e}")
  106. # 保存到历史记录
  107. state['last_stable_state'] = extract_stable_state(state['current_task'])
  108. if 'history' not in state:
  109. state['history'] = []
  110. state['history'].append({
  111. 'task_id': f"task_{len(state['history']) + 1:03d}",
  112. 'command': state['current_task']['command'],
  113. 'chapter': state['current_task']['args'].get('chapter_num'),
  114. 'status': 'completed',
  115. 'completed_at': state['current_task']['completed_at']
  116. })
  117. # 清除当前任务
  118. state['current_task'] = None
  119. save_state(state)
  120. print(f"🎉 任务完成")
  121. def detect_interruption():
  122. """检测中断状态"""
  123. state = load_state()
  124. if not state or 'current_task' not in state or state['current_task'] is None:
  125. return None # 无中断任务
  126. task = state['current_task']
  127. if task['status'] == 'completed':
  128. return None # 任务已完成
  129. # 判断中断原因
  130. last_heartbeat = datetime.fromisoformat(task['last_heartbeat'])
  131. elapsed = (datetime.now() - last_heartbeat).total_seconds()
  132. interrupt_info = {
  133. 'command': task['command'],
  134. 'args': task['args'],
  135. 'current_step': task['current_step'],
  136. 'completed_steps': task['completed_steps'],
  137. 'elapsed_seconds': elapsed,
  138. 'artifacts': task['artifacts'],
  139. 'started_at': task['started_at']
  140. }
  141. return interrupt_info
  142. def analyze_recovery_options(interrupt_info):
  143. """分析恢复选项(基于中断点)"""
  144. current_step = interrupt_info['current_step']
  145. command = interrupt_info['command']
  146. chapter_num = interrupt_info['args'].get('chapter_num', '?')
  147. if not current_step:
  148. # 任务刚开始就中断
  149. return [{
  150. 'option': 'A',
  151. 'label': '从头开始',
  152. 'risk': 'low',
  153. 'description': '重新执行完整流程',
  154. 'actions': [
  155. f"删除 workflow_state.json 当前任务",
  156. f"执行 /{command} {chapter_num}"
  157. ]
  158. }]
  159. step_id = current_step['id']
  160. # 基于Step ID的恢复策略
  161. if step_id == 'Step 1':
  162. # Step 1中断:无副作用
  163. return [{
  164. 'option': 'A',
  165. 'label': '从Step 1重新开始',
  166. 'risk': 'low',
  167. 'description': '重新加载上下文',
  168. 'actions': [
  169. f"清理中断状态",
  170. f"执行 /{command} {chapter_num}"
  171. ]
  172. }]
  173. elif step_id == 'Step 2':
  174. # Step 2中断:可能有半成品文件
  175. chapter_file = interrupt_info['artifacts'].get('chapter_file', {})
  176. chapter_path = f"正文/第{chapter_num:04d}章.md"
  177. options = [{
  178. 'option': 'A',
  179. 'label': '删除半成品,从Step 1重新开始',
  180. 'risk': 'low',
  181. 'description': f"清理 {chapter_path},重新生成章节",
  182. 'actions': [
  183. f"删除 {chapter_path}(如存在)",
  184. f"清理 Git 暂存区",
  185. f"清理中断状态",
  186. f"执行 /{command} {chapter_num}"
  187. ]
  188. }]
  189. # 检查文件是否存在
  190. if os.path.exists(chapter_path):
  191. options.append({
  192. 'option': 'B',
  193. 'label': '回滚到上一章',
  194. 'risk': 'medium',
  195. 'description': '丢弃所有当前章节进度',
  196. 'actions': [
  197. f"git reset --hard ch{(chapter_num-1):04d}",
  198. f"清理中断状态",
  199. "重新决定是否继续Ch{chapter_num}"
  200. ]
  201. })
  202. return options
  203. elif step_id in ['Step 3', 'Step 6']:
  204. # Step 3/6中断:脚本未执行完
  205. return [{
  206. 'option': 'A',
  207. 'label': f'从{step_id}重新开始',
  208. 'risk': 'low',
  209. 'description': '重新运行脚本(幂等操作)',
  210. 'actions': [
  211. f"重新执行脚本",
  212. f"继续后续Step"
  213. ]
  214. }]
  215. elif step_id == 'Step 4':
  216. # Step 4中断:state.json可能部分更新
  217. return [
  218. {
  219. 'option': 'A',
  220. 'label': '检查并修复state.json',
  221. 'risk': 'medium',
  222. 'description': '验证state.json一致性,补全缺失字段',
  223. 'actions': [
  224. "读取 state.json",
  225. "检查必要字段(progress, protagonist_state等)",
  226. "如缺失则从前一章推断",
  227. "重新执行 update_state.py",
  228. "继续Step 5"
  229. ]
  230. },
  231. {
  232. 'option': 'B',
  233. 'label': '回滚到上一章',
  234. 'risk': 'high',
  235. 'description': '恢复到上一章的state.json快照',
  236. 'actions': [
  237. f"git checkout ch{(chapter_num-1):04d} -- .webnovel/state.json",
  238. f"删除第{chapter_num}章文件",
  239. "清理中断状态"
  240. ]
  241. }
  242. ]
  243. elif step_id == 'Step 5':
  244. # Step 5中断:Git未提交
  245. return [
  246. {
  247. 'option': 'A',
  248. 'label': '继续Git提交',
  249. 'risk': 'low',
  250. 'description': '完成未完成的Git commit + tag',
  251. 'actions': [
  252. "检查 Git 暂存区",
  253. "重新执行 backup_manager.py",
  254. "继续Step 6"
  255. ]
  256. },
  257. {
  258. 'option': 'B',
  259. 'label': '回滚Git改动',
  260. 'risk': 'medium',
  261. 'description': '丢弃暂存区所有改动',
  262. 'actions': [
  263. "git reset HEAD .",
  264. f"删除第{chapter_num}章文件",
  265. "清理中断状态"
  266. ]
  267. }
  268. ]
  269. elif step_id == 'Step 7':
  270. # Step 7中断:审查未完成
  271. return [
  272. {
  273. 'option': 'A',
  274. 'label': '重新执行双章审查',
  275. 'risk': 'high',
  276. 'description': '重新调用5个审查员(成本高,耗时长)',
  277. 'actions': [
  278. "重新调用5个审查员(并行)",
  279. "生成审查报告",
  280. "更新 state.json review_checkpoints"
  281. ]
  282. },
  283. {
  284. 'option': 'B',
  285. 'label': '跳过审查,继续下一章',
  286. 'risk': 'medium',
  287. 'description': '不进行审查(可后续用 /webnovel-review 补审)',
  288. 'actions': [
  289. "标记审查为已跳过",
  290. "清理中断状态",
  291. "可继续创作下一章"
  292. ]
  293. }
  294. ]
  295. # 默认选项
  296. return [{
  297. 'option': 'A',
  298. 'label': '从头开始',
  299. 'risk': 'low',
  300. 'description': '重新执行完整流程',
  301. 'actions': [
  302. f"清理所有中断artifacts",
  303. f"执行 /{command} {chapter_num}"
  304. ]
  305. }]
  306. def cleanup_artifacts(chapter_num):
  307. """清理半成品artifacts"""
  308. artifacts_cleaned = []
  309. project_root = find_project_root()
  310. # 删除章节文件(兼容多种命名/目录结构)
  311. chapter_path = find_chapter_file(project_root, chapter_num)
  312. if chapter_path is None:
  313. # 可能是“草稿路径”但尚未重命名
  314. draft_path = default_chapter_draft_path(project_root, chapter_num)
  315. if draft_path.exists():
  316. chapter_path = draft_path
  317. if chapter_path and chapter_path.exists():
  318. chapter_path.unlink()
  319. artifacts_cleaned.append(str(chapter_path.relative_to(project_root)))
  320. # 清理Git暂存区
  321. result = subprocess.run(
  322. ['git', 'reset', 'HEAD', '.'],
  323. cwd=project_root,
  324. capture_output=True,
  325. text=True
  326. )
  327. if result.returncode == 0:
  328. artifacts_cleaned.append("Git暂存区已清理(project)")
  329. return artifacts_cleaned
  330. def clear_current_task():
  331. """清除当前中断任务"""
  332. state = load_state()
  333. if state.get('current_task'):
  334. state['current_task'] = None
  335. save_state(state)
  336. print("✅ 中断任务已清除")
  337. else:
  338. print("⚠️ 无中断任务")
  339. def load_state():
  340. """加载workflow状态"""
  341. state_file = get_workflow_state_path()
  342. if not state_file.exists():
  343. return {'current_task': None, 'last_stable_state': None, 'history': []}
  344. with open(state_file, 'r', encoding='utf-8') as f:
  345. return json.load(f)
  346. def save_state(state):
  347. """保存workflow状态"""
  348. state_file = get_workflow_state_path()
  349. # ============================================================================
  350. # 安全修复:使用安全目录创建函数(P1 MEDIUM)
  351. # 原代码: os.makedirs(os.path.dirname(WORKFLOW_STATE_FILE), exist_ok=True)
  352. # 漏洞: 未设置权限,使用OS默认(可能为755,允许同组用户读取)
  353. # ============================================================================
  354. create_secure_directory(str(state_file.parent))
  355. with open(state_file, 'w', encoding='utf-8') as f:
  356. json.dump(state, f, ensure_ascii=False, indent=2)
  357. def get_pending_steps(command):
  358. """获取待执行步骤列表"""
  359. if command == 'webnovel-write':
  360. return ['Step 1', 'Step 2', 'Step 3', 'Step 4', 'Step 5', 'Step 6', 'Step 7']
  361. elif command == 'webnovel-review':
  362. return ['Step 1', 'Step 2', 'Step 3', 'Step 4', 'Step 5', 'Step 6', 'Step 7', 'Step 8']
  363. # 其他命令...
  364. return []
  365. def extract_stable_state(task):
  366. """提取稳定状态快照"""
  367. return {
  368. 'command': task['command'],
  369. 'chapter_num': task['args'].get('chapter_num'),
  370. 'completed_at': task.get('completed_at'),
  371. 'artifacts': task.get('artifacts', {})
  372. }
  373. # CLI接口
  374. if __name__ == '__main__':
  375. import argparse
  376. parser = argparse.ArgumentParser(description='工作流状态管理')
  377. subparsers = parser.add_subparsers(dest='action', help='操作类型')
  378. # start-task
  379. p_start_task = subparsers.add_parser('start-task', help='开始新任务')
  380. p_start_task.add_argument('--command', required=True, help='命令名称')
  381. p_start_task.add_argument('--chapter', type=int, help='章节号')
  382. # start-step
  383. p_start_step = subparsers.add_parser('start-step', help='开始Step')
  384. p_start_step.add_argument('--step-id', required=True, help='Step ID')
  385. p_start_step.add_argument('--step-name', required=True, help='Step名称')
  386. p_start_step.add_argument('--note', help='进度备注')
  387. # complete-step
  388. p_complete_step = subparsers.add_parser('complete-step', help='完成Step')
  389. p_complete_step.add_argument('--step-id', required=True, help='Step ID')
  390. p_complete_step.add_argument('--artifacts', help='Artifacts JSON')
  391. # complete-task
  392. p_complete_task = subparsers.add_parser('complete-task', help='完成任务')
  393. p_complete_task.add_argument('--artifacts', help='Final artifacts JSON')
  394. # detect
  395. subparsers.add_parser('detect', help='检测中断')
  396. # cleanup
  397. p_cleanup = subparsers.add_parser('cleanup', help='清理artifacts')
  398. p_cleanup.add_argument('--chapter', type=int, required=True, help='章节号')
  399. # clear
  400. subparsers.add_parser('clear', help='清除中断任务')
  401. args = parser.parse_args()
  402. if args.action == 'start-task':
  403. start_task(args.command, {'chapter_num': args.chapter})
  404. elif args.action == 'start-step':
  405. start_step(args.step_id, args.step_name, args.note)
  406. elif args.action == 'complete-step':
  407. complete_step(args.step_id, args.artifacts)
  408. elif args.action == 'complete-task':
  409. complete_task(args.artifacts)
  410. elif args.action == 'detect':
  411. interrupt = detect_interruption()
  412. if interrupt:
  413. print("\n🔴 检测到中断任务:")
  414. print(json.dumps(interrupt, ensure_ascii=False, indent=2))
  415. print("\n💡 恢复选项:")
  416. options = analyze_recovery_options(interrupt)
  417. print(json.dumps(options, ensure_ascii=False, indent=2))
  418. else:
  419. print("✅ 无中断任务")
  420. elif args.action == 'cleanup':
  421. cleaned = cleanup_artifacts(args.chapter)
  422. print(f"✅ 已清理: {', '.join(cleaned)}")
  423. elif args.action == 'clear':
  424. clear_current_task()
  425. else:
  426. parser.print_help()