| """ |
| Atomic JSON persistence with per-file locking. |
| All backend mutable JSON stores should use save_json / load_json from this module. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import tempfile |
| import threading |
| from pathlib import Path |
| from typing import Any |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Registry mapping an absolute file path to its re-entrant lock.
# Every read or write of the registry happens while holding _lock_guard.
_locks: dict[str, threading.RLock] = {}
_lock_guard = threading.Lock()


def _file_lock(path: str) -> threading.RLock:
    """Return the re-entrant lock for *path*, creating it on first request.

    The registry key is ``os.path.abspath(path)``, so relative and absolute
    spellings of the same location share a single lock.
    """
    absolute = os.path.abspath(path)
    with _lock_guard:
        try:
            return _locks[absolute]
        except KeyError:
            # First request for this path: register a new lock and hand it out.
            created = _locks[absolute] = threading.RLock()
            return created
|
|
|
|
def load_json(path: str, default: Any = None) -> Any:
    """Load JSON from path; return default if missing or corrupt.

    Args:
        path: Location of the JSON file.
        default: Value returned when the file is missing or cannot be read
            or parsed. ``None`` is replaced by a new empty dict.

    Returns:
        The decoded JSON document, or ``default``.
    """
    if default is None:
        default = {}
    # Fast path: a file that is plainly absent needs no lock at all.
    if not os.path.exists(path):
        return default
    with _file_lock(path):
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            # The file vanished between the existence check and open().
            # That is still "missing", not a load failure worth logging.
            return default
        except Exception as exc:
            # Deliberately broad: any read or decode problem degrades to
            # the default instead of propagating to the caller.
            logger.error("Failed to load %s: %s", path, exc)
            return default
|
|
|
|
def save_json(path: str, data: Any) -> None:
    """Atomically write JSON using temp file + os.replace.

    The document is serialized to a temporary file in the target's own
    directory, flushed and fsynced, and only then moved over ``path``.

    Args:
        path: Destination file; its parent directory is created if needed.
        data: JSON-serializable object to store.

    Raises:
        Any error from directory creation, serialization or the file
        operations (for example ``OSError``, or ``TypeError`` for data that
        is not JSON-serializable). The temporary file is removed before the
        error propagates.
    """
    with _file_lock(path):
        dir_name = os.path.dirname(path) or "."
        os.makedirs(dir_name, exist_ok=True)
        # The temp file lives next to the target so the final os.replace is
        # a rename within one directory.
        # NOTE: on POSIX, mkstemp creates the file with owner-only
        # permissions, and the replaced target inherits that mode.
        fd, tmp_path = tempfile.mkstemp(dir=dir_name, prefix=".tmp_", suffix=".json")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False)
                f.flush()
                # Push the bytes to disk before the rename makes them visible.
                os.fsync(f.fileno())
            os.replace(tmp_path, path)
        except BaseException:
            # BaseException so an interrupt (KeyboardInterrupt, SystemExit)
            # mid-write does not leave a stray temp file behind.
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup; the original error matters more.
                pass
            raise
|
|
|
|
def append_bounded(path: str, items: list, max_items: int, new_item: Any) -> list:
    """Load list, prepend item, trim, save — all under one lock.

    Args:
        path: JSON file holding the list.
        items: Unused. The list is always re-read from ``path``; the
            parameter is kept so existing call sites keep working.
        max_items: Maximum number of entries to keep, counted from the
            front (newest first). Negative values are treated as 0.
        new_item: Entry inserted at the front of the list.

    Returns:
        The trimmed list that was written to ``path``.
    """
    with _file_lock(path):
        current = load_json(path, [])
        # Anything other than a list on disk is discarded and replaced.
        if not isinstance(current, list):
            current = []
        current.insert(0, new_item)
        # A negative bound would make the slice below drop entries from the
        # tail instead of capping the length, so clamp it to zero. A None
        # bound is left alone and slices through the whole list as before.
        if max_items is not None and max_items < 0:
            max_items = 0
        trimmed = current[:max_items]
        save_json(path, trimmed)
        return trimmed
|
|