index_observability_mixin.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. IndexObservabilityMixin extracted from IndexManager.
  5. """
from __future__ import annotations

import json
import sqlite3
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional


  11. class IndexObservabilityMixin:
  12. def _row_to_dict(self, row: sqlite3.Row, parse_json: List[str] = None) -> Dict:
  13. """将 Row 转换为字典"""
  14. d = dict(row)
  15. if parse_json:
  16. for key in parse_json:
  17. if key in d and d[key]:
  18. try:
  19. d[key] = json.loads(d[key])
  20. except json.JSONDecodeError as exc:
  21. print(f"[index_manager] failed to parse JSON field {key} in _row_to_dict: {exc}", file=sys.stderr)
  22. return d
  23. # ==================== 无效事实管理 ====================
  24. def mark_invalid_fact(
  25. self,
  26. source_type: str,
  27. source_id: str,
  28. reason: str,
  29. marked_by: str = "user",
  30. chapter_discovered: Optional[int] = None,
  31. ) -> int:
  32. """标记无效事实(pending)"""
  33. with self._get_conn() as conn:
  34. cursor = conn.cursor()
  35. cursor.execute(
  36. """
  37. INSERT INTO invalid_facts
  38. (source_type, source_id, reason, status, marked_by, chapter_discovered)
  39. VALUES (?, ?, ?, 'pending', ?, ?)
  40. """,
  41. (source_type, str(source_id), reason, marked_by, chapter_discovered),
  42. )
  43. conn.commit()
  44. return int(cursor.lastrowid)
  45. def resolve_invalid_fact(self, invalid_id: int, action: str) -> bool:
  46. """确认或撤销无效标记"""
  47. action = action.lower()
  48. with self._get_conn() as conn:
  49. cursor = conn.cursor()
  50. if action == "confirm":
  51. cursor.execute(
  52. """
  53. UPDATE invalid_facts
  54. SET status = 'confirmed', confirmed_at = CURRENT_TIMESTAMP
  55. WHERE id = ?
  56. """,
  57. (invalid_id,),
  58. )
  59. elif action == "dismiss":
  60. cursor.execute("DELETE FROM invalid_facts WHERE id = ?", (invalid_id,))
  61. else:
  62. return False
  63. conn.commit()
  64. return cursor.rowcount > 0
  65. def list_invalid_facts(self, status: Optional[str] = None) -> List[Dict]:
  66. """列出无效事实"""
  67. with self._get_conn() as conn:
  68. cursor = conn.cursor()
  69. if status:
  70. cursor.execute(
  71. "SELECT * FROM invalid_facts WHERE status = ? ORDER BY id DESC",
  72. (status,),
  73. )
  74. else:
  75. cursor.execute("SELECT * FROM invalid_facts ORDER BY id DESC")
  76. return [dict(r) for r in cursor.fetchall()]
  77. def get_invalid_ids(self, source_type: str, status: str = "confirmed") -> set[str]:
  78. """获取无效事实 ID 集合"""
  79. with self._get_conn() as conn:
  80. cursor = conn.cursor()
  81. cursor.execute(
  82. "SELECT source_id FROM invalid_facts WHERE source_type = ? AND status = ?",
  83. (source_type, status),
  84. )
  85. return {str(r[0]) for r in cursor.fetchall() if r and r[0] is not None}
  86. # ==================== 日志记录 ====================
  87. def log_rag_query(
  88. self,
  89. query: str,
  90. query_type: str,
  91. results_count: int,
  92. hit_sources: Optional[str] = None,
  93. latency_ms: Optional[int] = None,
  94. chapter: Optional[int] = None,
  95. ) -> None:
  96. with self._get_conn() as conn:
  97. cursor = conn.cursor()
  98. cursor.execute(
  99. """
  100. INSERT INTO rag_query_log
  101. (query, query_type, results_count, hit_sources, latency_ms, chapter)
  102. VALUES (?, ?, ?, ?, ?, ?)
  103. """,
  104. (query, query_type, results_count, hit_sources, latency_ms, chapter),
  105. )
  106. conn.commit()
  107. def log_tool_call(
  108. self,
  109. tool_name: str,
  110. success: bool,
  111. retry_count: int = 0,
  112. error_code: Optional[str] = None,
  113. error_message: Optional[str] = None,
  114. chapter: Optional[int] = None,
  115. ) -> None:
  116. with self._get_conn() as conn:
  117. cursor = conn.cursor()
  118. cursor.execute(
  119. """
  120. INSERT INTO tool_call_stats
  121. (tool_name, success, retry_count, error_code, error_message, chapter)
  122. VALUES (?, ?, ?, ?, ?, ?)
  123. """,
  124. (tool_name, int(bool(success)), retry_count, error_code, error_message, chapter),
  125. )
  126. conn.commit()
  127. def get_stats(self) -> Dict[str, int]:
  128. """获取索引统计"""
  129. with self._get_conn() as conn:
  130. cursor = conn.cursor()
  131. cursor.execute("SELECT COUNT(*) FROM chapters")
  132. chapters = cursor.fetchone()[0]
  133. cursor.execute("SELECT COUNT(*) FROM scenes")
  134. scenes = cursor.fetchone()[0]
  135. cursor.execute("SELECT COUNT(DISTINCT entity_id) FROM appearances")
  136. appearances = cursor.fetchone()[0]
  137. cursor.execute("SELECT MAX(chapter) FROM chapters")
  138. max_chapter = cursor.fetchone()[0] or 0
  139. # v5.1 引入统计
  140. cursor.execute("SELECT COUNT(*) FROM entities")
  141. entities = cursor.fetchone()[0]
  142. cursor.execute("SELECT COUNT(*) FROM entities WHERE is_archived = 0")
  143. active_entities = cursor.fetchone()[0]
  144. cursor.execute("SELECT COUNT(*) FROM aliases")
  145. aliases = cursor.fetchone()[0]
  146. cursor.execute("SELECT COUNT(*) FROM state_changes")
  147. state_changes = cursor.fetchone()[0]
  148. cursor.execute("SELECT COUNT(*) FROM relationships")
  149. relationships = cursor.fetchone()[0]
  150. # v5.3 引入统计
  151. cursor.execute("SELECT COUNT(*) FROM override_contracts")
  152. override_contracts = cursor.fetchone()[0]
  153. cursor.execute(
  154. "SELECT COUNT(*) FROM override_contracts WHERE status = 'pending'"
  155. )
  156. pending_overrides = cursor.fetchone()[0]
  157. cursor.execute("SELECT COUNT(*) FROM chase_debt WHERE status = 'active'")
  158. active_debts = cursor.fetchone()[0]
  159. cursor.execute(
  160. "SELECT COALESCE(SUM(current_amount), 0) FROM chase_debt WHERE status IN ('active', 'overdue')"
  161. )
  162. total_debt = cursor.fetchone()[0]
  163. cursor.execute("SELECT COUNT(*) FROM chapter_reading_power")
  164. reading_power_records = cursor.fetchone()[0]
  165. cursor.execute("SELECT COUNT(*) FROM review_metrics")
  166. review_metrics = cursor.fetchone()[0]
  167. return {
  168. "chapters": chapters,
  169. "scenes": scenes,
  170. "appearances": appearances,
  171. "max_chapter": max_chapter,
  172. # v5.1 引入
  173. "entities": entities,
  174. "active_entities": active_entities,
  175. "aliases": aliases,
  176. "state_changes": state_changes,
  177. "relationships": relationships,
  178. # v5.3 引入
  179. "override_contracts": override_contracts,
  180. "pending_overrides": pending_overrides,
  181. "active_debts": active_debts,
  182. "total_debt": total_debt,
  183. "reading_power_records": reading_power_records,
  184. "review_metrics": review_metrics,
  185. }