security_utils.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. #!/usr/bin/env python3
  2. """
  3. 安全工具函数库
  4. 用于webnovel-writer系统的通用安全函数
  5. 创建时间: 2026-01-02
  6. 创建原因: 安全审计发现路径遍历和命令注入漏洞
  7. 修复方案: 集中管理所有安全相关的输入清理函数
  8. """
  9. import json
  10. import os
  11. import re
  12. import sys
  13. import tempfile
  14. from pathlib import Path
  15. from typing import Any, Dict, Optional, Union
  # filelock is an optional third-party dependency. HAS_FILELOCK records
  # whether the import succeeded so the rest of the module can skip locking
  # when the package is not installed.
  try:
      from filelock import FileLock
  except ImportError:
      HAS_FILELOCK = False
  else:
      HAS_FILELOCK = True
  def sanitize_filename(name: str, max_length: int = 100) -> str:
      r"""
      Reduce an untrusted name to a safe single-component file name (CWE-22).

      Both ``/`` and ``\`` are treated as path separators on every platform,
      so Windows-style input is handled the same way on POSIX hosts (where
      ``os.path.basename`` alone would not split on backslashes).

      Args:
          name: Raw name, possibly containing directory components or
              traversal sequences.
          max_length: Maximum number of characters kept (default 100).

      Returns:
          The last path component with every character outside
          ``[\w\u4e00-\u9fff-]`` replaced by ``_``, runs of ``_`` collapsed,
          truncated to ``max_length`` and stripped of leading/trailing ``_``.
          ``"unnamed_entity"`` is returned when nothing is left.

      Examples:
          >>> sanitize_filename("../../../etc/passwd")
          'passwd'
          >>> sanitize_filename("C:\\Windows\\System32")
          'System32'
          >>> sanitize_filename("正常角色名")
          '正常角色名'
      """
      # Normalise backslashes first: on POSIX os.path.basename only splits on
      # '/', so "C:\\Windows\\System32" would otherwise survive as one name.
      leaf = os.path.basename(name.replace('\\', '/'))

      # Replace anything that is not a word character, a CJK ideograph or a
      # hyphen; this also neutralises '.', ':' and any stray separator.
      leaf = re.sub(r'[^\w\u4e00-\u9fff-]', '_', leaf)

      # Collapse the underscore runs produced by the substitution above.
      leaf = re.sub(r'_+', '_', leaf)

      # Truncate before stripping so a cut that lands on '_' is cleaned up too.
      leaf = leaf[:max_length].strip('_')

      # Never hand back an empty name.
      return leaf or "unnamed_entity"
  def sanitize_commit_message(message: str, max_length: int = 200) -> str:
      """
      Strip option-like and multi-line content from a Git commit message (CWE-77).

      Args:
          message: Raw commit message, possibly containing Git flags.
          max_length: Maximum number of characters kept (default 200).

      Returns:
          A single-line message with quotes removed, ``--flag`` tokens removed,
          whitespace collapsed and no leading ``-`` or whitespace.
          ``"Untitled commit"`` is returned when nothing is left.

      Examples:
          >>> sanitize_commit_message("--amend Chapter 1")
          'Chapter 1'
          >>> sanitize_commit_message("-m Test")
          'm Test'
          >>> sanitize_commit_message("--amend -m Test")
          'm Test'
      """
      # Flatten to one line so nothing can be smuggled in on a second line.
      flat = message.replace('\n', ' ').replace('\r', ' ')

      # Drop quotes BEFORE scrubbing flags: otherwise "-'-amend" would pass the
      # flag regex and then collapse into "--amend" once the quote is removed.
      flat = flat.replace("'", "").replace('"', '')

      # Remove long options (--xxx) wherever they appear.
      flat = re.sub(r'--[\w-]+', '', flat)

      # Collapse all whitespace runs to single spaces.
      flat = re.sub(r'\s+', ' ', flat)

      # Strip leading spaces and dashes together: removing a leading "--flag"
      # leaves a space in front of whatever follows, which would otherwise
      # shield a short option such as "-m" from being stripped.
      flat = flat.lstrip(' -')

      # Enforce the length limit, then tidy the tail.
      flat = flat[:max_length].strip()

      return flat or "Untitled commit"
  def create_secure_directory(path: str, mode: int = 0o700) -> Path:
      """
      Create a directory (and any missing parents) with restrictive permissions.

      Args:
          path: Directory to create; an already existing directory is accepted.
          mode: Permission bits for the directory (default 0o700).

      Returns:
          ``Path(path)``.

      Notes:
          On every platform except Windows (``os.name == 'nt'``) the mode is
          applied a second time with ``os.chmod`` after creation, so it also
          takes effect on a directory that already existed. On Windows the
          explicit chmod is skipped.
      """
      target = Path(path)

      os.makedirs(path, mode=mode, exist_ok=True)

      # Re-apply the mode explicitly so the final permissions do not depend
      # on what makedirs actually honoured.
      running_on_windows = os.name == 'nt'
      if not running_on_windows:
          os.chmod(path, mode)

      return target
  def create_secure_file(file_path: str, content: str, mode: int = 0o600) -> None:
      """
      Write ``content`` to ``file_path`` as UTF-8 with restrictive permissions.

      The file is opened with ``os.open`` so that the permission bits are
      applied at creation time, and (outside Windows) ``os.fchmod`` is called
      on the open descriptor before any content is written. This avoids the
      window in which a freshly written file is still readable under the
      process's default umask.

      Args:
          file_path: Destination path; an existing file is truncated.
          content: Text to write.
          mode: Permission bits for the file (default 0o600).

      Raises:
          OSError: If the file cannot be opened or written.
      """
      # O_BINARY exists only on Windows; it stops the C runtime from applying
      # its own newline translation underneath Python's text layer.
      flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
      fd = os.open(file_path, flags, mode)

      with os.fdopen(fd, 'w', encoding='utf-8') as handle:
          if os.name != 'nt':
              # The mode passed to os.open is masked by the umask and ignored
              # for files that already exist, so enforce it on the descriptor
              # before the content lands.
              os.fchmod(fd, mode)
          handle.write(content)
  def validate_integer_input(value: str, field_name: str) -> int:
      """
      Convert ``value`` to ``int`` or fail with a uniform ``ValueError``.

      Args:
          value: Input to convert (normally a string).
          field_name: Field name used in the error messages.

      Returns:
          ``int(value)``.

      Raises:
          ValueError: If ``value`` cannot be converted. This includes inputs
              of an unsupported type such as ``None``, for which ``int()``
              itself raises ``TypeError``. The original exception is attached
              as ``__cause__``.

      Examples:
          >>> validate_integer_input("123", "chapter_num")
          123
      """
      try:
          return int(value)
      except (TypeError, ValueError) as err:
          # int() reports a wrong *type* with TypeError; fold it into the
          # documented ValueError so callers need a single except clause.
          print(f"❌ 错误:{field_name} 必须是整数,收到: {value}", file=sys.stderr)
          raise ValueError(f"Invalid integer input for {field_name}: {value}") from err
  164. # ============================================================================
  165. # Git 环境检测(优雅降级支持)
  166. # ============================================================================
  167. # 缓存 Git 可用性检测结果
  # Memoised probe result: None until the first check, then True or False.
  _git_available: Optional[bool] = None


  def is_git_available() -> bool:
      """
      Report whether a ``git`` executable can be run.

      The first call runs ``git --version`` with a 5 second timeout and stores
      the outcome in the module-level ``_git_available``; later calls return
      the stored value without spawning a process.

      Returns:
          True when the probe exited with return code 0. False when it exited
          with another code, timed out, or could not be started.
      """
      global _git_available

      if _git_available is None:
          import subprocess

          try:
              probe = subprocess.run(
                  ["git", "--version"],
                  capture_output=True,
                  text=True,
                  timeout=5,
              )
          except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
              _git_available = False
          else:
              _git_available = probe.returncode == 0

      return _git_available
  def is_git_repo(path: Union[str, Path]) -> bool:
      """
      Report whether ``path`` directly contains a ``.git`` directory.

      Args:
          path: Directory to inspect.

      Returns:
          False when ``is_git_available()`` is false; otherwise True only if
          ``<path>/.git`` exists and is a directory.
      """
      if is_git_available():
          marker = Path(path) / ".git"
          return marker.exists() and marker.is_dir()
      return False
  def git_graceful_operation(
      args: list,
      cwd: Union[str, Path],
      *,
      fallback_msg: str = "Git 不可用,跳过版本控制操作"
  ) -> tuple:
      """
      Run a Git command, degrading gracefully when Git is unavailable.

      Args:
          args: Git arguments without the leading ``git``. Any sequence is
              accepted (list or tuple).
          cwd: Working directory for the command.
          fallback_msg: Message written to stderr when Git is unavailable.

      Returns:
          ``(success, output, was_skipped)``:
          - success: True when the command exited with return code 0.
          - output: Captured stdout (empty on timeout or OS error).
          - was_skipped: True only when the command was not run because
            ``is_git_available()`` returned False.

      Notes:
          The command runs with a 60 second timeout. Output is decoded as
          UTF-8 with undecodable bytes replaced, so unexpected bytes in Git's
          output cannot raise ``UnicodeDecodeError``.

      Example:
          >>> success, output, skipped = git_graceful_operation(
          ...     ["add", "."], cwd="/path/to/project"
          ... )
      """
      if not is_git_available():
          print(f"⚠️ {fallback_msg}", file=sys.stderr)
          return False, "", True

      import subprocess

      try:
          result = subprocess.run(
              ["git", *args],
              cwd=cwd,
              capture_output=True,
              text=True,
              encoding='utf-8',
              # Replace undecodable bytes instead of raising: a decode error
              # is a ValueError and would escape the handlers below.
              errors='replace',
              timeout=60
          )
          return result.returncode == 0, result.stdout, False
      except subprocess.TimeoutExpired:
          # str() each argument so path-like arguments cannot break the join.
          print(f"⚠️ Git 操作超时: git {' '.join(str(a) for a in args)}", file=sys.stderr)
          return False, "", False
      except OSError as e:
          print(f"⚠️ Git 操作失败: {e}", file=sys.stderr)
          return False, "", False
  250. # ============================================================================
  251. # 原子化文件写入(防止并发冲突和数据损坏)
  252. # ============================================================================
  class AtomicWriteError(Exception):
      """Raised when atomic_write_json cannot complete a write."""


  def atomic_write_json(
      file_path: Union[str, Path],
      data: Dict[str, Any],
      *,
      use_lock: bool = True,
      backup: bool = True,
      indent: int = 2
  ) -> None:
      """
      Write ``data`` to ``file_path`` as JSON via a temp file and ``os.replace``.

      Steps:
          1. Serialise ``data`` (nothing touches the disk if this fails).
          2. Create the parent directory and write the JSON to a temporary
             file in that same directory, flushed and fsync'ed.
          3. Optionally take a ``filelock`` lock on ``<file>.lock``.
          4. Optionally copy the current file to ``<file>.bak``; a failed
             backup copy is ignored and does not block the write.
          5. ``os.replace`` the temporary file over the target.

      Args:
          file_path: Destination path.
          data: Dictionary to serialise.
          use_lock: Take the file lock; only effective when the optional
              ``filelock`` package was importable (``HAS_FILELOCK``).
          backup: Copy the existing file to ``<file>.bak`` before replacing.
          indent: JSON indentation (default 2).

      Raises:
          AtomicWriteError: On serialisation failure, or on any error while
              creating the directory, writing, locking or replacing. The
              underlying exception is attached as ``__cause__``.

      Example:
          >>> atomic_write_json('.webnovel/state.json', {'progress': {'chapter': 10}})
      """
      import shutil

      file_path = Path(file_path)
      parent_dir = file_path.parent

      # Serialise first so a bad payload fails before any filesystem change.
      try:
          json_content = json.dumps(data, ensure_ascii=False, indent=indent)
      except (TypeError, ValueError) as e:
          raise AtomicWriteError(f"JSON 序列化失败: {e}") from e

      lock_path = file_path.with_suffix(file_path.suffix + '.lock')
      backup_path = file_path.with_suffix(file_path.suffix + '.bak')

      # Stays None until the temp file exists, and is reset to None once it
      # has been renamed into place, so the cleanup below only removes
      # leftovers from a failed attempt.
      temp_path = None
      try:
          parent_dir.mkdir(parents=True, exist_ok=True)

          # Same directory as the target, hence same filesystem, which
          # os.replace needs in order to rename rather than copy.
          fd, temp_path = tempfile.mkstemp(
              suffix='.tmp',
              prefix=file_path.stem + '_',
              dir=parent_dir
          )

          with os.fdopen(fd, 'w', encoding='utf-8') as f:
              f.write(json_content)
              f.flush()
              os.fsync(f.fileno())  # push the bytes to disk before the rename

          lock = None
          if use_lock and HAS_FILELOCK:
              lock = FileLock(str(lock_path), timeout=10)
              lock.acquire()

          try:
              if backup and file_path.exists():
                  try:
                      shutil.copy2(file_path, backup_path)
                  except OSError:
                      pass  # best effort: a failed backup must not block the write

              os.replace(temp_path, file_path)
              temp_path = None
          finally:
              if lock is not None:
                  lock.release()

      except Exception as e:
          raise AtomicWriteError(f"原子写入失败: {e}") from e

      finally:
          if temp_path is not None:
              try:
                  os.unlink(temp_path)
              except OSError:
                  pass
  def read_json_safe(
      file_path: Union[str, Path],
      default: Optional[Dict[str, Any]] = None
  ) -> Dict[str, Any]:
      """
      Load a JSON file, returning ``default`` instead of raising on failure.

      Args:
          file_path: File to read (decoded as UTF-8).
          default: Value returned when the file is missing, unreadable or
              not valid JSON. ``None`` means a fresh empty dict.

      Returns:
          The parsed JSON value, or ``default``.

      Notes:
          A missing file is handled silently. Any other failure (I/O error,
          invalid JSON, bytes that are not valid UTF-8) is reported on stderr
          before ``default`` is returned.

      Example:
          >>> state = read_json_safe('.webnovel/state.json', {})
      """
      file_path = Path(file_path)
      if default is None:
          default = {}

      # Open directly rather than testing exists() first: the file could
      # disappear between the check and the open.
      try:
          with open(file_path, 'r', encoding='utf-8') as f:
              return json.load(f)
      except (FileNotFoundError, NotADirectoryError):
          return default
      except (ValueError, OSError) as e:
          # ValueError covers both json.JSONDecodeError and UnicodeDecodeError
          # (a file containing bytes that are not valid UTF-8).
          print(f"⚠️ 读取 JSON 失败 ({file_path}): {e}", file=sys.stderr)
          return default
  def restore_from_backup(file_path: Union[str, Path]) -> bool:
      """
      Copy ``<file_path>.bak`` back over ``file_path``.

      Args:
          file_path: Path of the file to restore; the backup is expected next
              to it under the same name with ``.bak`` appended.

      Returns:
          True when the backup was copied. False when no backup exists or the
          copy raised ``OSError``. Each outcome is also reported with a
          printed message.

      Example:
          >>> restore_from_backup('.webnovel/state.json')
          True
      """
      import shutil

      target = Path(file_path)
      backup_path = target.with_suffix(target.suffix + '.bak')

      if backup_path.exists():
          try:
              shutil.copy2(backup_path, target)
              print(f"✅ 已从备份恢复: {target}")
              return True
          except OSError as e:
              print(f"❌ 恢复失败: {e}", file=sys.stderr)
              return False

      print(f"⚠️ 备份文件不存在: {backup_path}", file=sys.stderr)
      return False
  388. # ============================================================================
  389. # 单元测试(内置自检)
  390. # ============================================================================
  def _run_self_tests():
      """
      Run the module's built-in self-checks and print a progress report.

      Exercises sanitize_filename, sanitize_commit_message,
      validate_integer_input and the atomic_write_json /
      restore_from_backup round trip. The file-based checks run inside a
      temporary directory that is removed even when an assertion fails.

      Raises:
          AssertionError: When any check fails.
      """
      print("🔍 运行安全工具函数自检...")

      # Test 1: sanitize_filename
      assert sanitize_filename("../../../etc/passwd") == "passwd", "路径遍历测试失败"
      assert sanitize_filename("C:\\Windows\\System32") == "System32", "Windows路径测试失败"
      assert sanitize_filename("正常角色名") == "正常角色名", "中文测试失败"
      assert sanitize_filename("/tmp/../../../../../etc/hosts") == "hosts", "复杂路径遍历测试失败"
      # '.' is not an allowed character, so the dots collapse to one '_'.
      assert sanitize_filename("test///file...name") == "file_name", "特殊字符测试失败"
      print(" ✅ sanitize_filename: 所有测试通过")

      # Test 2: sanitize_commit_message
      result = sanitize_commit_message("Test\n--author='Attacker'")
      assert "\n" not in result, "换行符未移除"
      assert "--author" not in result, "Git标志未移除"
      assert "Attacker" in result, "内容被错误移除"
      # The whole "--amend" token is removed.
      assert sanitize_commit_message("--amend Chapter 1") == "Chapter 1", "Git标志测试失败"
      assert "'" not in sanitize_commit_message("Test'message"), "引号测试失败"
      # Only the leading dash is stripped, leaving "m Test".
      assert sanitize_commit_message("-m Test") == "m Test", "单字母标志测试失败"
      print(" ✅ sanitize_commit_message: 所有测试通过")

      # Test 3: validate_integer_input
      assert validate_integer_input("123", "test") == 123, "整数验证测试失败"
      try:
          validate_integer_input("abc", "test")
      except ValueError:
          pass
      else:
          assert False, "应该抛出ValueError"
      print(" ✅ validate_integer_input: 所有测试通过")

      # Test 4: atomic_write_json. TemporaryDirectory removes the scratch
      # directory on exit, including when one of the assertions below fails.
      with tempfile.TemporaryDirectory() as scratch:
          test_file = Path(scratch) / "test_state.json"

          # Write check
          test_data = {"chapter": 10, "中文键": "中文值"}
          atomic_write_json(test_file, test_data, use_lock=False, backup=False)
          assert test_file.exists(), "原子写入未创建文件"

          # Read-back check
          with open(test_file, 'r', encoding='utf-8') as f:
              loaded = json.load(f)
          assert loaded == test_data, "原子写入数据不匹配"

          # Backup check
          atomic_write_json(test_file, {"updated": True}, use_lock=False, backup=True)
          backup_file = test_file.with_suffix('.json.bak')
          assert backup_file.exists(), "备份未创建"

          # Restore check
          restore_from_backup(test_file)
          with open(test_file, 'r', encoding='utf-8') as f:
              restored = json.load(f)
          assert restored == test_data, "恢复数据不匹配"

      print(" ✅ atomic_write_json: 所有测试通过")

      if HAS_FILELOCK:
          print(" ℹ️ filelock 可用,已启用文件锁支持")
      else:
          print(" ⚠️ filelock 未安装,文件锁功能不可用")

      print("\n✅ 所有安全工具函数测试通过!")
  if __name__ == "__main__":
      # On Windows, re-wrap both standard streams as UTF-8 before anything is
      # printed, since the self-test output contains non-ASCII characters.
      if sys.platform == 'win32':
          import io

          for stream_name in ('stdout', 'stderr'):
              raw_buffer = getattr(sys, stream_name).buffer
              setattr(sys, stream_name, io.TextIOWrapper(raw_buffer, encoding='utf-8'))

      # Run the built-in self-checks.
      _run_self_tests()