瀏覽代碼

fix: resolve v5.1 SQLite-only migration issues

- Remove structured_index imports from archive_manager.py and status_reporter.py
- Update archive_manager.py to use IndexManager for entity operations
- Update status_reporter.py to use IndexManager for character scanning
- Update state_manager.py entity read methods to query SQLite first
- Add missing 'relationships' field to init_project.py schema
- Update unit tests to match SQLite-only behavior (remove entities_v3/alias_index assertions)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
lingfengQAQ 5 月之前
父節點
當前提交
eafe11b6dc

+ 38 - 64
.claude/scripts/archive_manager.py

@@ -45,6 +45,14 @@ from pathlib import Path
 from security_utils import create_secure_directory, atomic_write_json
 from security_utils import create_secure_directory, atomic_write_json
 from project_locator import resolve_project_root
 from project_locator import resolve_project_root
 
 
+# v5.1: 使用 IndexManager 读取实体
+try:
+    from data_modules.index_manager import IndexManager
+    from data_modules.config import get_config
+except ImportError:
+    from scripts.data_modules.index_manager import IndexManager
+    from scripts.data_modules.config import get_config
+
 # Windows UTF-8 编码修复
 # Windows UTF-8 编码修复
 if sys.platform == 'win32':
 if sys.platform == 'win32':
     import io
     import io
@@ -62,9 +70,14 @@ class ArchiveManager:
         else:
         else:
             project_root = Path(project_root)
             project_root = Path(project_root)
 
 
+        self.project_root = project_root
         self.state_file = project_root / ".webnovel" / "state.json"
         self.state_file = project_root / ".webnovel" / "state.json"
         self.archive_dir = project_root / ".webnovel" / "archive"
         self.archive_dir = project_root / ".webnovel" / "archive"
 
 
+        # v5.1: IndexManager 用于读取实体
+        self._config = get_config(project_root)
+        self._index_manager = IndexManager(self._config)
+
         # ============================================================================
         # ============================================================================
         # 安全修复:使用安全目录创建函数(P1 MEDIUM)
         # 安全修复:使用安全目录创建函数(P1 MEDIUM)
         # 原代码: self.archive_dir.mkdir(parents=True, exist_ok=True)
         # 原代码: self.archive_dir.mkdir(parents=True, exist_ok=True)
@@ -134,15 +147,15 @@ class ArchiveManager:
         }
         }
 
 
     def identify_inactive_characters(self, state):
     def identify_inactive_characters(self, state):
-        """识别不活跃的次要角色 (v5.0 entities_v3 格式)"""
+        """识别不活跃的次要角色 (v5.1 SQLite)"""
         current_chapter = state.get("progress", {}).get("current_chapter", 0)
         current_chapter = state.get("progress", {}).get("current_chapter", 0)
-        # v5.0: 从 entities_v3.角色 获取角色列表
-        entities_v3 = state.get("entities_v3", {})
-        characters_dict = entities_v3.get("角色", {})
         threshold = self.config["character_inactive_threshold"]
         threshold = self.config["character_inactive_threshold"]
 
 
+        # v5.1: 从 SQLite 获取所有角色实体
+        characters = self._index_manager.get_entities_by_type("角色")
+
         inactive = []
         inactive = []
-        for char_id, char in characters_dict.items():
+        for char in characters:
             # 只归档次要角色(tier="装饰" 或 tier="支线")
             # 只归档次要角色(tier="装饰" 或 tier="支线")
             tier = str(char.get("tier", "")).strip()
             tier = str(char.get("tier", "")).strip()
             if tier == "核心":
             if tier == "核心":
@@ -160,7 +173,7 @@ class ArchiveManager:
             inactive_chapters = current_chapter - last_appearance
             inactive_chapters = current_chapter - last_appearance
 
 
             if inactive_chapters >= threshold:
             if inactive_chapters >= threshold:
-                # 构造兼容结构
+                char_id = char.get("id", "")
                 char_data = {
                 char_data = {
                     "id": char_id,
                     "id": char_id,
                     "name": char.get("canonical_name", char_id),
                     "name": char.get("canonical_name", char_id),
@@ -283,7 +296,7 @@ class ArchiveManager:
         return old_reviews
         return old_reviews
 
 
     def archive_characters(self, inactive_list, dry_run=False):
     def archive_characters(self, inactive_list, dry_run=False):
-        """归档不活跃角色(Priority 2 修复:与索引集成)"""
+        """归档不活跃角色(v5.1: 使用 IndexManager 更新状态)"""
         if not inactive_list:
         if not inactive_list:
             return 0
             return 0
 
 
@@ -296,23 +309,17 @@ class ArchiveManager:
             item["character"]["archived_at"] = timestamp
             item["character"]["archived_at"] = timestamp
             archived.append(item["character"])
             archived.append(item["character"])
 
 
-            # ✅ Priority 2 修复:同步更新索引状态(而非删除)
+            # v5.1: 通过 IndexManager 更新实体状态
             if not dry_run:
             if not dry_run:
                 try:
                 try:
-                    # 导入索引模块
-                    import sys
-                    from pathlib import Path
-                    script_dir = Path(__file__).parent
-                    sys.path.insert(0, str(script_dir))
-                    from structured_index import StructuredIndex
-
-                    # 更新索引状态为 'archived'
-                    project_root = self.state_file.parent.parent
-                    index = StructuredIndex(str(project_root))
-                    index.mark_character_archived(item["character"]["name"], timestamp)
+                    entity_id = item["character"].get("id")
+                    if entity_id:
+                        # 更新实体的 current_json 添加 archived 标记
+                        self._index_manager.update_entity_field(
+                            entity_id, "status", "archived"
+                        )
                 except Exception as e:
                 except Exception as e:
-                    # 索引更新失败不影响归档流程
-                    print(f"⚠️ 索引状态更新失败(不影响归档): {e}")
+                    print(f"⚠️ 实体状态更新失败(不影响归档): {e}")
 
 
         if not dry_run:
         if not dry_run:
             self.save_archive(self.characters_archive, archived)
             self.save_archive(self.characters_archive, archived)
@@ -358,15 +365,9 @@ class ArchiveManager:
         return len(old_reviews_list)
         return len(old_reviews_list)
 
 
     def remove_from_state(self, state, inactive_chars, resolved_threads, old_reviews):
     def remove_from_state(self, state, inactive_chars, resolved_threads, old_reviews):
-        """从 state.json 中移除已归档的数据 (v5.0 entities_v3 格式)"""
-        # 移除不活跃角色 (v5.0: 从 entities_v3.角色 中移除)
-        if inactive_chars:
-            char_ids = {item["character"].get("id") for item in inactive_chars}
-            entities_v3 = state.get("entities_v3", {})
-            characters_dict = entities_v3.get("角色", {})
-            for char_id in char_ids:
-                if char_id in characters_dict:
-                    del characters_dict[char_id]
+        """从 state.json/SQLite 中移除已归档的数据 (v5.1)"""
+        # v5.1: 角色数据在 SQLite,archive_characters 已处理状态更新
+        # 这里只需要处理 state.json 中的伏笔和审查报告
 
 
         # 移除已归档的伏笔
         # 移除已归档的伏笔
         if resolved_threads:
         if resolved_threads:
@@ -476,9 +477,8 @@ class ArchiveManager:
         print(f"\n💾 文件大小: {trigger['file_size_mb']:.2f} MB → {new_size_mb:.2f} MB (节省 {saved_mb:.2f} MB)")
         print(f"\n💾 文件大小: {trigger['file_size_mb']:.2f} MB → {new_size_mb:.2f} MB (节省 {saved_mb:.2f} MB)")
 
 
     def restore_character(self, name):
     def restore_character(self, name):
-        """恢复归档的角色(Priority 2 修复:同步恢复索引状态)"""
+        """恢复归档的角色(v5.1: 使用 IndexManager 恢复状态)"""
         archived = self.load_archive(self.characters_archive)
         archived = self.load_archive(self.characters_archive)
-        state = self.load_state()
 
 
         # 查找角色
         # 查找角色
         char_to_restore = None
         char_to_restore = None
@@ -494,44 +494,18 @@ class ArchiveManager:
         # 移除 archived_at 字段
         # 移除 archived_at 字段
         char_to_restore.pop("archived_at", None)
         char_to_restore.pop("archived_at", None)
 
 
-        # ✅ 原子性修复:先从归档中移除,再添加到 state.json
-        # 理由:即使崩溃,数据仍在归档中,可重新恢复,不会丢失或重复
+        # 原子性修复:先从归档中移除
         archived = [char for char in archived if char["name"] != name]
         archived = [char for char in archived if char["name"] != name]
         self.save_archive(self.characters_archive, archived)
         self.save_archive(self.characters_archive, archived)
 
 
-        # 恢复到 state.json (v5.0: 添加到 entities_v3.角色)
-        if "entities_v3" not in state:
-            state["entities_v3"] = {"角色": {}, "地点": {}, "物品": {}, "势力": {}, "招式": {}}
-        if "角色" not in state["entities_v3"]:
-            state["entities_v3"]["角色"] = {}
-
+        # v5.1: 恢复到 SQLite (通过 IndexManager)
         char_id = char_to_restore.get("id", char_to_restore.get("name", "unknown"))
         char_id = char_to_restore.get("id", char_to_restore.get("name", "unknown"))
-        state["entities_v3"]["角色"][char_id] = {
-            "canonical_name": char_to_restore.get("name", char_id),
-            "tier": char_to_restore.get("tier", "装饰"),
-            "desc": char_to_restore.get("desc", ""),
-            "current": char_to_restore.get("current", {}),
-            "first_appearance": char_to_restore.get("first_appearance", 0),
-            "last_appearance": char_to_restore.get("last_appearance", 0),
-            "history": char_to_restore.get("history", [])
-        }
-        self.save_state(state)
-
-        # ✅ Priority 2 修复:同步恢复索引状态为 'active'
         try:
         try:
-            import sys
-            from pathlib import Path
-            script_dir = Path(__file__).parent
-            sys.path.insert(0, str(script_dir))
-            from structured_index import StructuredIndex
-
-            project_root = self.state_file.parent.parent
-            index = StructuredIndex(str(project_root))
-            index.mark_character_active(name)
+            # 更新实体状态为 active
+            self._index_manager.update_entity_field(char_id, "status", "active")
+            print(f"✅ 角色已恢复: {name}")
         except Exception as e:
         except Exception as e:
-            print(f"⚠️ 索引状态恢复失败(不影响数据恢复): {e}")
-
-        print(f"✅ 角色已恢复: {name}")
+            print(f"⚠️ 实体状态恢复失败: {e}")
 
 
     def show_stats(self):
     def show_stats(self):
         """显示归档统计"""
         """显示归档统计"""

+ 51 - 5
.claude/scripts/data_modules/state_manager.py

@@ -558,12 +558,18 @@ class StateManager:
         if words > 0:
         if words > 0:
             self._pending_progress_words_delta += int(words)
             self._pending_progress_words_delta += int(words)
 
 
-    # ==================== 实体管理 (v5.0 entities_v3) ====================
+    # ==================== 实体管理 (v5.1 SQLite-first) ====================
 
 
     def get_entity(self, entity_id: str, entity_type: str = None) -> Optional[Dict]:
     def get_entity(self, entity_id: str, entity_type: str = None) -> Optional[Dict]:
-        """获取实体 (v5.0 entities_v3 格式)"""
-        entities_v3 = self._state.get("entities_v3", {})
+        """获取实体 (v5.1: 优先从 SQLite 读取)"""
+        # v5.1: 优先从 SQLite 读取
+        if self._sql_state_manager:
+            entity = self._sql_state_manager._index_manager.get_entity(entity_id)
+            if entity:
+                return entity
 
 
+        # 回退到内存 state (兼容未迁移场景)
+        entities_v3 = self._state.get("entities_v3", {})
         if entity_type:
         if entity_type:
             return entities_v3.get(entity_type, {}).get(entity_id)
             return entities_v3.get(entity_type, {}).get(entity_id)
 
 
@@ -575,13 +581,33 @@ class StateManager:
 
 
     def get_entity_type(self, entity_id: str) -> Optional[str]:
     def get_entity_type(self, entity_id: str) -> Optional[str]:
         """获取实体所属类型"""
         """获取实体所属类型"""
+        # v5.1: 优先从 SQLite 读取
+        if self._sql_state_manager:
+            entity = self._sql_state_manager._index_manager.get_entity(entity_id)
+            if entity:
+                return entity.get("type")
+
+        # 回退到内存 state
         for type_name, entities in self._state.get("entities_v3", {}).items():
         for type_name, entities in self._state.get("entities_v3", {}).items():
             if entity_id in entities:
             if entity_id in entities:
                 return type_name
                 return type_name
         return None
         return None
 
 
     def get_all_entities(self) -> Dict[str, Dict]:
     def get_all_entities(self) -> Dict[str, Dict]:
-        """获取所有实体(扁平化视图,兼容旧代码)"""
+        """获取所有实体(扁平化视图)"""
+        # v5.1: 优先从 SQLite 读取
+        if self._sql_state_manager:
+            result = {}
+            for entity_type in self.ENTITY_TYPES:
+                entities = self._sql_state_manager._index_manager.get_entities_by_type(entity_type)
+                for e in entities:
+                    eid = e.get("id")
+                    if eid:
+                        result[eid] = {**e, "type": entity_type}
+            if result:
+                return result
+
+        # 回退到内存 state
         result = {}
         result = {}
         for type_name, entities in self._state.get("entities_v3", {}).items():
         for type_name, entities in self._state.get("entities_v3", {}).items():
             for eid, e in entities.items():
             for eid, e in entities.items():
@@ -590,10 +616,30 @@ class StateManager:
 
 
     def get_entities_by_type(self, entity_type: str) -> Dict[str, Dict]:
     def get_entities_by_type(self, entity_type: str) -> Dict[str, Dict]:
         """按类型获取实体"""
         """按类型获取实体"""
+        # v5.1: 优先从 SQLite 读取
+        if self._sql_state_manager:
+            entities = self._sql_state_manager._index_manager.get_entities_by_type(entity_type)
+            if entities:
+                return {e.get("id"): e for e in entities if e.get("id")}
+
+        # 回退到内存 state
         return self._state.get("entities_v3", {}).get(entity_type, {})
         return self._state.get("entities_v3", {}).get(entity_type, {})
 
 
     def get_entities_by_tier(self, tier: str) -> Dict[str, Dict]:
     def get_entities_by_tier(self, tier: str) -> Dict[str, Dict]:
         """按层级获取实体"""
         """按层级获取实体"""
+        # v5.1: 优先从 SQLite 读取
+        if self._sql_state_manager:
+            result = {}
+            for entity_type in self.ENTITY_TYPES:
+                entities = self._sql_state_manager._index_manager.get_entities_by_tier(tier)
+                for e in entities:
+                    eid = e.get("id")
+                    if eid and e.get("type") == entity_type:
+                        result[eid] = {**e, "type": entity_type}
+            if result:
+                return result
+
+        # 回退到内存 state
         result = {}
         result = {}
         for type_name, entities in self._state.get("entities_v3", {}).items():
         for type_name, entities in self._state.get("entities_v3", {}).items():
             for eid, e in entities.items():
             for eid, e in entities.items():
@@ -1014,7 +1060,7 @@ class StateManager:
 
 
     def sync_protagonist_from_entity(self, entity_id: str = None):
     def sync_protagonist_from_entity(self, entity_id: str = None):
         """
         """
-        将 entities_v3 中主角实体的状态同步到 protagonist_state
+        将主角实体的状态同步到 protagonist_state (v5.1: 从 SQLite 读取)
 
 
         用于确保 consistency-checker 等依赖 protagonist_state 的组件获取最新数据
         用于确保 consistency-checker 等依赖 protagonist_state 的组件获取最新数据
         """
         """

+ 4 - 9
.claude/scripts/data_modules/tests/test_data_modules.py

@@ -206,8 +206,8 @@ class TestStateManager:
         assert manager.get_current_chapter() == 100
         assert manager.get_current_chapter() == 100
 
 
     def test_save_state_with_init_project_schema(self, temp_project):
     def test_save_state_with_init_project_schema(self, temp_project):
-        """回归:init_project 生成的 state.json 无 meta 字段,StateManager 仍应可写入。"""
-        # 模拟 init_project.py 生成的 v5.0 state.json 形状(包含 entities_v3/alias_index)
+        """回归:init_project 生成的 state.json,StateManager 仍应可写入。(v5.1 SQLite-only)"""
+        # v5.1: state.json 不再包含 entities_v3/alias_index,实体数据在 SQLite
         init_state = {
         init_state = {
             "project_info": {"title": "测试书名", "genre": "修仙/玄幻", "created_at": "2026-01-01"},
             "project_info": {"title": "测试书名", "genre": "修仙/玄幻", "created_at": "2026-01-01"},
             "progress": {"current_chapter": 0, "total_words": 0, "last_updated": "2026-01-01 00:00:00"},
             "progress": {"current_chapter": 0, "total_words": 0, "last_updated": "2026-01-01 00:00:00"},
@@ -217,8 +217,6 @@ class TestStateManager:
             "plot_threads": {"active_threads": [], "foreshadowing": []},
             "plot_threads": {"active_threads": [], "foreshadowing": []},
             "review_checkpoints": [],
             "review_checkpoints": [],
             "strand_tracker": {"current_dominant": "quest", "history": []},
             "strand_tracker": {"current_dominant": "quest", "history": []},
-            "entities_v3": {"角色": {}, "地点": {}, "物品": {}, "势力": {}, "招式": {}},
-            "alias_index": {},
         }
         }
         temp_project.state_file.write_text(json.dumps(init_state, ensure_ascii=False, indent=2), encoding="utf-8")
         temp_project.state_file.write_text(json.dumps(init_state, ensure_ascii=False, indent=2), encoding="utf-8")
 
 
@@ -230,11 +228,10 @@ class TestStateManager:
         assert "meta" not in saved
         assert "meta" not in saved
         assert saved["progress"]["current_chapter"] == 5
         assert saved["progress"]["current_chapter"] == 5
         assert saved["progress"]["total_words"] == 100
         assert saved["progress"]["total_words"] == 100
-        assert isinstance(saved.get("entities_v3"), dict)
-        assert isinstance(saved.get("alias_index"), dict)
+        # v5.1: entities_v3/alias_index 不再在 state.json 中
 
 
     def test_save_state_preserves_unrelated_fields(self, temp_project):
     def test_save_state_preserves_unrelated_fields(self, temp_project):
-        """回归:仅写入增量,不应覆盖/丢失其他模块维护的字段。"""
+        """回归:仅写入增量,不应覆盖/丢失其他模块维护的字段。(v5.1 SQLite-only)"""
         init_state = {
         init_state = {
             "project_info": {"title": "测试书名", "genre": "修仙/玄幻", "created_at": "2026-01-01"},
             "project_info": {"title": "测试书名", "genre": "修仙/玄幻", "created_at": "2026-01-01"},
             "progress": {"current_chapter": 10, "total_words": 1000, "last_updated": "2026-01-01 00:00:00"},
             "progress": {"current_chapter": 10, "total_words": 1000, "last_updated": "2026-01-01 00:00:00"},
@@ -244,8 +241,6 @@ class TestStateManager:
             "plot_threads": {"active_threads": [{"id": "t1", "title": "主线"}], "foreshadowing": []},
             "plot_threads": {"active_threads": [{"id": "t1", "title": "主线"}], "foreshadowing": []},
             "review_checkpoints": [],
             "review_checkpoints": [],
             "strand_tracker": {"current_dominant": "quest", "history": []},
             "strand_tracker": {"current_dominant": "quest", "history": []},
-            "entities_v3": {"角色": {}, "地点": {}, "物品": {}, "势力": {}, "招式": {}},
-            "alias_index": {},
             "custom_field": {"keep": True},
             "custom_field": {"keep": True},
         }
         }
         temp_project.state_file.write_text(json.dumps(init_state, ensure_ascii=False, indent=2), encoding="utf-8")
         temp_project.state_file.write_text(json.dumps(init_state, ensure_ascii=False, indent=2), encoding="utf-8")

+ 1 - 0
.claude/scripts/init_project.py

@@ -59,6 +59,7 @@ def _ensure_state_schema(state: Dict[str, Any]) -> Dict[str, Any]:
     state.setdefault("project_info", {})
     state.setdefault("project_info", {})
     state.setdefault("progress", {})
     state.setdefault("progress", {})
     state.setdefault("protagonist_state", {})
     state.setdefault("protagonist_state", {})
+    state.setdefault("relationships", {})  # update_state.py 需要此字段
     state.setdefault("disambiguation_warnings", [])
     state.setdefault("disambiguation_warnings", [])
     state.setdefault("disambiguation_pending", [])
     state.setdefault("disambiguation_pending", [])
     state.setdefault("world_settings", {"power_system": [], "factions": [], "locations": []})
     state.setdefault("world_settings", {"power_system": [], "factions": [], "locations": []})

+ 43 - 50
.claude/scripts/status_reporter.py

@@ -92,8 +92,10 @@ from chapter_paths import extract_chapter_num_from_filename
 # 导入配置
 # 导入配置
 try:
 try:
     from data_modules.config import get_config, DataModulesConfig
     from data_modules.config import get_config, DataModulesConfig
+    from data_modules.index_manager import IndexManager
 except ImportError:
 except ImportError:
     from scripts.data_modules.config import get_config, DataModulesConfig
     from scripts.data_modules.config import get_config, DataModulesConfig
+    from scripts.data_modules.index_manager import IndexManager
 
 
 def _is_resolved_foreshadowing_status(raw_status: Any) -> bool:
 def _is_resolved_foreshadowing_status(raw_status: Any) -> bool:
     """判断伏笔是否已回收(兼容历史字段与同义词)。"""
     """判断伏笔是否已回收(兼容历史字段与同义词)。"""
@@ -131,13 +133,8 @@ class StatusReporter:
         self.state = None
         self.state = None
         self.chapters_data = []
         self.chapters_data = []
 
 
-        # 可选:集成结构化索引(如果可用,角色统计更准)
-        self.index = None
-        try:
-            from structured_index import StructuredIndex
-            self.index = StructuredIndex(self.project_root)
-        except Exception:
-            self.index = None
+        # v5.1: 使用 IndexManager 读取实体
+        self._index_manager = IndexManager(self.config)
 
 
     def _extract_stats_field(self, content: str, field_name: str) -> str:
     def _extract_stats_field(self, content: str, field_name: str) -> str:
         """
         """
@@ -173,19 +170,22 @@ class StatusReporter:
         # 2) 正文/第1卷/第001章-标题.md
         # 2) 正文/第1卷/第001章-标题.md
         chapter_files = sorted(self.chapters_dir.rglob("第*.md"))
         chapter_files = sorted(self.chapters_dir.rglob("第*.md"))
 
 
-        # 角色候选(fallback 用):从 state.json 获取已知角色名 (v5.0 entities_v3 格式)
+        # v5.1: 从 SQLite 获取已知角色名
         known_character_names: List[str] = []
         known_character_names: List[str] = []
         protagonist_name = ""
         protagonist_name = ""
         if self.state:
         if self.state:
             protagonist_name = self.state.get("protagonist_state", {}).get("name", "") or ""
             protagonist_name = self.state.get("protagonist_state", {}).get("name", "") or ""
-            # v5.0: 从 entities_v3.角色 获取角色名
-            entities_v3 = self.state.get("entities_v3", {})
-            characters_dict = entities_v3.get("角色", {})
+
+        # 从 SQLite 获取所有角色的 canonical_name
+        try:
+            characters_from_db = self._index_manager.get_entities_by_type("角色")
             known_character_names = [
             known_character_names = [
-                c.get("canonical_name", char_id)
-                for char_id, c in characters_dict.items()
+                c.get("canonical_name", c.get("id", ""))
+                for c in characters_from_db
                 if c.get("canonical_name")
                 if c.get("canonical_name")
             ]
             ]
+        except Exception:
+            known_character_names = []
 
 
         for chapter_file in chapter_files:
         for chapter_file in chapter_files:
             chapter_num = extract_chapter_num_from_filename(chapter_file.name)
             chapter_num = extract_chapter_num_from_filename(chapter_file.name)
@@ -202,40 +202,29 @@ class StatusReporter:
             text = re.sub(r'---', '', text)  # 去除分隔线
             text = re.sub(r'---', '', text)  # 去除分隔线
             word_count = len(text.strip())
             word_count = len(text.strip())
 
 
-            # 主导 Strand / 爽点类型(优先从“本章统计”解析)
+            # 主导 Strand / 爽点类型(优先从"本章统计"解析)
             dominant_strand = (self._extract_stats_field(content, "主导Strand") or "").lower()
             dominant_strand = (self._extract_stats_field(content, "主导Strand") or "").lower()
             cool_point_type = self._extract_stats_field(content, "爽点")
             cool_point_type = self._extract_stats_field(content, "爽点")
 
 
-            # 角色提取:优先从结构化索引读取(若有),否则 fallback 用“出现即算出场”
+            # v5.1: 角色提取从 SQLite chapters 表读取
             characters: List[str] = []
             characters: List[str] = []
-            if self.index is not None:
-                try:
-                    cursor = self.index.conn.execute(
-                        "SELECT characters FROM chapters WHERE chapter_num = ?",
-                        (chapter_num,),
-                    )
-                    row = cursor.fetchone()
-                    if row and row[0]:
-                        try:
-                            stored = json.loads(row[0])
-                            if isinstance(stored, list):
-                                # v4.0: chapters.characters 存 entity_id 列表,输出时尽量还原为 canonical_name
-                                for x in stored:
-                                    entity_id = str(x).strip()
-                                    if not entity_id:
-                                        continue
-                                    name = entity_id
-                                    try:
-                                        ent = self.index.query_entity_by_id(entity_id)
-                                        if ent and ent.get("canonical_name"):
-                                            name = str(ent["canonical_name"]).strip() or entity_id
-                                    except Exception:
-                                        name = entity_id
-                                    characters.append(name)
-                        except json.JSONDecodeError:
-                            characters = []
-                except Exception:
-                    characters = []
+            try:
+                chapter_info = self._index_manager.get_chapter(chapter_num)
+                if chapter_info and chapter_info.get("characters"):
+                    stored = chapter_info["characters"]
+                    if isinstance(stored, str):
+                        stored = json.loads(stored)
+                    if isinstance(stored, list):
+                        for entity_id in stored:
+                            entity_id = str(entity_id).strip()
+                            if not entity_id:
+                                continue
+                            # 尝试获取 canonical_name
+                            entity = self._index_manager.get_entity(entity_id)
+                            name = entity.get("canonical_name", entity_id) if entity else entity_id
+                            characters.append(name)
+            except Exception:
+                characters = []
 
 
             if not characters and (protagonist_name or known_character_names):
             if not characters and (protagonist_name or known_character_names):
                 # 限制候选规模,避免在超大角色库下过慢
                 # 限制候选规模,避免在超大角色库下过慢
@@ -262,26 +251,30 @@ class StatusReporter:
             })
             })
 
 
     def analyze_characters(self) -> Dict:
     def analyze_characters(self) -> Dict:
-        """分析角色活跃度 (v5.0 entities_v3 格式)"""
+        """分析角色活跃度 (v5.1 SQLite)"""
         if not self.state:
         if not self.state:
             return {}
             return {}
 
 
         current_chapter = self.state.get("progress", {}).get("current_chapter", 0)
         current_chapter = self.state.get("progress", {}).get("current_chapter", 0)
-        # v5.0: 从 entities_v3.角色 获取角色
-        entities_v3 = self.state.get("entities_v3", {})
-        characters_dict = entities_v3.get("角色", {})
+
+        # v5.1: 从 SQLite 获取所有角色
+        try:
+            characters_list = self._index_manager.get_entities_by_type("角色")
+        except Exception:
+            characters_list = []
 
 
         # 统计每个角色的最后出场章节
         # 统计每个角色的最后出场章节
         character_activity = {}
         character_activity = {}
 
 
-        for char_id, char in characters_dict.items():
-            char_name = char.get("canonical_name", char_id)
+        for char in characters_list:
+            char_name = char.get("canonical_name", char.get("id", ""))
             if not char_name:
             if not char_name:
                 continue
                 continue
 
 
             # 查找最后出场章节
             # 查找最后出场章节
-            last_appearance = 0
+            last_appearance = char.get("last_appearance", 0) or 0
 
 
+            # 也从 chapters_data 中检查
             for ch_data in self.chapters_data:
             for ch_data in self.chapters_data:
                 if char_name in ch_data.get("characters", []):
                 if char_name in ch_data.get("characters", []):
                     last_appearance = max(last_appearance, ch_data["chapter"])
                     last_appearance = max(last_appearance, ch_data["chapter"])