Browse source

fix: harden sqlite sync and context snapshot compatibility

lingfengQAQ 4 months ago
parent
commit
58d1bbb837

+ 17 - 1
.claude/scripts/data_modules/context_manager.py

@@ -32,6 +32,22 @@ class ContextManager:
         self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
         self.snapshot_manager = snapshot_manager or SnapshotManager(self.config)
         self.index_manager = IndexManager(self.config)
         self.index_manager = IndexManager(self.config)
 
 
+    def _is_snapshot_compatible(self, cached: Dict[str, Any], template: str) -> bool:
+        """判断快照是否可用于当前模板。"""
+        if not isinstance(cached, dict):
+            return False
+
+        meta = cached.get("meta")
+        if not isinstance(meta, dict):
+            # 兼容旧快照:未记录 template 时仅允许默认模板复用
+            return template == self.DEFAULT_TEMPLATE
+
+        cached_template = meta.get("template")
+        if not isinstance(cached_template, str):
+            return template == self.DEFAULT_TEMPLATE
+
+        return cached_template == template
+
     def build_context(
     def build_context(
         self,
         self,
         chapter: int,
         chapter: int,
@@ -47,7 +63,7 @@ class ContextManager:
         if use_snapshot:
         if use_snapshot:
             try:
             try:
                 cached = self.snapshot_manager.load_snapshot(chapter)
                 cached = self.snapshot_manager.load_snapshot(chapter)
-                if cached:
+                if cached and self._is_snapshot_compatible(cached, template):
                     return cached.get("payload", cached)
                     return cached.get("payload", cached)
             except SnapshotVersionMismatch:
             except SnapshotVersionMismatch:
                 # Snapshot incompatible; rebuild below.
                 # Snapshot incompatible; rebuild below.

+ 52 - 17
.claude/scripts/data_modules/state_manager.py

@@ -15,6 +15,7 @@ v5.1 变更(v5.4 沿用):
 
 
 import json
 import json
 import sys
 import sys
+from copy import deepcopy
 from pathlib import Path
 from pathlib import Path
 from typing import Dict, List, Optional, Any
 from typing import Dict, List, Optional, Any
 from dataclasses import dataclass, field, asdict
 from dataclasses import dataclass, field, asdict
@@ -332,28 +333,37 @@ class StateManager:
                 # 原子写入(锁已持有,不再二次加锁)
                 # 原子写入(锁已持有,不再二次加锁)
                 atomic_write_json(self.config.state_file, disk_state, use_lock=False, backup=True)
                 atomic_write_json(self.config.state_file, disk_state, use_lock=False, backup=True)
 
 
-                # v5.1 引入: 同步到 SQLite(必须在清空 pending 之前调用)
-                self._sync_to_sqlite()
+                # v5.1 引入: 同步到 SQLite(失败时保留 pending 以便重试)
+                sqlite_pending_snapshot = self._snapshot_sqlite_pending()
+                sqlite_sync_ok = self._sync_to_sqlite()
 
 
-                # 同步内存为磁盘最新快照,并清空增量队列
+                # 同步内存为磁盘最新快照
                 self._state = disk_state
                 self._state = disk_state
-                self._pending_entity_patches.clear()
-                self._pending_alias_entries.clear()
-                self._pending_state_changes.clear()
-                self._pending_structured_relationships.clear()
+
+                # state.json 侧 pending 已写盘,直接清空
                 self._pending_disambiguation_warnings.clear()
                 self._pending_disambiguation_warnings.clear()
                 self._pending_disambiguation_pending.clear()
                 self._pending_disambiguation_pending.clear()
                 self._pending_chapter_meta.clear()
                 self._pending_chapter_meta.clear()
                 self._pending_progress_chapter = None
                 self._pending_progress_chapter = None
                 self._pending_progress_words_delta = 0
                 self._pending_progress_words_delta = 0
 
 
+                # SQLite 侧 pending:成功后清空,失败则恢复快照(避免静默丢数据)
+                if sqlite_sync_ok:
+                    self._pending_entity_patches.clear()
+                    self._pending_alias_entries.clear()
+                    self._pending_state_changes.clear()
+                    self._pending_structured_relationships.clear()
+                    self._clear_pending_sqlite_data()
+                else:
+                    self._restore_sqlite_pending(sqlite_pending_snapshot)
+
         except filelock.Timeout:
         except filelock.Timeout:
             raise RuntimeError("无法获取 state.json 文件锁,请稍后重试")
             raise RuntimeError("无法获取 state.json 文件锁,请稍后重试")
 
 
-    def _sync_to_sqlite(self):
+    def _sync_to_sqlite(self) -> bool:
         """同步待处理数据到 SQLite(v5.1 引入,v5.4 沿用)"""
         """同步待处理数据到 SQLite(v5.1 引入,v5.4 沿用)"""
         if not self._sql_state_manager:
         if not self._sql_state_manager:
-            return
+            return True
 
 
         # 方式1: 通过 process_chapter_result 收集的数据
         # 方式1: 通过 process_chapter_result 收集的数据
         sqlite_data = self._pending_sqlite_data
         sqlite_data = self._pending_sqlite_data
@@ -379,16 +389,15 @@ class StateManager:
                     eid = entity.get("suggested_id") or entity.get("id")
                     eid = entity.get("suggested_id") or entity.get("id")
                     if eid:
                     if eid:
                         processed_appearances.add((eid, chapter))
                         processed_appearances.add((eid, chapter))
-            except Exception:
-                pass  # SQLite 同步失败时静默降级(避免中断主流程)
+            except Exception as exc:
+                print(f"[WARNING] SQLite sync failed (process_chapter_entities): {exc}", file=sys.stderr)
+                return False
 
 
         # 方式2: 使用 add_entity/update_entity 收集的增量数据。
         # 方式2: 使用 add_entity/update_entity 收集的增量数据。
         # 数据缓存在 _pending_entity_patches 等变量中。
         # 数据缓存在 _pending_entity_patches 等变量中。
-        self._sync_pending_patches_to_sqlite(processed_appearances)
+        return self._sync_pending_patches_to_sqlite(processed_appearances)
 
 
-        self._clear_pending_sqlite_data()
-
-    def _sync_pending_patches_to_sqlite(self, processed_appearances: set = None):
+    def _sync_pending_patches_to_sqlite(self, processed_appearances: set = None) -> bool:
         """同步 _pending_entity_patches 等到 SQLite(v5.1 引入,v5.4 沿用)
         """同步 _pending_entity_patches 等到 SQLite(v5.1 引入,v5.4 沿用)
 
 
         Args:
         Args:
@@ -396,7 +405,7 @@ class StateManager:
                                    用于避免重复写入 appearances 表(防止覆盖 mentions)
                                    用于避免重复写入 appearances 表(防止覆盖 mentions)
         """
         """
         if not self._sql_state_manager:
         if not self._sql_state_manager:
-            return
+            return True
 
 
         if processed_appearances is None:
         if processed_appearances is None:
             processed_appearances = set()
             processed_appearances = set()
@@ -534,10 +543,36 @@ class StateManager:
                     chapter=rel.get("chapter", 0)
                     chapter=rel.get("chapter", 0)
                 )
                 )
 
 
+            return True
+
         except Exception as e:
         except Exception as e:
             # SQLite 同步失败时记录警告(不中断主流程)
             # SQLite 同步失败时记录警告(不中断主流程)
-            import sys
             print(f"[WARNING] SQLite sync failed: {e}", file=sys.stderr)
             print(f"[WARNING] SQLite sync failed: {e}", file=sys.stderr)
+            return False
+
+    def _snapshot_sqlite_pending(self) -> Dict[str, Any]:
+        """抓取 SQLite 侧 pending 快照,用于同步失败回滚内存队列。"""
+        return {
+            "entity_patches": deepcopy(self._pending_entity_patches),
+            "alias_entries": deepcopy(self._pending_alias_entries),
+            "state_changes": deepcopy(self._pending_state_changes),
+            "structured_relationships": deepcopy(self._pending_structured_relationships),
+            "sqlite_data": deepcopy(self._pending_sqlite_data),
+        }
+
+    def _restore_sqlite_pending(self, snapshot: Dict[str, Any]) -> None:
+        """恢复 SQLite 侧 pending 快照,避免同步失败后数据静默丢失。"""
+        self._pending_entity_patches = snapshot.get("entity_patches", {})
+        self._pending_alias_entries = snapshot.get("alias_entries", {})
+        self._pending_state_changes = snapshot.get("state_changes", [])
+        self._pending_structured_relationships = snapshot.get("structured_relationships", [])
+        self._pending_sqlite_data = snapshot.get("sqlite_data", {
+            "entities_appeared": [],
+            "entities_new": [],
+            "state_changes": [],
+            "relationships_new": [],
+            "chapter": None,
+        })
 
 
     def _clear_pending_sqlite_data(self):
     def _clear_pending_sqlite_data(self):
         """清空待同步的 SQLite 数据"""
         """清空待同步的 SQLite 数据"""

+ 18 - 0
.claude/scripts/data_modules/tests/test_context_manager.py

@@ -88,3 +88,21 @@ def test_query_router():
     assert router.route("角色是谁") == "entity"
     assert router.route("角色是谁") == "entity"
     assert router.route("发生了什么剧情") == "plot"
     assert router.route("发生了什么剧情") == "plot"
     assert "A" in router.split("A, B;C")
     assert "A" in router.split("A, B;C")
+
+
+def test_context_snapshot_respects_template(temp_project):
+    state = {
+        "protagonist_state": {"name": "萧炎"},
+        "chapter_meta": {},
+        "disambiguation_warnings": [],
+        "disambiguation_pending": [],
+    }
+    temp_project.state_file.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
+
+    manager = ContextManager(temp_project)
+
+    plot_payload = manager.build_context(1, template="plot", use_snapshot=True, save_snapshot=True)
+    battle_payload = manager.build_context(1, template="battle", use_snapshot=True, save_snapshot=True)
+
+    assert plot_payload.get("template") == "plot"
+    assert battle_payload.get("template") == "battle"

+ 38 - 0
.claude/scripts/data_modules/tests/test_extract_chapter_context.py

@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import json
+import sys
+from pathlib import Path
+
+
+def test_extract_state_summary_accepts_dominant_key(tmp_path):
+    scripts_dir = Path(__file__).resolve().parents[2]
+    if str(scripts_dir) not in sys.path:
+        sys.path.insert(0, str(scripts_dir))
+
+    from extract_chapter_context import extract_state_summary
+
+    state = {
+        "progress": {"current_chapter": 12, "total_words": 12345},
+        "protagonist_state": {
+            "power": {"realm": "筑基", "layer": 2},
+            "location": "宗门",
+            "golden_finger": {"name": "系统", "level": 1},
+        },
+        "strand_tracker": {
+            "history": [
+                {"chapter": 10, "dominant": "quest"},
+                {"chapter": 11, "dominant": "fire"},
+            ]
+        },
+    }
+
+    webnovel_dir = tmp_path / ".webnovel"
+    webnovel_dir.mkdir(parents=True, exist_ok=True)
+    (webnovel_dir / "state.json").write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
+
+    text = extract_state_summary(tmp_path)
+    assert "Ch10:quest" in text
+    assert "Ch11:fire" in text
+

+ 25 - 0
.claude/scripts/data_modules/tests/test_state_manager_extra.py

@@ -6,6 +6,8 @@ StateManager extra tests
 
 
 import json
 import json
 import sys
 import sys
+import tempfile
+from pathlib import Path
 
 
 import pytest
 import pytest
 
 
@@ -329,6 +331,29 @@ def test_ensure_state_schema_invalid_inputs(temp_project):
     assert isinstance(schema2["disambiguation_pending"], list)
     assert isinstance(schema2["disambiguation_pending"], list)
 
 
 
 
+def test_save_state_preserves_sqlite_pending_on_sync_failure(temp_project):
+    manager = StateManager(temp_project)
+
+    manager.add_entity(EntityState(id="e1", name="测试角色", type="角色", first_appearance=1, last_appearance=1))
+    manager.update_entity("e1", {"current": {"realm": "炼气"}}, "角色")
+
+    class _BrokenSQLManager:
+        def process_chapter_entities(self, **kwargs):
+            raise RuntimeError("boom")
+
+    manager._sql_state_manager = _BrokenSQLManager()
+    manager._pending_sqlite_data["chapter"] = 1
+
+    manager.save_state()
+
+    state = json.loads(temp_project.state_file.read_text(encoding="utf-8"))
+    assert state.get("_migrated_to_sqlite") is True
+
+    # SQLite 同步失败后,SQLite 相关 pending 不应被清空,便于后续重试
+    assert manager._pending_entity_patches
+    assert manager._pending_sqlite_data.get("chapter") == 1
+
+
 def test_save_state_progress_and_disambiguation_merge(temp_project):
 def test_save_state_progress_and_disambiguation_merge(temp_project):
     state = {
     state = {
         "progress": {"current_chapter": "bad", "total_words": "bad"},
         "progress": {"current_chapter": "bad", "total_words": "bad"},

+ 8 - 1
.claude/scripts/extract_chapter_context.py

@@ -151,7 +151,14 @@ def extract_state_summary(project_root: Path) -> str:
         st = state["strand_tracker"]
         st = state["strand_tracker"]
         history = st.get("history", [])[-5:]  # 最近5章
         history = st.get("history", [])[-5:]  # 最近5章
         if history:
         if history:
-            strand_str = ", ".join([f"Ch{h['chapter']}:{h['strand']}" for h in history])
+            items = []
+            for h in history:
+                if not isinstance(h, dict):
+                    continue
+                ch = h.get("chapter", "?")
+                strand = h.get("strand") or h.get("dominant") or "unknown"
+                items.append(f"Ch{ch}:{strand}")
+            strand_str = ", ".join(items)
             summary_parts.append(f"**近5章Strand**: {strand_str}")
             summary_parts.append(f"**近5章Strand**: {strand_str}")
 
 
     # 活跃伏笔(只显示紧急的)
     # 活跃伏笔(只显示紧急的)