watcher.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. """
  2. Watchdog 文件变更监听器 + SSE 推送
  3. 监控 PROJECT_ROOT/.webnovel/ 目录下 state.json / index.db 等文件的写事件,
  4. 通过 SSE 通知所有已连接的前端客户端刷新数据。
  5. """
  6. import asyncio
  7. import json
  8. import time
  9. from pathlib import Path
  10. from typing import AsyncGenerator
  11. from watchdog.observers import Observer
  12. from watchdog.events import FileSystemEventHandler, FileModifiedEvent, FileCreatedEvent
  13. class _WebnovelFileHandler(FileSystemEventHandler):
  14. """仅关注 .webnovel/ 目录下关键文件的修改/创建事件。"""
  15. WATCH_NAMES = {"state.json", "index.db", "workflow_state.json"}
  16. def __init__(self, notify_callback):
  17. super().__init__()
  18. self._notify = notify_callback
  19. def on_modified(self, event):
  20. if event.is_directory:
  21. return
  22. if Path(event.src_path).name in self.WATCH_NAMES:
  23. self._notify(event.src_path, "modified")
  24. def on_created(self, event):
  25. if event.is_directory:
  26. return
  27. if Path(event.src_path).name in self.WATCH_NAMES:
  28. self._notify(event.src_path, "created")
  29. class FileWatcher:
  30. """管理 watchdog Observer 和 SSE 客户端订阅。"""
  31. def __init__(self):
  32. self._observer: Observer | None = None
  33. self._subscribers: list[asyncio.Queue] = []
  34. self._loop: asyncio.AbstractEventLoop | None = None
  35. # --- 订阅管理 ---
  36. def subscribe(self) -> asyncio.Queue:
  37. q: asyncio.Queue = asyncio.Queue(maxsize=64)
  38. self._subscribers.append(q)
  39. return q
  40. def unsubscribe(self, q: asyncio.Queue):
  41. try:
  42. self._subscribers.remove(q)
  43. except ValueError:
  44. pass
  45. # --- 推送 ---
  46. def _on_change(self, path: str, kind: str):
  47. """在 watchdog 线程中调用,向主事件循环投递通知。"""
  48. msg = json.dumps({"file": Path(path).name, "kind": kind, "ts": time.time()})
  49. if self._loop and not self._loop.is_closed():
  50. self._loop.call_soon_threadsafe(self._dispatch, msg)
  51. def _dispatch(self, msg: str):
  52. dead: list[asyncio.Queue] = []
  53. for q in self._subscribers:
  54. try:
  55. q.put_nowait(msg)
  56. except asyncio.QueueFull:
  57. dead.append(q)
  58. for dq in dead:
  59. self.unsubscribe(dq)
  60. # --- 生命周期 ---
  61. def start(self, watch_dir: Path, loop: asyncio.AbstractEventLoop):
  62. """启动 watchdog observer,监听 watch_dir。"""
  63. self._loop = loop
  64. handler = _WebnovelFileHandler(self._on_change)
  65. self._observer = Observer()
  66. self._observer.schedule(handler, str(watch_dir), recursive=False)
  67. self._observer.daemon = True
  68. self._observer.start()
  69. def stop(self):
  70. if self._observer:
  71. self._observer.stop()
  72. self._observer.join(timeout=3)
  73. self._observer = None