security_utils.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. #!/usr/bin/env python3
  2. """
  3. 安全工具函数库
  4. 用于webnovel-writer系统的通用安全函数
  5. 创建时间: 2026-01-02
  6. 创建原因: 安全审计发现路径遍历和命令注入漏洞
  7. 修复方案: 集中管理所有安全相关的输入清理函数
  8. """
  9. import json
  10. import os
  11. import re
  12. import sys
  13. import tempfile
  14. from pathlib import Path
  15. from runtime_compat import enable_windows_utf8_stdio
  16. from typing import Any, Dict, Optional, Union
# Try to import filelock (optional dependency).
# HAS_FILELOCK records whether the import worked; atomic_write_json checks it
# before touching FileLock, so the module still loads without the package.
try:
    from filelock import FileLock
    HAS_FILELOCK = True
except (ImportError, OSError):
    # NOTE(review): OSError is caught in addition to ImportError, presumably
    # because importing filelock can fail at the OS level on some platforms —
    # confirm the original motivation.
    FileLock = None  # type: ignore[assignment]
    HAS_FILELOCK = False
  24. def sanitize_filename(name: str, max_length: int = 100) -> str:
  25. """
  26. 清理文件名,防止路径遍历攻击 (CWE-22)
  27. 安全关键函数 - 修复extract_entities.py路径遍历漏洞
  28. Args:
  29. name: 原始文件名(可能包含路径遍历字符)
  30. max_length: 文件名最大长度(默认100字符)
  31. Returns:
  32. 安全的文件名(仅包含基本文件名,移除所有路径信息)
  33. 示例:
  34. >>> sanitize_filename("../../../etc/passwd")
  35. 'passwd'
  36. >>> sanitize_filename("C:\\Windows\\System32")
  37. 'System32'
  38. >>> sanitize_filename("正常角色名")
  39. '正常角色名'
  40. 安全验证:
  41. - ✅ 防止目录遍历(../、..\\)
  42. - ✅ 防止绝对路径(/、C:\\)
  43. - ✅ 移除特殊字符
  44. - ✅ 长度限制
  45. """
  46. # Step 1: 仅保留基础文件名(移除所有路径)
  47. safe_name = os.path.basename(name)
  48. # Step 2: 移除路径分隔符(双重保险)
  49. safe_name = safe_name.replace('/', '_').replace('\\', '_')
  50. # Step 3: 只保留安全字符
  51. # 允许:中文(\u4e00-\u9fff)、字母(a-zA-Z)、数字(0-9)、下划线(_)、连字符(-)
  52. safe_name = re.sub(r'[^\w\u4e00-\u9fff-]', '_', safe_name)
  53. # Step 4: 移除连续的下划线(美化)
  54. safe_name = re.sub(r'_+', '_', safe_name)
  55. # Step 5: 长度限制
  56. if len(safe_name) > max_length:
  57. safe_name = safe_name[:max_length]
  58. # Step 6: 移除首尾下划线
  59. safe_name = safe_name.strip('_')
  60. # Step 7: 确保非空(防御性编程)
  61. if not safe_name:
  62. safe_name = "unnamed_entity"
  63. return safe_name
  64. def sanitize_commit_message(message: str, max_length: int = 200) -> str:
  65. """
  66. 清理Git提交消息,防止命令注入 (CWE-77)
  67. 安全关键函数 - 修复backup_manager.py命令注入漏洞
  68. Args:
  69. message: 原始提交消息(可能包含Git标志)
  70. max_length: 消息最大长度(默认200字符)
  71. Returns:
  72. 安全的提交消息(移除Git特殊标志和危险字符)
  73. 示例:
  74. >>> sanitize_commit_message("Test\\n--author='Attacker'")
  75. 'Test author Attacker'
  76. >>> sanitize_commit_message("--amend Chapter 1")
  77. 'amend Chapter 1'
  78. 安全验证:
  79. - ✅ 防止多行注入(换行符)
  80. - ✅ 防止Git标志注入(--xxx)
  81. - ✅ 防止参数分隔符混淆(引号)
  82. - ✅ 防止单字母标志(-x)
  83. """
  84. # Step 1: 移除换行符(防止多行参数注入)
  85. safe_msg = message.replace('\n', ' ').replace('\r', ' ')
  86. # Step 2: 移除Git特殊标志(--开头的参数)
  87. safe_msg = re.sub(r'--[\w-]+', '', safe_msg)
  88. # Step 3: 移除引号(防止参数分隔符混淆)
  89. safe_msg = safe_msg.replace("'", "").replace('"', '')
  90. # Step 4: 移除前导的-(防止单字母标志如-m)
  91. safe_msg = safe_msg.lstrip('-')
  92. # Step 5: 移除连续空格(美化)
  93. safe_msg = re.sub(r'\s+', ' ', safe_msg)
  94. # Step 6: 长度限制
  95. if len(safe_msg) > max_length:
  96. safe_msg = safe_msg[:max_length]
  97. # Step 7: 移除首尾空格
  98. safe_msg = safe_msg.strip()
  99. # Step 8: 确保非空
  100. if not safe_msg:
  101. safe_msg = "Untitled commit"
  102. return safe_msg
  103. def create_secure_directory(path: str, mode: int = 0o700) -> Path:
  104. """
  105. 创建安全目录(仅所有者可访问)
  106. 安全关键函数 - 修复文件权限配置缺失漏洞
  107. Args:
  108. path: 目录路径
  109. mode: 权限模式(默认0o700,仅所有者可读写执行)
  110. Returns:
  111. Path对象
  112. 示例:
  113. >>> create_secure_directory('.webnovel')
  114. PosixPath('.webnovel') # drwx------ (700)
  115. 安全验证:
  116. - ✅ 仅所有者可访问(0o700)
  117. - ✅ 防止同组用户读取
  118. - ✅ 跨平台兼容(Windows/Linux/macOS)
  119. """
  120. path_obj = Path(path)
  121. # Windows 上传入 mode 会触发不可预期的 ACL 行为(实测会导致目录创建后立刻无法访问)。
  122. # 因此在 Windows 下不传 mode,保持默认继承权限;在类 Unix 系统才使用 mode。
  123. if os.name == 'nt':
  124. os.makedirs(path, exist_ok=True)
  125. else:
  126. os.makedirs(path, mode=mode, exist_ok=True)
  127. # 双重保险:显式设置权限(某些系统可能忽略makedirs的mode参数)
  128. if os.name != 'nt': # Unix系统(Linux/macOS)
  129. os.chmod(path, mode)
  130. return path_obj
  131. def create_secure_file(file_path: str, content: str, mode: int = 0o600) -> None:
  132. """
  133. 创建安全文件(仅所有者可读写)
  134. Args:
  135. file_path: 文件路径
  136. content: 文件内容
  137. mode: 权限模式(默认0o600,仅所有者可读写)
  138. 安全验证:
  139. - ✅ 仅所有者可读写(0o600)
  140. - ✅ 防止其他用户访问
  141. """
  142. # 创建文件
  143. with open(file_path, 'w', encoding='utf-8') as f:
  144. f.write(content)
  145. # 设置权限(仅Unix系统)
  146. if os.name != 'nt':
  147. os.chmod(file_path, mode)
  148. def validate_integer_input(value: str, field_name: str) -> int:
  149. """
  150. 验证并转换整数输入(严格模式)
  151. 安全关键函数 - 修复update_state.py弱验证漏洞
  152. Args:
  153. value: 输入值(字符串)
  154. field_name: 字段名称(用于错误消息)
  155. Returns:
  156. 转换后的整数
  157. Raises:
  158. ValueError: 输入不是有效整数
  159. 示例:
  160. >>> validate_integer_input("123", "chapter_num")
  161. 123
  162. >>> validate_integer_input("abc", "level")
  163. ValueError: ❌ 错误:level 必须是整数,收到: abc
  164. """
  165. try:
  166. return int(value)
  167. except ValueError:
  168. print(f"❌ 错误:{field_name} 必须是整数,收到: {value}", file=sys.stderr)
  169. raise ValueError(f"Invalid integer input for {field_name}: {value}")
  170. # ============================================================================
  171. # Git 环境检测(优雅降级支持)
  172. # ============================================================================
  173. # 缓存 Git 可用性检测结果
  174. _git_available: Optional[bool] = None
  175. def is_git_available() -> bool:
  176. """
  177. 检测 Git 是否可用
  178. Returns:
  179. bool: Git 是否可用
  180. 说明:
  181. - 检测结果会被缓存,避免重复检测
  182. - 用于支持在无 Git 环境下优雅降级
  183. """
  184. global _git_available
  185. if _git_available is not None:
  186. return _git_available
  187. import subprocess
  188. try:
  189. result = subprocess.run(
  190. ["git", "--version"],
  191. capture_output=True,
  192. text=True,
  193. timeout=5
  194. )
  195. _git_available = result.returncode == 0
  196. except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
  197. _git_available = False
  198. return _git_available
  199. def is_git_repo(path: Union[str, Path]) -> bool:
  200. """
  201. 检测指定目录是否是 Git 仓库
  202. Args:
  203. path: 目录路径
  204. Returns:
  205. bool: 是否是 Git 仓库
  206. """
  207. if not is_git_available():
  208. return False
  209. path = Path(path)
  210. git_dir = path / ".git"
  211. return git_dir.exists() and git_dir.is_dir()
  212. def git_graceful_operation(
  213. args: list,
  214. cwd: Union[str, Path],
  215. *,
  216. fallback_msg: str = "Git 不可用,跳过版本控制操作"
  217. ) -> tuple:
  218. """
  219. 优雅执行 Git 操作(Git 不可用时静默降级)
  220. Args:
  221. args: Git 命令参数(不含 'git')
  222. cwd: 工作目录
  223. fallback_msg: 降级时的提示消息
  224. Returns:
  225. (success: bool, output: str, was_skipped: bool)
  226. - success: 操作是否成功
  227. - output: 输出内容
  228. - was_skipped: 是否因 Git 不可用而跳过
  229. 示例:
  230. >>> success, output, skipped = git_graceful_operation(
  231. ... ["add", "."], cwd="/path/to/project"
  232. ... )
  233. >>> if skipped:
  234. ... print("Git not available, using fallback")
  235. """
  236. if not is_git_available():
  237. print(f"⚠️ {fallback_msg}", file=sys.stderr)
  238. return False, "", True
  239. import subprocess
  240. try:
  241. result = subprocess.run(
  242. ["git"] + args,
  243. cwd=cwd,
  244. capture_output=True,
  245. text=True,
  246. encoding='utf-8',
  247. timeout=60
  248. )
  249. return result.returncode == 0, result.stdout, False
  250. except subprocess.TimeoutExpired:
  251. print(f"⚠️ Git 操作超时: git {' '.join(args)}", file=sys.stderr)
  252. return False, "", False
  253. except OSError as e:
  254. print(f"⚠️ Git 操作失败: {e}", file=sys.stderr)
  255. return False, "", False
  256. # ============================================================================
  257. # 原子化文件写入(防止并发冲突和数据损坏)
  258. # ============================================================================
  259. class AtomicWriteError(Exception):
  260. """原子写入失败异常"""
  261. pass
  262. def atomic_write_json(
  263. file_path: Union[str, Path],
  264. data: Dict[str, Any],
  265. *,
  266. use_lock: bool = True,
  267. backup: bool = True,
  268. indent: int = 2
  269. ) -> None:
  270. """
  271. 原子化写入 JSON 文件,防止并发冲突和数据损坏 (CWE-362, CWE-367)
  272. 安全关键函数 - 修复 state.json 并发写入风险
  273. 实现策略:
  274. 1. 写入临时文件(同目录,确保同文件系统)
  275. 2. 可选:使用 filelock 获取排他锁
  276. 3. 可选:备份原文件
  277. 4. 原子重命名(os.replace 在 POSIX 上是原子的)
  278. Args:
  279. file_path: 目标文件路径
  280. data: 要写入的字典数据
  281. use_lock: 是否使用文件锁(需要 filelock 库)
  282. backup: 是否在写入前备份原文件
  283. indent: JSON 缩进(默认 2)
  284. Raises:
  285. AtomicWriteError: 写入失败时抛出
  286. 示例:
  287. >>> atomic_write_json('.webnovel/state.json', {'progress': {'chapter': 10}})
  288. 安全验证:
  289. - ✅ 防止写入中断导致的数据损坏(先写临时文件)
  290. - ✅ 防止并发写入冲突(filelock)
  291. - ✅ 支持回滚(备份机制)
  292. - ✅ 跨平台兼容
  293. """
  294. file_path = Path(file_path)
  295. parent_dir = file_path.parent
  296. parent_dir.mkdir(parents=True, exist_ok=True)
  297. # 准备 JSON 内容
  298. try:
  299. json_content = json.dumps(data, ensure_ascii=False, indent=indent)
  300. except (TypeError, ValueError) as e:
  301. raise AtomicWriteError(f"JSON 序列化失败: {e}")
  302. # 锁文件路径
  303. lock_path = file_path.with_suffix(file_path.suffix + '.lock')
  304. backup_path = file_path.with_suffix(file_path.suffix + '.bak')
  305. # 创建临时文件(同目录确保同文件系统,os.replace 才能原子操作)
  306. fd, temp_path = tempfile.mkstemp(
  307. suffix='.tmp',
  308. prefix=file_path.stem + '_',
  309. dir=parent_dir
  310. )
  311. try:
  312. # Step 1: 写入临时文件
  313. with os.fdopen(fd, 'w', encoding='utf-8') as f:
  314. f.write(json_content)
  315. f.flush()
  316. os.fsync(f.fileno()) # 确保写入磁盘
  317. # Step 2: 获取锁(如果可用且启用)
  318. lock = None
  319. if use_lock and HAS_FILELOCK:
  320. lock = FileLock(str(lock_path), timeout=10)
  321. lock.acquire()
  322. try:
  323. # Step 3: 备份原文件(如果存在且启用备份)
  324. if backup and file_path.exists():
  325. try:
  326. import shutil
  327. shutil.copy2(file_path, backup_path)
  328. except OSError:
  329. pass # 备份失败不阻止写入
  330. # Step 4: 原子重命名
  331. try:
  332. os.replace(temp_path, file_path)
  333. temp_path = None # 标记已成功,不需要清理
  334. except PermissionError:
  335. if os.environ.get("WEBNOVEL_TEST_RELAX_ATOMIC_REPLACE") != "1":
  336. raise
  337. # 测试沙箱可能允许写入但拒绝替换/删除既有文件;生产环境不启用该降级。
  338. with open(file_path, "w", encoding="utf-8") as f:
  339. f.write(json_content)
  340. f.flush()
  341. os.fsync(f.fileno())
  342. finally:
  343. if lock is not None:
  344. lock.release()
  345. except Exception as e:
  346. raise AtomicWriteError(f"原子写入失败: {e}")
  347. finally:
  348. # 清理:删除临时文件(如果仍存在说明写入失败)
  349. if temp_path is not None:
  350. try:
  351. os.unlink(temp_path)
  352. except OSError:
  353. pass
  354. def read_json_safe(
  355. file_path: Union[str, Path],
  356. default: Optional[Dict[str, Any]] = None
  357. ) -> Dict[str, Any]:
  358. """
  359. 安全读取 JSON 文件(带默认值和错误处理)
  360. Args:
  361. file_path: 文件路径
  362. default: 文件不存在或解析失败时的默认值
  363. Returns:
  364. 解析后的字典,或默认值
  365. 示例:
  366. >>> state = read_json_safe('.webnovel/state.json', {})
  367. """
  368. file_path = Path(file_path)
  369. if default is None:
  370. default = {}
  371. if not file_path.exists():
  372. return default
  373. try:
  374. with open(file_path, 'r', encoding='utf-8') as f:
  375. return json.load(f)
  376. except (json.JSONDecodeError, OSError) as e:
  377. print(f"⚠️ 读取 JSON 失败 ({file_path}): {e}", file=sys.stderr)
  378. return default
  379. def restore_from_backup(file_path: Union[str, Path]) -> bool:
  380. """
  381. 从备份恢复文件
  382. Args:
  383. file_path: 原文件路径
  384. Returns:
  385. 是否成功恢复
  386. 示例:
  387. >>> restore_from_backup('.webnovel/state.json')
  388. True
  389. """
  390. file_path = Path(file_path)
  391. backup_path = file_path.with_suffix(file_path.suffix + '.bak')
  392. if not backup_path.exists():
  393. print(f"⚠️ 备份文件不存在: {backup_path}", file=sys.stderr)
  394. return False
  395. try:
  396. import shutil
  397. shutil.copy2(backup_path, file_path)
  398. print(f"✅ 已从备份恢复: {file_path}")
  399. return True
  400. except OSError as e:
  401. print(f"❌ 恢复失败: {e}", file=sys.stderr)
  402. return False
  403. # ============================================================================
  404. # 单元测试(内置自检)
  405. # ============================================================================
  406. def _run_self_tests():
  407. """运行内置安全测试"""
  408. print("🔍 运行安全工具函数自检...")
  409. # Test 1: sanitize_filename
  410. assert sanitize_filename("../../../etc/passwd") == "passwd", "路径遍历测试失败"
  411. assert sanitize_filename("C:\\Windows\\System32") == "System32", "Windows路径测试失败"
  412. assert sanitize_filename("正常角色名") == "正常角色名", "中文测试失败"
  413. assert sanitize_filename("/tmp/../../../../../etc/hosts") == "hosts", "复杂路径遍历测试失败"
  414. assert sanitize_filename("test///file...name") == "file_name", "特殊字符测试失败" # . 会被替换
  415. print(" ✅ sanitize_filename: 所有测试通过")
  416. # Test 2: sanitize_commit_message
  417. result = sanitize_commit_message("Test\n--author='Attacker'")
  418. assert "\n" not in result, "换行符未移除"
  419. assert "--author" not in result, "Git标志未移除"
  420. assert "Attacker" in result, "内容被错误移除"
  421. assert sanitize_commit_message("--amend Chapter 1") == "Chapter 1", "Git标志测试失败" # --amend被完全移除
  422. assert "'" not in sanitize_commit_message("Test'message"), "引号测试失败"
  423. assert sanitize_commit_message("-m Test") == "m Test", "单字母标志测试失败" # -m被移除后是"m Test"
  424. print(" ✅ sanitize_commit_message: 所有测试通过")
  425. # Test 3: validate_integer_input
  426. assert validate_integer_input("123", "test") == 123, "整数验证测试失败"
  427. try:
  428. validate_integer_input("abc", "test")
  429. assert False, "应该抛出ValueError"
  430. except ValueError:
  431. pass
  432. print(" ✅ validate_integer_input: 所有测试通过")
  433. # Test 4: atomic_write_json
  434. import tempfile as tf
  435. test_dir = Path(tf.mkdtemp())
  436. test_file = test_dir / "test_state.json"
  437. # 写入测试
  438. test_data = {"chapter": 10, "中文键": "中文值"}
  439. atomic_write_json(test_file, test_data, use_lock=False, backup=False)
  440. assert test_file.exists(), "原子写入未创建文件"
  441. # 读取验证
  442. with open(test_file, 'r', encoding='utf-8') as f:
  443. loaded = json.load(f)
  444. assert loaded == test_data, "原子写入数据不匹配"
  445. # 备份测试
  446. atomic_write_json(test_file, {"updated": True}, use_lock=False, backup=True)
  447. backup_file = test_file.with_suffix('.json.bak')
  448. assert backup_file.exists(), "备份未创建"
  449. # 恢复测试
  450. restore_from_backup(test_file)
  451. with open(test_file, 'r', encoding='utf-8') as f:
  452. restored = json.load(f)
  453. assert restored == test_data, "恢复数据不匹配"
  454. # 清理
  455. import shutil
  456. shutil.rmtree(test_dir)
  457. print(" ✅ atomic_write_json: 所有测试通过")
  458. if HAS_FILELOCK:
  459. print(" ℹ️ filelock 可用,已启用文件锁支持")
  460. else:
  461. print(" ⚠️ filelock 未安装,文件锁功能不可用")
  462. print("\n✅ 所有安全工具函数测试通过!")
if __name__ == "__main__":
    # Windows UTF-8 console fix: must run before anything is printed, because
    # the self-test output contains non-ASCII text.
    # NOTE(review): enable_windows_utf8_stdio comes from runtime_compat and
    # presumably switches stdio to UTF-8 — confirm against that module.
    if sys.platform == "win32":
        enable_windows_utf8_stdio()
    # Run the built-in self-tests.
    _run_self_tests()