| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 |
- #!/usr/bin/env python3
- """
- 工作流状态管理器
- - 追踪命令执行状态
- - 检测中断点
- - 提供恢复策略
- """
- import json
- import os
- import sys
- import subprocess
- from datetime import datetime
- from pathlib import Path
- # ============================================================================
- # 安全修复:导入安全工具函数(P1 MEDIUM)
- # ============================================================================
- from security_utils import create_secure_directory, atomic_write_json
- from project_locator import resolve_project_root
- from chapter_paths import default_chapter_draft_path, find_chapter_file
- # UTF-8 编码修复(Windows兼容)
- if sys.platform == 'win32':
- import io
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
- sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
- def find_project_root():
- """解析项目根目录(包含 .webnovel/state.json)"""
- return resolve_project_root()
- def get_workflow_state_path():
- """获取 workflow_state.json 的完整路径"""
- project_root = find_project_root()
- return project_root / '.webnovel' / 'workflow_state.json'
- def start_task(command, args):
- """开始新任务"""
- state = load_state()
- state['current_task'] = {
- 'command': command,
- 'args': args,
- 'started_at': datetime.now().isoformat(),
- 'last_heartbeat': datetime.now().isoformat(),
- 'status': 'running',
- 'current_step': None,
- 'completed_steps': [],
- 'pending_steps': get_pending_steps(command),
- 'artifacts': {
- 'chapter_file': {},
- 'git_status': {},
- 'state_json_modified': False,
- 'entities_appeared': False,
- 'review_completed': False
- }
- }
- save_state(state)
- print(f"✅ 任务已启动: {command} {json.dumps(args, ensure_ascii=False)}")
- def start_step(step_id, step_name, progress_note=None):
- """标记Step开始"""
- state = load_state()
- if not state.get('current_task'):
- print("⚠️ 无活动任务,请先使用 start-task")
- return
- state['current_task']['current_step'] = {
- 'id': step_id,
- 'name': step_name,
- 'status': 'in_progress',
- 'started_at': datetime.now().isoformat(),
- 'progress_note': progress_note
- }
- state['current_task']['last_heartbeat'] = datetime.now().isoformat()
- save_state(state)
- print(f"▶️ {step_id} 开始: {step_name}")
- def complete_step(step_id, artifacts_json=None):
- """标记Step完成"""
- state = load_state()
- if not state.get('current_task') or not state['current_task'].get('current_step'):
- print("⚠️ 无活动Step")
- return
- current_step = state['current_task']['current_step']
- current_step['status'] = 'completed'
- current_step['completed_at'] = datetime.now().isoformat()
- if artifacts_json:
- try:
- artifacts = json.loads(artifacts_json)
- current_step['artifacts'] = artifacts
- # 更新task级别的artifacts
- state['current_task']['artifacts'].update(artifacts)
- except json.JSONDecodeError as e:
- print(f"⚠️ Artifacts JSON解析失败: {e}")
- state['current_task']['completed_steps'].append(current_step)
- state['current_task']['current_step'] = None
- state['current_task']['last_heartbeat'] = datetime.now().isoformat()
- save_state(state)
- print(f"✅ {step_id} 完成")
- def complete_task(final_artifacts_json=None):
- """标记任务完成"""
- state = load_state()
- if not state.get('current_task'):
- print("⚠️ 无活动任务")
- return
- state['current_task']['status'] = 'completed'
- state['current_task']['completed_at'] = datetime.now().isoformat()
- if final_artifacts_json:
- try:
- final_artifacts = json.loads(final_artifacts_json)
- state['current_task']['artifacts'].update(final_artifacts)
- except json.JSONDecodeError as e:
- print(f"⚠️ Final artifacts JSON解析失败: {e}")
- # 保存到历史记录
- state['last_stable_state'] = extract_stable_state(state['current_task'])
- if 'history' not in state:
- state['history'] = []
- state['history'].append({
- 'task_id': f"task_{len(state['history']) + 1:03d}",
- 'command': state['current_task']['command'],
- 'chapter': state['current_task']['args'].get('chapter_num'),
- 'status': 'completed',
- 'completed_at': state['current_task']['completed_at']
- })
- # 清除当前任务
- state['current_task'] = None
- save_state(state)
- print(f"🎉 任务完成")
- def detect_interruption():
- """检测中断状态"""
- state = load_state()
- if not state or 'current_task' not in state or state['current_task'] is None:
- return None # 无中断任务
- task = state['current_task']
- if task['status'] == 'completed':
- return None # 任务已完成
- # 判断中断原因
- last_heartbeat = datetime.fromisoformat(task['last_heartbeat'])
- elapsed = (datetime.now() - last_heartbeat).total_seconds()
- interrupt_info = {
- 'command': task['command'],
- 'args': task['args'],
- 'current_step': task['current_step'],
- 'completed_steps': task['completed_steps'],
- 'elapsed_seconds': elapsed,
- 'artifacts': task['artifacts'],
- 'started_at': task['started_at']
- }
- return interrupt_info
- def analyze_recovery_options(interrupt_info):
- """分析恢复选项(基于中断点)"""
- current_step = interrupt_info['current_step']
- command = interrupt_info['command']
- chapter_num = interrupt_info['args'].get('chapter_num', '?')
- if not current_step:
- # 任务刚开始就中断
- return [{
- 'option': 'A',
- 'label': '从头开始',
- 'risk': 'low',
- 'description': '重新执行完整流程',
- 'actions': [
- f"删除 workflow_state.json 当前任务",
- f"执行 /{command} {chapter_num}"
- ]
- }]
- step_id = current_step['id']
- # 基于Step ID的恢复策略
- if step_id == 'Step 1':
- # Step 1中断:无副作用
- return [{
- 'option': 'A',
- 'label': '从Step 1重新开始',
- 'risk': 'low',
- 'description': '重新加载上下文',
- 'actions': [
- f"清理中断状态",
- f"执行 /{command} {chapter_num}"
- ]
- }]
- elif step_id == 'Step 2':
- # Step 2中断:可能有半成品文件
- chapter_file = interrupt_info['artifacts'].get('chapter_file', {})
- # 使用 chapter_paths 模块定位章节文件(兼容新旧目录结构)
- project_root = find_project_root()
- existing_chapter = find_chapter_file(project_root, chapter_num)
- draft_path = None
- if existing_chapter:
- chapter_path = str(existing_chapter.relative_to(project_root))
- else:
- # 如果不存在,使用新格式的默认路径
- draft_path = default_chapter_draft_path(project_root, chapter_num)
- chapter_path = str(draft_path.relative_to(project_root))
- options = [{
- 'option': 'A',
- 'label': '删除半成品,从Step 1重新开始',
- 'risk': 'low',
- 'description': f"清理 {chapter_path},重新生成章节",
- 'actions': [
- f"删除 {chapter_path}(如存在)",
- f"清理 Git 暂存区",
- f"清理中断状态",
- f"执行 /{command} {chapter_num}"
- ]
- }]
- # 检查文件是否存在
- candidate = existing_chapter or draft_path
- if candidate and candidate.exists():
- options.append({
- 'option': 'B',
- 'label': '回滚到上一章',
- 'risk': 'medium',
- 'description': '丢弃所有当前章节进度',
- 'actions': [
- f"git reset --hard ch{(chapter_num-1):04d}",
- f"清理中断状态",
- "重新决定是否继续Ch{chapter_num}"
- ]
- })
- return options
- elif step_id == 'Step 3':
- # Step 3 中断:审查未完成
- return [
- {
- 'option': 'A',
- 'label': '重新执行审查',
- 'risk': 'medium',
- 'description': '重新调用5个审查员(并行)',
- 'actions': [
- "重新调用5个审查员(并行)",
- "生成审查报告",
- "继续 Step 4 润色"
- ]
- },
- {
- 'option': 'B',
- 'label': '跳过审查,直接润色',
- 'risk': 'low',
- 'description': '不进行审查,可后续用 /webnovel-review 补审',
- 'actions': [
- "标记审查为已跳过",
- "继续 Step 4 润色"
- ]
- }
- ]
- elif step_id == 'Step 4':
- # Step 4 中断:润色中
- project_root = find_project_root()
- existing_chapter = find_chapter_file(project_root, chapter_num)
- draft_path = None
- if existing_chapter:
- chapter_path = str(existing_chapter.relative_to(project_root))
- else:
- draft_path = default_chapter_draft_path(project_root, chapter_num)
- chapter_path = str(draft_path.relative_to(project_root))
- return [
- {
- 'option': 'A',
- 'label': '继续润色',
- 'risk': 'low',
- 'description': f"继续润色 {chapter_path},完成后进入 Step 5",
- 'actions': [
- f"打开并继续润色 {chapter_path}",
- "保存文件",
- "继续 Step 5(Data Agent)"
- ]
- },
- {
- 'option': 'B',
- 'label': '删除润色稿,从 Step 2 重写',
- 'risk': 'medium',
- 'description': f"删除 {chapter_path},重新生成章节内容",
- 'actions': [
- f"删除 {chapter_path}",
- "清理 Git 暂存区",
- "清理中断状态",
- f"执行 /{command} {chapter_num}"
- ]
- }
- ]
- elif step_id == 'Step 5':
- # Step 5 中断:Data Agent 处理中
- return [{
- 'option': 'A',
- 'label': '从 Step 5 重新开始',
- 'risk': 'low',
- 'description': '重新运行 Data Agent(幂等操作)',
- 'actions': [
- "重新调用 Data Agent",
- "继续 Step 6(Git 备份)"
- ]
- }]
- elif step_id == 'Step 6':
- # Step 6 中断:Git 未提交
- return [
- {
- 'option': 'A',
- 'label': '继续 Git 提交',
- 'risk': 'low',
- 'description': '完成未完成的 Git commit + tag',
- 'actions': [
- "检查 Git 暂存区",
- "重新执行 backup_manager.py",
- "继续完成工作流追踪(complete-task)"
- ]
- },
- {
- 'option': 'B',
- 'label': '回滚 Git 改动',
- 'risk': 'medium',
- 'description': '丢弃暂存区所有改动',
- 'actions': [
- "git reset HEAD .",
- f"删除第{chapter_num}章文件",
- "清理中断状态"
- ]
- }
- ]
- # 默认选项
- return [{
- 'option': 'A',
- 'label': '从头开始',
- 'risk': 'low',
- 'description': '重新执行完整流程',
- 'actions': [
- f"清理所有中断artifacts",
- f"执行 /{command} {chapter_num}"
- ]
- }]
- def cleanup_artifacts(chapter_num):
- """清理半成品artifacts"""
- artifacts_cleaned = []
- project_root = find_project_root()
- # 删除章节文件(兼容多种命名/目录结构)
- chapter_path = find_chapter_file(project_root, chapter_num)
- if chapter_path is None:
- # 可能是“草稿路径”但尚未重命名
- draft_path = default_chapter_draft_path(project_root, chapter_num)
- if draft_path.exists():
- chapter_path = draft_path
- if chapter_path and chapter_path.exists():
- chapter_path.unlink()
- artifacts_cleaned.append(str(chapter_path.relative_to(project_root)))
- # 清理Git暂存区
- result = subprocess.run(
- ['git', 'reset', 'HEAD', '.'],
- cwd=project_root,
- capture_output=True,
- text=True
- )
- if result.returncode == 0:
- artifacts_cleaned.append("Git暂存区已清理(project)")
- return artifacts_cleaned
- def clear_current_task():
- """清除当前中断任务"""
- state = load_state()
- if state.get('current_task'):
- state['current_task'] = None
- save_state(state)
- print("✅ 中断任务已清除")
- else:
- print("⚠️ 无中断任务")
- def load_state():
- """加载workflow状态"""
- state_file = get_workflow_state_path()
- if not state_file.exists():
- return {'current_task': None, 'last_stable_state': None, 'history': []}
- with open(state_file, 'r', encoding='utf-8') as f:
- return json.load(f)
- def save_state(state):
- """保存workflow状态(原子化写入)"""
- state_file = get_workflow_state_path()
- # ============================================================================
- # 安全修复:使用原子化写入(P1 MEDIUM)
- # ============================================================================
- create_secure_directory(str(state_file.parent))
- atomic_write_json(state_file, state, use_lock=True, backup=False)
- def get_pending_steps(command):
- """获取待执行步骤列表 (v5.0)"""
- if command == 'webnovel-write':
- # v5.0 工作流:6 步
- # Step 1: Context Agent 搜集上下文
- # Step 2: 生成章节内容 (纯正文,3000-5000字)
- # Step 3: 审查 (5个Agent并行,只报告)
- # Step 4: 润色 (基于审查报告修复 + 去AI痕迹)
- # Step 5: Data Agent 处理数据链
- # Step 6: Git 备份
- return ['Step 1', 'Step 2', 'Step 3', 'Step 4', 'Step 5', 'Step 6']
- elif command == 'webnovel-review':
- return ['Step 1', 'Step 2', 'Step 3', 'Step 4', 'Step 5', 'Step 6', 'Step 7', 'Step 8']
- # 其他命令...
- return []
- def extract_stable_state(task):
- """提取稳定状态快照"""
- return {
- 'command': task['command'],
- 'chapter_num': task['args'].get('chapter_num'),
- 'completed_at': task.get('completed_at'),
- 'artifacts': task.get('artifacts', {})
- }
- # CLI接口
- if __name__ == '__main__':
- import argparse
- parser = argparse.ArgumentParser(description='工作流状态管理')
- subparsers = parser.add_subparsers(dest='action', help='操作类型')
- # start-task
- p_start_task = subparsers.add_parser('start-task', help='开始新任务')
- p_start_task.add_argument('--command', required=True, help='命令名称')
- p_start_task.add_argument('--chapter', type=int, help='章节号')
- # start-step
- p_start_step = subparsers.add_parser('start-step', help='开始Step')
- p_start_step.add_argument('--step-id', required=True, help='Step ID')
- p_start_step.add_argument('--step-name', required=True, help='Step名称')
- p_start_step.add_argument('--note', help='进度备注')
- # complete-step
- p_complete_step = subparsers.add_parser('complete-step', help='完成Step')
- p_complete_step.add_argument('--step-id', required=True, help='Step ID')
- p_complete_step.add_argument('--artifacts', help='Artifacts JSON')
- # complete-task
- p_complete_task = subparsers.add_parser('complete-task', help='完成任务')
- p_complete_task.add_argument('--artifacts', help='Final artifacts JSON')
- # detect
- subparsers.add_parser('detect', help='检测中断')
- # cleanup
- p_cleanup = subparsers.add_parser('cleanup', help='清理artifacts')
- p_cleanup.add_argument('--chapter', type=int, required=True, help='章节号')
- # clear
- subparsers.add_parser('clear', help='清除中断任务')
- args = parser.parse_args()
- if args.action == 'start-task':
- start_task(args.command, {'chapter_num': args.chapter})
- elif args.action == 'start-step':
- start_step(args.step_id, args.step_name, args.note)
- elif args.action == 'complete-step':
- complete_step(args.step_id, args.artifacts)
- elif args.action == 'complete-task':
- complete_task(args.artifacts)
- elif args.action == 'detect':
- interrupt = detect_interruption()
- if interrupt:
- print("\n🔴 检测到中断任务:")
- print(json.dumps(interrupt, ensure_ascii=False, indent=2))
- print("\n💡 恢复选项:")
- options = analyze_recovery_options(interrupt)
- print(json.dumps(options, ensure_ascii=False, indent=2))
- else:
- print("✅ 无中断任务")
- elif args.action == 'cleanup':
- cleaned = cleanup_artifacts(args.chapter)
- print(f"✅ 已清理: {', '.join(cleaned)}")
- elif args.action == 'clear':
- clear_current_task()
- else:
- parser.print_help()
|