|
|
@@ -26,6 +26,26 @@ if sys.platform == 'win32':
|
|
|
|
|
|
WORKFLOW_STATE_FILE = '.webnovel/workflow_state.json'
|
|
|
|
|
|
+def find_project_root():
|
|
|
+ """动态查找包含 .webnovel/ 的项目根目录"""
|
|
|
+ # 优先检查环境变量
|
|
|
+ if os.environ.get('WEBNOVEL_PROJECT_ROOT'):
|
|
|
+ return Path(os.environ['WEBNOVEL_PROJECT_ROOT'])
|
|
|
+
|
|
|
+ # 从当前目录往上查找
|
|
|
+ current = Path.cwd()
|
|
|
+ for parent in [current] + list(current.parents):
|
|
|
+ if (parent / '.webnovel').exists():
|
|
|
+ return parent
|
|
|
+
|
|
|
+ # 默认返回当前目录(向后兼容)
|
|
|
+ return current
|
|
|
+
|
|
|
+def get_workflow_state_path():
|
|
|
+ """获取 workflow_state.json 的完整路径"""
|
|
|
+ project_root = find_project_root()
|
|
|
+ return project_root / '.webnovel' / 'workflow_state.json'
|
|
|
+
|
|
|
def start_task(command, args):
|
|
|
"""开始新任务"""
|
|
|
state = load_state()
|
|
|
@@ -360,21 +380,23 @@ def clear_current_task():
|
|
|
|
|
|
def load_state():
|
|
|
"""加载workflow状态"""
|
|
|
- if not os.path.exists(WORKFLOW_STATE_FILE):
|
|
|
+ state_file = get_workflow_state_path()
|
|
|
+ if not state_file.exists():
|
|
|
return {'current_task': None, 'last_stable_state': None, 'history': []}
|
|
|
- with open(WORKFLOW_STATE_FILE, 'r', encoding='utf-8') as f:
|
|
|
+ with open(state_file, 'r', encoding='utf-8') as f:
|
|
|
return json.load(f)
|
|
|
|
|
|
def save_state(state):
|
|
|
"""保存workflow状态"""
|
|
|
+ state_file = get_workflow_state_path()
|
|
|
# ============================================================================
|
|
|
# 安全修复:使用安全目录创建函数(P1 MEDIUM)
|
|
|
# 原代码: os.makedirs(os.path.dirname(WORKFLOW_STATE_FILE), exist_ok=True)
|
|
|
# 漏洞: 未设置权限,使用OS默认(可能为755,允许同组用户读取)
|
|
|
# ============================================================================
|
|
|
- create_secure_directory(os.path.dirname(WORKFLOW_STATE_FILE))
|
|
|
+ create_secure_directory(str(state_file.parent))
|
|
|
|
|
|
- with open(WORKFLOW_STATE_FILE, 'w', encoding='utf-8') as f:
|
|
|
+ with open(state_file, 'w', encoding='utf-8') as f:
|
|
|
json.dump(state, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def get_pending_steps(command):
|