|
|
@@ -27,11 +27,14 @@ try:
|
|
|
except ImportError: # pragma: no cover
|
|
|
from scripts.security_utils import atomic_write_json, read_json_safe
|
|
|
|
|
|
+from filelock import FileLock
|
|
|
+
|
|
|
|
|
|
class ScratchpadManager:
|
|
|
def __init__(self, config: DataModulesConfig | None = None):
|
|
|
self.config = config or get_config()
|
|
|
self.path = Path(self.config.scratchpad_file)
|
|
|
+ self._lock = FileLock(str(self.path) + ".lock", timeout=30)
|
|
|
|
|
|
def load(self) -> ScratchpadData:
|
|
|
if not self.path.exists():
|
|
|
@@ -41,7 +44,7 @@ class ScratchpadManager:
|
|
|
return ScratchpadData.empty()
|
|
|
return ScratchpadData.from_dict(payload)
|
|
|
|
|
|
- def save(self, data: ScratchpadData) -> None:
|
|
|
+ def save(self, data: ScratchpadData, _use_lock: bool = True) -> None:
|
|
|
self.config.ensure_dirs()
|
|
|
if bool(getattr(self.config, "memory_compactor_enabled", True)):
|
|
|
threshold = max(1, int(getattr(self.config, "memory_compactor_threshold", 500)))
|
|
|
@@ -53,7 +56,7 @@ class ScratchpadManager:
|
|
|
payload.setdefault("meta", {})
|
|
|
payload["meta"]["last_updated"] = now_iso()
|
|
|
payload["meta"]["total_items"] = data.count_items()
|
|
|
- atomic_write_json(self.path, payload, use_lock=True, backup=True)
|
|
|
+ atomic_write_json(self.path, payload, use_lock=_use_lock, backup=True)
|
|
|
|
|
|
def _key_for(self, item: MemoryItem) -> tuple[Any, ...]:
|
|
|
rule = CATEGORY_KEY_RULES.get(item.category)
|
|
|
@@ -66,31 +69,32 @@ class ScratchpadManager:
|
|
|
|
|
|
def upsert_item(self, item: MemoryItem) -> Dict[str, int]:
|
|
|
normalized = item.normalized()
|
|
|
- data = self.load()
|
|
|
- bucket = CATEGORY_TO_BUCKET[normalized.category]
|
|
|
- rows: List[MemoryItem] = list(getattr(data, bucket))
|
|
|
- target_key = self._key_for(normalized)
|
|
|
-
|
|
|
- outdated = 0
|
|
|
- replaced_existing = False
|
|
|
- new_rows: List[MemoryItem] = []
|
|
|
- for row in rows:
|
|
|
- row_key = self._key_for(row)
|
|
|
- if row_key == target_key and row.id != normalized.id:
|
|
|
- # 同 key 旧值降级为 outdated,保留审计轨迹
|
|
|
- if row.status != "outdated":
|
|
|
- row = MemoryItem(**{**asdict(row), "status": "outdated", "updated_at": now_iso()})
|
|
|
- outdated += 1
|
|
|
- replaced_existing = True
|
|
|
- elif row.id == normalized.id:
|
|
|
- replaced_existing = True
|
|
|
- continue
|
|
|
- new_rows.append(row)
|
|
|
+ with self._lock:
|
|
|
+ data = self.load()
|
|
|
+ bucket = CATEGORY_TO_BUCKET[normalized.category]
|
|
|
+ rows: List[MemoryItem] = list(getattr(data, bucket))
|
|
|
+ target_key = self._key_for(normalized)
|
|
|
+
|
|
|
+ outdated = 0
|
|
|
+ replaced_existing = False
|
|
|
+ new_rows: List[MemoryItem] = []
|
|
|
+ for row in rows:
|
|
|
+ row_key = self._key_for(row)
|
|
|
+ if row_key == target_key and row.id != normalized.id:
|
|
|
+ # 同 key 旧值降级为 outdated,保留审计轨迹
|
|
|
+ if row.status != "outdated":
|
|
|
+ row = MemoryItem(**{**asdict(row), "status": "outdated", "updated_at": now_iso()})
|
|
|
+ outdated += 1
|
|
|
+ replaced_existing = True
|
|
|
+ elif row.id == normalized.id:
|
|
|
+ replaced_existing = True
|
|
|
+ continue
|
|
|
+ new_rows.append(row)
|
|
|
|
|
|
- normalized.updated_at = normalized.updated_at or now_iso()
|
|
|
- new_rows.append(normalized)
|
|
|
- setattr(data, bucket, new_rows)
|
|
|
- self.save(data)
|
|
|
+ normalized.updated_at = normalized.updated_at or now_iso()
|
|
|
+ new_rows.append(normalized)
|
|
|
+ setattr(data, bucket, new_rows)
|
|
|
+ self.save(data, _use_lock=False)
|
|
|
|
|
|
return {
|
|
|
"added": 0 if replaced_existing else 1,
|
|
|
@@ -101,16 +105,17 @@ class ScratchpadManager:
|
|
|
def mark_status(self, item_id: str, status: str) -> bool:
|
|
|
if not item_id:
|
|
|
return False
|
|
|
- data = self.load()
|
|
|
- updated = False
|
|
|
- for bucket in BUCKET_TO_CATEGORY:
|
|
|
- rows: List[MemoryItem] = getattr(data, bucket)
|
|
|
- for i, row in enumerate(rows):
|
|
|
- if row.id == item_id:
|
|
|
- rows[i] = MemoryItem(**{**asdict(row), "status": status, "updated_at": now_iso()})
|
|
|
- updated = True
|
|
|
- if updated:
|
|
|
- self.save(data)
|
|
|
+ with self._lock:
|
|
|
+ data = self.load()
|
|
|
+ updated = False
|
|
|
+ for bucket in BUCKET_TO_CATEGORY:
|
|
|
+ rows: List[MemoryItem] = getattr(data, bucket)
|
|
|
+ for i, row in enumerate(rows):
|
|
|
+ if row.id == item_id:
|
|
|
+ rows[i] = MemoryItem(**{**asdict(row), "status": status, "updated_at": now_iso()})
|
|
|
+ updated = True
|
|
|
+ if updated:
|
|
|
+ self.save(data, _use_lock=False)
|
|
|
return updated
|
|
|
|
|
|
def query(
|