1
0

store.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. scratchpad 持久化与查询。
  5. """
  6. from __future__ import annotations
  7. from dataclasses import asdict
  8. from pathlib import Path
  9. from typing import Any, Dict, List, Optional
  10. from ..config import DataModulesConfig, get_config
  11. from ..cli_output import print_error, print_success
  12. from ..cli_args import normalize_global_project_root, load_json_arg
  13. from .schema import (
  14. BUCKET_TO_CATEGORY,
  15. CATEGORY_KEY_RULES,
  16. CATEGORY_TO_BUCKET,
  17. MemoryItem,
  18. ScratchpadData,
  19. memory_item_key,
  20. now_iso,
  21. )
  22. try:
  23. from security_utils import atomic_write_json, read_json_safe
  24. except ImportError: # pragma: no cover
  25. from scripts.security_utils import atomic_write_json, read_json_safe
  26. from filelock import FileLock
  27. class ScratchpadManager:
  28. def __init__(self, config: DataModulesConfig | None = None):
  29. self.config = config or get_config()
  30. self.path = Path(self.config.scratchpad_file)
  31. self._lock = FileLock(str(self.path) + ".lock", timeout=30)
  32. def load(self) -> ScratchpadData:
  33. if not self.path.exists():
  34. return ScratchpadData.empty()
  35. payload = read_json_safe(self.path, default={})
  36. if not isinstance(payload, dict):
  37. return ScratchpadData.empty()
  38. return ScratchpadData.from_dict(payload)
  39. def save(self, data: ScratchpadData, _use_lock: bool = True) -> None:
  40. self.config.ensure_dirs()
  41. if bool(getattr(self.config, "memory_compactor_enabled", True)):
  42. threshold = max(1, int(getattr(self.config, "memory_compactor_threshold", 500)))
  43. if data.count_items() > threshold:
  44. from .compactor import compact_scratchpad
  45. data = compact_scratchpad(data, max_items=threshold)
  46. payload = data.to_dict()
  47. payload.setdefault("meta", {})
  48. payload["meta"]["last_updated"] = now_iso()
  49. payload["meta"]["total_items"] = data.count_items()
  50. atomic_write_json(self.path, payload, use_lock=_use_lock, backup=True)
  51. def _key_for(self, item: MemoryItem) -> tuple[Any, ...]:
  52. return memory_item_key(item)
  53. def upsert_item(self, item: MemoryItem) -> Dict[str, int]:
  54. normalized = item.normalized()
  55. with self._lock:
  56. data = self.load()
  57. bucket = CATEGORY_TO_BUCKET[normalized.category]
  58. rows: List[MemoryItem] = list(getattr(data, bucket))
  59. target_key = self._key_for(normalized)
  60. outdated = 0
  61. replaced_existing = False
  62. new_rows: List[MemoryItem] = []
  63. for row in rows:
  64. row_key = self._key_for(row)
  65. if row_key == target_key and row.id != normalized.id:
  66. # 同 key 旧值降级为 outdated,保留审计轨迹
  67. if row.status != "outdated":
  68. row = MemoryItem(**{**asdict(row), "status": "outdated", "updated_at": now_iso()})
  69. outdated += 1
  70. replaced_existing = True
  71. elif row.id == normalized.id:
  72. replaced_existing = True
  73. continue
  74. new_rows.append(row)
  75. normalized.updated_at = normalized.updated_at or now_iso()
  76. new_rows.append(normalized)
  77. setattr(data, bucket, new_rows)
  78. self.save(data, _use_lock=False)
  79. return {
  80. "added": 0 if replaced_existing else 1,
  81. "updated": 1 if replaced_existing else 0,
  82. "outdated": outdated,
  83. }
  84. def mark_status(self, item_id: str, status: str) -> bool:
  85. if not item_id:
  86. return False
  87. with self._lock:
  88. data = self.load()
  89. updated = False
  90. for bucket in BUCKET_TO_CATEGORY:
  91. rows: List[MemoryItem] = getattr(data, bucket)
  92. for i, row in enumerate(rows):
  93. if row.id == item_id:
  94. rows[i] = MemoryItem(**{**asdict(row), "status": status, "updated_at": now_iso()})
  95. updated = True
  96. if updated:
  97. self.save(data, _use_lock=False)
  98. return updated
  99. def query(
  100. self,
  101. category: Optional[str] = None,
  102. subject: Optional[str] = None,
  103. status: Optional[str] = "active",
  104. ) -> List[MemoryItem]:
  105. data = self.load()
  106. categories = [category] if category else list(CATEGORY_TO_BUCKET.keys())
  107. result: List[MemoryItem] = []
  108. for cat in categories:
  109. bucket = CATEGORY_TO_BUCKET.get(cat)
  110. if not bucket:
  111. continue
  112. rows: List[MemoryItem] = getattr(data, bucket)
  113. for row in rows:
  114. if subject and row.subject != subject:
  115. continue
  116. if status and row.status != status:
  117. continue
  118. result.append(row)
  119. return result
  120. def stats(self) -> Dict[str, Any]:
  121. data = self.load()
  122. by_category: Dict[str, int] = {}
  123. active = 0
  124. outdated = 0
  125. contradicted = 0
  126. tentative = 0
  127. for category, bucket in CATEGORY_TO_BUCKET.items():
  128. rows: List[MemoryItem] = getattr(data, bucket)
  129. by_category[category] = len(rows)
  130. for row in rows:
  131. if row.status == "active":
  132. active += 1
  133. elif row.status == "outdated":
  134. outdated += 1
  135. elif row.status == "contradicted":
  136. contradicted += 1
  137. elif row.status == "tentative":
  138. tentative += 1
  139. return {
  140. "total": data.count_items(),
  141. "active": active,
  142. "outdated": outdated,
  143. "contradicted": contradicted,
  144. "tentative": tentative,
  145. "by_category": by_category,
  146. "path": str(self.path),
  147. }
  148. def dump(self) -> Dict[str, Any]:
  149. return self.load().to_dict()
  150. def conflicts(self) -> List[Dict[str, Any]]:
  151. data = self.load()
  152. conflicts: List[Dict[str, Any]] = []
  153. for category, bucket in CATEGORY_TO_BUCKET.items():
  154. key_count: Dict[tuple[Any, ...], int] = {}
  155. rows: List[MemoryItem] = getattr(data, bucket)
  156. for row in rows:
  157. if row.status != "active":
  158. continue
  159. key = self._key_for(row)
  160. key_count[key] = key_count.get(key, 0) + 1
  161. for key, cnt in key_count.items():
  162. if cnt > 1:
  163. conflicts.append({"category": category, "key": list(key), "active_items": cnt})
  164. return conflicts
  165. def main() -> None:
  166. import argparse
  167. import sys
  168. parser = argparse.ArgumentParser(description="Memory Scratchpad CLI")
  169. parser.add_argument("--project-root", type=str, help="项目根目录")
  170. sub = parser.add_subparsers(dest="command", required=True)
  171. sub.add_parser("stats")
  172. p_query = sub.add_parser("query")
  173. p_query.add_argument("--category", type=str, default=None)
  174. p_query.add_argument("--subject", type=str, default=None)
  175. p_query.add_argument("--status", type=str, default="active")
  176. sub.add_parser("dump")
  177. sub.add_parser("conflicts")
  178. p_update = sub.add_parser("update")
  179. p_update.add_argument("--chapter", type=int, required=True)
  180. p_update.add_argument("--data", required=True, help="章节结构化结果 JSON")
  181. sub.add_parser("bootstrap")
  182. args = parser.parse_args(normalize_global_project_root(sys.argv[1:]))
  183. config = None
  184. if args.project_root:
  185. from project_locator import resolve_project_root
  186. resolved_root = resolve_project_root(args.project_root)
  187. config = DataModulesConfig.from_project_root(resolved_root)
  188. manager = ScratchpadManager(config)
  189. if args.command == "stats":
  190. print_success(manager.stats(), message="memory_stats")
  191. return
  192. if args.command == "dump":
  193. print_success(manager.dump(), message="memory_dump")
  194. return
  195. if args.command == "conflicts":
  196. print_success(manager.conflicts(), message="memory_conflicts")
  197. return
  198. if args.command == "query":
  199. rows = [row.to_dict() for row in manager.query(args.category, args.subject, args.status)]
  200. print_success(rows, message="memory_query")
  201. return
  202. if args.command == "update":
  203. from .writer import MemoryWriter
  204. payload = load_json_arg(args.data)
  205. writer = MemoryWriter(config or get_config())
  206. result = writer.update_from_chapter_result(args.chapter, payload)
  207. print_success(result, message="memory_updated")
  208. return
  209. if args.command == "bootstrap":
  210. from .bootstrap import bootstrap_from_index
  211. result = bootstrap_from_index(config or get_config())
  212. print_success(result, message="memory_bootstrapped")
  213. return
  214. print_error("UNKNOWN_COMMAND", "未知命令", suggestion="请查看 --help")