github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
2.3 kB
"""
Atomic JSON persistence with per-file locking.
All backend mutable JSON stores should use save_json / load_json from this module.
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
import threading
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
_locks: dict[str, threading.RLock] = {}
_lock_guard = threading.Lock()
def _file_lock(path: str) -> threading.RLock:
key = os.path.abspath(path)
with _lock_guard:
if key not in _locks:
_locks[key] = threading.RLock()
return _locks[key]
def load_json(path: str, default: Any = None) -> Any:
"""Load JSON from path; return default if missing or corrupt."""
if default is None:
default = {}
if not os.path.exists(path):
return default
lock = _file_lock(path)
with lock:
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as exc:
logger.error("Failed to load %s: %s", path, exc)
return default
def save_json(path: str, data: Any) -> None:
"""Atomically write JSON using temp file + os.replace."""
lock = _file_lock(path)
with lock:
dir_name = os.path.dirname(path) or "."
os.makedirs(dir_name, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=dir_name, prefix=".tmp_", suffix=".json")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
except Exception:
if os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except OSError:
pass
raise
def append_bounded(path: str, items: list, max_items: int, new_item: Any) -> list:
"""Load list, prepend item, trim, save — all under one lock."""
lock = _file_lock(path)
with lock:
current = load_json(path, [])
if not isinstance(current, list):
current = []
current.insert(0, new_item)
trimmed = current[:max_items]
save_json(path, trimmed)
return trimmed