index_observability_mixin.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. IndexObservabilityMixin extracted from IndexManager.
  5. """
from __future__ import annotations

import json
import logging
import sqlite3
from datetime import datetime
from typing import Any, Dict, List, Optional
  11. logger = logging.getLogger(__name__)
  12. class IndexObservabilityMixin:
  13. def _row_to_dict(self, row: sqlite3.Row, parse_json: List[str] = None) -> Dict:
  14. """将 Row 转换为字典"""
  15. d = dict(row)
  16. if parse_json:
  17. for key in parse_json:
  18. if key in d and d[key]:
  19. try:
  20. d[key] = json.loads(d[key])
  21. except json.JSONDecodeError as exc:
  22. logger.warning(
  23. "failed to parse JSON field %s in _row_to_dict: %s",
  24. key,
  25. exc,
  26. )
  27. return d
  28. # ==================== 无效事实管理 ====================
  29. def mark_invalid_fact(
  30. self,
  31. source_type: str,
  32. source_id: str,
  33. reason: str,
  34. marked_by: str = "user",
  35. chapter_discovered: Optional[int] = None,
  36. ) -> int:
  37. """标记无效事实(pending)"""
  38. with self._get_conn() as conn:
  39. cursor = conn.cursor()
  40. cursor.execute(
  41. """
  42. INSERT INTO invalid_facts
  43. (source_type, source_id, reason, status, marked_by, chapter_discovered)
  44. VALUES (?, ?, ?, 'pending', ?, ?)
  45. """,
  46. (source_type, str(source_id), reason, marked_by, chapter_discovered),
  47. )
  48. conn.commit()
  49. return int(cursor.lastrowid)
  50. def resolve_invalid_fact(self, invalid_id: int, action: str) -> bool:
  51. """确认或撤销无效标记"""
  52. action = action.lower()
  53. with self._get_conn() as conn:
  54. cursor = conn.cursor()
  55. if action == "confirm":
  56. cursor.execute(
  57. """
  58. UPDATE invalid_facts
  59. SET status = 'confirmed', confirmed_at = CURRENT_TIMESTAMP
  60. WHERE id = ?
  61. """,
  62. (invalid_id,),
  63. )
  64. elif action == "dismiss":
  65. cursor.execute("DELETE FROM invalid_facts WHERE id = ?", (invalid_id,))
  66. else:
  67. return False
  68. conn.commit()
  69. return cursor.rowcount > 0
  70. def list_invalid_facts(self, status: Optional[str] = None) -> List[Dict]:
  71. """列出无效事实"""
  72. with self._get_conn() as conn:
  73. cursor = conn.cursor()
  74. if status:
  75. cursor.execute(
  76. "SELECT * FROM invalid_facts WHERE status = ? ORDER BY id DESC",
  77. (status,),
  78. )
  79. else:
  80. cursor.execute("SELECT * FROM invalid_facts ORDER BY id DESC")
  81. return [dict(r) for r in cursor.fetchall()]
  82. def get_invalid_ids(self, source_type: str, status: str = "confirmed") -> set[str]:
  83. """获取无效事实 ID 集合"""
  84. with self._get_conn() as conn:
  85. cursor = conn.cursor()
  86. cursor.execute(
  87. "SELECT source_id FROM invalid_facts WHERE source_type = ? AND status = ?",
  88. (source_type, status),
  89. )
  90. return {str(r[0]) for r in cursor.fetchall() if r and r[0] is not None}
  91. # ==================== 日志记录 ====================
  92. def log_rag_query(
  93. self,
  94. query: str,
  95. query_type: str,
  96. results_count: int,
  97. hit_sources: Optional[str] = None,
  98. latency_ms: Optional[int] = None,
  99. chapter: Optional[int] = None,
  100. ) -> None:
  101. with self._get_conn() as conn:
  102. cursor = conn.cursor()
  103. cursor.execute(
  104. """
  105. INSERT INTO rag_query_log
  106. (query, query_type, results_count, hit_sources, latency_ms, chapter)
  107. VALUES (?, ?, ?, ?, ?, ?)
  108. """,
  109. (query, query_type, results_count, hit_sources, latency_ms, chapter),
  110. )
  111. conn.commit()
  112. def log_tool_call(
  113. self,
  114. tool_name: str,
  115. success: bool,
  116. retry_count: int = 0,
  117. error_code: Optional[str] = None,
  118. error_message: Optional[str] = None,
  119. chapter: Optional[int] = None,
  120. ) -> None:
  121. with self._get_conn() as conn:
  122. cursor = conn.cursor()
  123. cursor.execute(
  124. """
  125. INSERT INTO tool_call_stats
  126. (tool_name, success, retry_count, error_code, error_message, chapter)
  127. VALUES (?, ?, ?, ?, ?, ?)
  128. """,
  129. (tool_name, int(bool(success)), retry_count, error_code, error_message, chapter),
  130. )
  131. conn.commit()
  132. def get_stats(self) -> Dict[str, int]:
  133. """获取索引统计"""
  134. with self._get_conn() as conn:
  135. cursor = conn.cursor()
  136. cursor.execute("SELECT COUNT(*) FROM chapters")
  137. chapters = cursor.fetchone()[0]
  138. cursor.execute("SELECT COUNT(*) FROM scenes")
  139. scenes = cursor.fetchone()[0]
  140. cursor.execute("SELECT COUNT(DISTINCT entity_id) FROM appearances")
  141. appearances = cursor.fetchone()[0]
  142. cursor.execute("SELECT MAX(chapter) FROM chapters")
  143. max_chapter = cursor.fetchone()[0] or 0
  144. # v5.1 引入统计
  145. cursor.execute("SELECT COUNT(*) FROM entities")
  146. entities = cursor.fetchone()[0]
  147. cursor.execute("SELECT COUNT(*) FROM entities WHERE is_archived = 0")
  148. active_entities = cursor.fetchone()[0]
  149. cursor.execute("SELECT COUNT(*) FROM aliases")
  150. aliases = cursor.fetchone()[0]
  151. cursor.execute("SELECT COUNT(*) FROM state_changes")
  152. state_changes = cursor.fetchone()[0]
  153. cursor.execute("SELECT COUNT(*) FROM relationships")
  154. relationships = cursor.fetchone()[0]
  155. cursor.execute("SELECT COUNT(*) FROM relationship_events")
  156. relationship_events = cursor.fetchone()[0]
  157. # v5.3 引入统计
  158. cursor.execute("SELECT COUNT(*) FROM override_contracts")
  159. override_contracts = cursor.fetchone()[0]
  160. cursor.execute(
  161. "SELECT COUNT(*) FROM override_contracts WHERE status = 'pending'"
  162. )
  163. pending_overrides = cursor.fetchone()[0]
  164. cursor.execute("SELECT COUNT(*) FROM chase_debt WHERE status = 'active'")
  165. active_debts = cursor.fetchone()[0]
  166. cursor.execute(
  167. "SELECT COALESCE(SUM(current_amount), 0) FROM chase_debt WHERE status IN ('active', 'overdue')"
  168. )
  169. total_debt = cursor.fetchone()[0]
  170. cursor.execute("SELECT COUNT(*) FROM chapter_reading_power")
  171. reading_power_records = cursor.fetchone()[0]
  172. cursor.execute("SELECT COUNT(*) FROM review_metrics")
  173. review_metrics = cursor.fetchone()[0]
  174. return {
  175. "chapters": chapters,
  176. "scenes": scenes,
  177. "appearances": appearances,
  178. "max_chapter": max_chapter,
  179. # v5.1 引入
  180. "entities": entities,
  181. "active_entities": active_entities,
  182. "aliases": aliases,
  183. "state_changes": state_changes,
  184. "relationships": relationships,
  185. "relationship_events": relationship_events,
  186. # v5.3 引入
  187. "override_contracts": override_contracts,
  188. "pending_overrides": pending_overrides,
  189. "active_debts": active_debts,
  190. "total_debt": total_debt,
  191. "reading_power_records": reading_power_records,
  192. "review_metrics": review_metrics,
  193. }