Spaces:
Sleeping
Sleeping
"""JSON-backed playbook and drift-card store with file locking.

Each store file is a JSON array. Appends go through a single locked
read-modify-write cycle that writes to a sibling ``*.tmp`` file and
atomically ``os.replace``s it onto the target path, so a crash can only
leave either the old array or the new one — never a truncated file.

The lock is held on a dedicated ``*.lock`` file via ``fcntl.flock`` with
a caller-configurable timeout (default 5s). We never lock the data file
itself: that way an ``os.replace`` inside the critical section can't
race against a reader holding a shared lock on the old inode. On
platforms without ``fcntl`` the lock degrades to a no-op, so concurrent
writers are not serialized there.

Reads are cached by mtime so hot-path episodes don't re-parse the file
on every ``reset()``. Corrupt files (written before atomic writes were
introduced, or left by a partial disk write) log a warning and fall back
to empty — we prefer a running trainer over one that dies because of a
bad card.
"""
| from __future__ import annotations | |
| import contextlib | |
| import json | |
| import os | |
| import time | |
| from collections.abc import Callable, Iterator | |
| from dataclasses import asdict | |
| from pathlib import Path | |
| from typing import Any, Literal, TypeVar, cast | |
| from skill_library.entries import DriftAdaptationCard, PlaybookEntry | |
| from utilities.logger import get_module_logger | |
_LOG = get_module_logger(__name__)

# Default on-disk location of the store, relative to the working directory.
DEFAULT_STORE_DIR = Path("outputs", "skill_library")

# File names of the two JSON-array stores inside the store directory.
PLAYBOOK_FILENAME = "playbook.json"
DRIFT_CARDS_FILENAME = "drift_cards.json"

# Default seconds ``Store`` waits for a per-file lock before giving up.
DEFAULT_LOCK_TIMEOUT_S: float = 5.0

# Generic element type used by ``Store._read_cached``.
T = TypeVar("T")
try:
    import fcntl
except ImportError:
    # No POSIX advisory locking on this platform (e.g. Windows): the lock
    # helpers below degrade to no-ops, so concurrent writers are NOT
    # serialized here.
    _HAS_FCNTL = False

    def _try_lock_exclusive(fh: Any) -> bool:
        """Pretend to lock *fh*; always succeeds when ``fcntl`` is missing."""
        return True

    def _unlock(fh: Any) -> None:
        """No-op counterpart of the fallback ``_try_lock_exclusive``."""
        return None

else:
    _HAS_FCNTL = True

    def _try_lock_exclusive(fh: Any) -> bool:
        """Make one non-blocking attempt to take an exclusive ``flock`` on *fh*.

        Returns:
            True if the lock was acquired, False if it is currently held
            elsewhere (``flock`` reported it would block).
        """
        try:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            return False
        return True

    def _unlock(fh: Any) -> None:
        """Release the ``flock`` held on *fh*."""
        fcntl.flock(fh.fileno(), fcntl.LOCK_UN)


@contextlib.contextmanager
def _locked(
    path: Path,
    timeout_s: float,
    *,
    poll_interval_s: float = 0.02,
) -> Iterator[None]:
    """Hold an exclusive lock on ``path`` for the duration of a ``with`` block.

    The lock file (and its parent directories) is created if missing. The
    lock is acquired by polling non-blocking attempts until ``timeout_s``
    elapses.

    Args:
        path: Lock file to open and lock; its contents are never touched.
        timeout_s: Maximum seconds to keep retrying before giving up.
        poll_interval_s: Sleep between failed attempts.

    Raises:
        TimeoutError: if the lock could not be acquired within ``timeout_s``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    deadline = time.monotonic() + timeout_s
    # "a+" creates the lock file if needed without truncating an existing one.
    with path.open("a+") as fh:
        while not _try_lock_exclusive(fh):
            if time.monotonic() >= deadline:
                raise TimeoutError(f"could not acquire {path} within {timeout_s}s")
            time.sleep(poll_interval_s)
        try:
            yield
        finally:
            # The fallback ``_unlock`` is a no-op, so this is safe everywhere.
            _unlock(fh)
def _atomic_write_json(path: Path, payload: list[dict[str, Any]]) -> None:
    """Replace *path* with *payload* serialized as indented JSON, atomically.

    The text is written to a sibling ``<name>.tmp`` file, flushed and
    fsync'ed, then moved over *path* with ``os.replace``. Readers therefore
    see either the previous file or the complete new one. If writing or
    replacing fails, the temp file is removed before the exception
    propagates, so no stale ``*.tmp`` accumulates beside the store.

    Raises:
        TypeError: if *payload* is not JSON-serializable (nothing is written).
        OSError: if writing, syncing or replacing fails.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    # Serialize first so an unserializable payload fails before touching disk.
    text = json.dumps(payload, indent=2)
    try:
        with tmp.open("w", encoding="utf-8") as fh:
            fh.write(text)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup; never mask the original error.
        with contextlib.suppress(OSError):
            tmp.unlink(missing_ok=True)
        raise
def _read_json_array(path: Path) -> list[Any]:
    """Load *path* as a JSON array, degrading to ``[]`` on any problem.

    A missing or whitespace-only file is an empty store and is returned as
    ``[]`` silently. Unreadable files, bytes that are not valid UTF-8,
    invalid JSON, and valid JSON whose top level is not an array are logged
    as warnings and also yield ``[]``: the store prefers a running trainer
    over a crash.
    """
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # EAFP: avoids the exists()/read race with a concurrent creator.
        return []
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError; garbage bytes
        # from a partial write would otherwise escape to the caller.
        _LOG.warning("skill-store read failed for %s: %s", path, exc)
        return []
    if not raw.strip():
        return []
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        _LOG.warning("skill-store corrupt at %s (%s); returning empty", path, exc)
        return []
    if not isinstance(data, list):
        _LOG.warning(
            "skill-store at %s holds %s, not a JSON array; returning empty",
            path,
            type(data).__name__,
        )
        return []
    return data
class Store:
    """Append-only JSON store for learned playbook entries + drift cards.

    Entries live in two JSON-array files under ``self.dir``
    (``PLAYBOOK_FILENAME`` and ``DRIFT_CARDS_FILENAME``). Reads are memoized
    per instance, keyed on the file's mtime. This instance's own appends
    drop the corresponding memo so the next read re-parses.
    """

    def __init__(
        self,
        directory: Path | None = None,
        lock_timeout_s: float = DEFAULT_LOCK_TIMEOUT_S,
    ) -> None:
        """Create a store rooted at *directory*.

        Args:
            directory: Folder holding the JSON files. Defaults to
                ``DEFAULT_STORE_DIR``. It is created on first append, not here.
            lock_timeout_s: Seconds an append waits for the per-file lock
                before ``TimeoutError`` is raised.
        """
        self.dir = Path(directory) if directory is not None else DEFAULT_STORE_DIR
        self.lock_timeout_s = lock_timeout_s
        # Each memo is ``(decoded items, mtime they were decoded at)``, or
        # ``None`` before the first read and after an append.
        self._playbook_cache: tuple[tuple[PlaybookEntry, ...], float] | None = None
        self._drift_cache: tuple[tuple[DriftAdaptationCard, ...], float] | None = None

    def playbook_path(self) -> Path:
        """Return the path of the playbook JSON file."""
        return self.dir / PLAYBOOK_FILENAME

    def drift_cards_path(self) -> Path:
        """Return the path of the drift-card JSON file."""
        return self.dir / DRIFT_CARDS_FILENAME

    def read_playbook(self) -> tuple[PlaybookEntry, ...]:
        """Return every decodable playbook entry; malformed ones are skipped."""
        return self._read_cached(
            self.playbook_path(),
            cache_attr="_playbook_cache",
            decode=_entry_from_dict,
        )

    def read_drift_cards(self) -> tuple[DriftAdaptationCard, ...]:
        """Return every decodable drift card; malformed ones are skipped."""
        return self._read_cached(
            self.drift_cards_path(),
            cache_attr="_drift_cache",
            decode=lambda d: DriftAdaptationCard(**d),
        )

    def append_playbook(self, entry: PlaybookEntry) -> None:
        """Append *entry* to the playbook file under the per-file lock.

        Raises:
            TimeoutError: if the lock is not acquired within ``lock_timeout_s``.
        """
        self._locked_append(
            self.playbook_path(),
            encode_new=_entry_to_dict,
            new_item=entry,
        )
        self._playbook_cache = None

    def append_drift_card(self, card: DriftAdaptationCard) -> None:
        """Append *card* (serialized with ``asdict``) under the per-file lock.

        Raises:
            TimeoutError: if the lock is not acquired within ``lock_timeout_s``.
        """
        self._locked_append(
            self.drift_cards_path(),
            encode_new=asdict,
            new_item=card,
        )
        self._drift_cache = None

    def _read_cached(
        self,
        path: Path,
        *,
        cache_attr: str,
        decode: Callable[[dict[str, Any]], T],
    ) -> tuple[T, ...]:
        """Decode *path* element by element, memoizing the result by mtime.

        Args:
            path: JSON-array file to read.
            cache_attr: Name of the instance attribute holding this file's memo.
            decode: Builds one domain object from one JSON object. It may
                raise ``TypeError``, ``KeyError`` or ``ValueError`` on bad input.

        Returns:
            Tuple of decoded items. Elements that are not JSON objects, or
            that ``decode`` rejects, are logged and skipped.
        """
        mtime = _safe_mtime(path)
        # ``getattr``/``setattr`` is intentional — the same implementation
        # services both the playbook and drift-card caches, whose Python
        # types differ. The cast below restores the precise
        # ``(tuple[T, ...], float) | None`` shape for mypy.
        cached = cast("tuple[tuple[T, ...], float] | None", getattr(self, cache_attr))
        if cached is not None and cached[1] == mtime:
            return cached[0]
        items: list[T] = []
        for d in _read_json_array(path):
            # Non-object elements (a stray string, number, list or null) make
            # ``_entry_from_dict`` raise AttributeError on ``d.get``. That is
            # not in the except tuple below and would crash the reader, so
            # skip them explicitly.
            if not isinstance(d, dict):
                _LOG.warning("skipping malformed store entry %s: %s", d, "not a JSON object")
                continue
            try:
                items.append(decode(d))
            except (TypeError, KeyError, ValueError) as exc:
                _LOG.warning("skipping malformed store entry %s: %s", d, exc)
        tup = tuple(items)
        setattr(self, cache_attr, (tup, mtime))
        return tup

    def _locked_append(
        self,
        path: Path,
        *,
        encode_new: Callable[[Any], dict[str, Any]],
        new_item: Any,
    ) -> None:
        """Append ``encode_new(new_item)`` to the JSON array at *path*.

        The read-modify-write runs while holding ``<path>.lock``, and the
        result is written back atomically.

        NOTE(review): if the existing file is unreadable or corrupt,
        ``_read_json_array`` returns ``[]``. This call then rewrites the file
        with only the new item, discarding whatever was there. Consider
        backing up the bad file before overwriting it.

        Raises:
            TimeoutError: if the lock is not acquired within ``lock_timeout_s``.
        """
        self.dir.mkdir(parents=True, exist_ok=True)
        # Lock a sibling file rather than the data file itself: the data file
        # is swapped out by ``os.replace``, so a lock on it would pin an old inode.
        lock_path = path.with_suffix(path.suffix + ".lock")
        with _locked(lock_path, self.lock_timeout_s):
            existing = _read_json_array(path)
            existing.append(encode_new(new_item))
            _atomic_write_json(path, existing)
def _safe_mtime(path: Path) -> float:
    """Return *path*'s modification time, or ``0.0`` if it cannot be stat'ed.

    A missing file is the common case (nothing appended yet). Other
    ``OSError``s, such as a permission error or a path component that is a
    regular file, are treated the same way so that cache probing never
    raises. ``_read_json_array`` likewise degrades to an empty result on
    ``OSError``.
    """
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0
def _entry_to_dict(e: PlaybookEntry) -> dict[str, Any]:
    """Serialize a playbook entry into a JSON-friendly dict.

    ``tag_set`` is emitted as a sorted list so the on-disk form does not
    depend on set iteration order. The remaining fields are copied verbatim,
    in a fixed key order.
    """
    passthrough = (
        "before_snippet",
        "after_snippet",
        "avg_speedup",
        "scenario_family",
        "source",
    )
    record: dict[str, Any] = {"tag_set": sorted(e.tag_set)}
    record.update({field: getattr(e, field) for field in passthrough})
    return record
def _entry_from_dict(d: dict[str, Any]) -> PlaybookEntry:
    """Rebuild a ``PlaybookEntry`` from its stored dict form.

    A missing, ``null`` or empty ``tag_set`` becomes an empty frozenset, and
    a missing ``source`` defaults to ``"learned"``. ``source`` is passed
    through unvalidated; the ``Literal`` annotation is not enforced at runtime.

    Raises:
        KeyError: if ``before_snippet``, ``after_snippet``, ``avg_speedup``
            or ``scenario_family`` is absent.
        TypeError, ValueError: if ``avg_speedup`` is not convertible to float.
    """
    origin: Literal["preseed", "learned"] = d.get("source", "learned")
    tags = frozenset(d.get("tag_set") or ())
    before = d["before_snippet"]
    after = d["after_snippet"]
    speedup = float(d["avg_speedup"])
    family = d["scenario_family"]
    return PlaybookEntry(
        tag_set=tags,
        before_snippet=before,
        after_snippet=after,
        avg_speedup=speedup,
        scenario_family=family,
        source=origin,
    )
def cleanup_stale_session_dirs(root: Path, ttl_hours: float) -> int:
    """Remove session subdirectories under *root* whose mtime is older than *ttl_hours*.

    Only real directories directly under *root* are considered. Plain files
    and symlinks are left alone, since ``shutil.rmtree`` refuses symlinks and
    the sweep must not reach outside *root*.

    Returns the number of directories actually removed. A subdirectory whose
    stat or removal fails is logged and skipped (and not counted), so a
    single bad entry cannot abort the sweep. A non-positive ``ttl_hours``
    disables the sweep, as does a *root* that is missing or not a directory;
    both return 0 immediately.
    """
    import shutil

    if ttl_hours <= 0 or not root.is_dir():
        return 0
    cutoff = time.time() - ttl_hours * 3600
    removed = 0
    for session_dir in root.iterdir():
        if session_dir.is_symlink() or not session_dir.is_dir():
            continue
        try:
            if session_dir.stat().st_mtime >= cutoff:
                continue
            # No ignore_errors: a failed removal must be logged, not counted.
            shutil.rmtree(session_dir)
        except OSError as exc:
            _LOG.warning("cleanup_stale_session_dirs: skipping %s: %s", session_dir, exc)
            continue
        removed += 1
    return removed
# Public API of this module; everything else is an implementation detail.
__all__ = [
    "DEFAULT_LOCK_TIMEOUT_S",
    "DEFAULT_STORE_DIR",
    "Store",
    "cleanup_stale_session_dirs",
]