""" Atomic JSON persistence with per-file locking. All backend mutable JSON stores should use save_json / load_json from this module. """ from __future__ import annotations import json import logging import os import tempfile import threading from pathlib import Path from typing import Any logger = logging.getLogger(__name__) _locks: dict[str, threading.RLock] = {} _lock_guard = threading.Lock() def _file_lock(path: str) -> threading.RLock: key = os.path.abspath(path) with _lock_guard: if key not in _locks: _locks[key] = threading.RLock() return _locks[key] def load_json(path: str, default: Any = None) -> Any: """Load JSON from path; return default if missing or corrupt.""" if default is None: default = {} if not os.path.exists(path): return default lock = _file_lock(path) with lock: try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except Exception as exc: logger.error("Failed to load %s: %s", path, exc) return default def save_json(path: str, data: Any) -> None: """Atomically write JSON using temp file + os.replace.""" lock = _file_lock(path) with lock: dir_name = os.path.dirname(path) or "." os.makedirs(dir_name, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=dir_name, prefix=".tmp_", suffix=".json") try: with os.fdopen(fd, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False) f.flush() os.fsync(f.fileno()) os.replace(tmp_path, path) except Exception: if os.path.exists(tmp_path): try: os.unlink(tmp_path) except OSError: pass raise def append_bounded(path: str, items: list, max_items: int, new_item: Any) -> list: """Load list, prepend item, trim, save — all under one lock.""" lock = _file_lock(path) with lock: current = load_json(path, []) if not isinstance(current, list): current = [] current.insert(0, new_item) trimmed = current[:max_items] save_json(path, trimmed) return trimmed