visheshrathi's picture
Upload folder using huggingface_hub
5850885 verified
"""JSON-backed playbook and drift-card store with file locking.
Each store file is a JSON array. Appends go through a single locked
read-modify-write cycle that writes to a sibling ``*.tmp`` file and
atomically ``os.replace``s it onto the target path, so a crash can only
leave either the old array or the new one — never a truncated file.
The lock is held on a dedicated ``*.lock`` file via ``fcntl.flock`` with
a caller-configurable timeout (default 5s). We never lock the data file
itself: that way an ``os.replace`` inside the critical section can't
race against a reader holding a shared lock on the old inode.
Reads are cached by mtime so hot-path episodes don't re-parse the file
on every ``reset()``. Corrupt trailers (from a pre-atomic-write era or
a partial disk write) log a warning and fall back to empty — we prefer
a running trainer over one that dies because of a bad card.
"""
from __future__ import annotations
import contextlib
import json
import os
import time
from collections.abc import Callable, Iterator
from dataclasses import asdict
from pathlib import Path
from typing import Any, Literal, TypeVar, cast
from skill_library.entries import DriftAdaptationCard, PlaybookEntry
from utilities.logger import get_module_logger
# Module-level logger, obtained from the project's logging helper.
_LOG = get_module_logger(__name__)
# Directory a ``Store`` uses when the caller does not pass one explicitly.
DEFAULT_STORE_DIR = Path("outputs") / "skill_library"
# File names of the two JSON arrays, resolved relative to the store directory.
PLAYBOOK_FILENAME = "playbook.json"
DRIFT_CARDS_FILENAME = "drift_cards.json"
# Default number of seconds ``Store`` waits for the append lock.
DEFAULT_LOCK_TIMEOUT_S: float = 5.0
# Decoded item type used by the generic cached reader in ``Store``.
T = TypeVar("T")
# Advisory-lock primitives. Where ``fcntl`` can be imported, locking is a
# non-blocking exclusive ``flock``; elsewhere both helpers are no-ops and
# ``_HAS_FCNTL`` records that no real lock is taken.
try:
    import fcntl
except ImportError:
    _HAS_FCNTL = False

    def _try_lock_exclusive(fh: Any) -> bool:
        """Report success unconditionally; no locking primitive is available."""
        return True

    def _unlock(fh: Any) -> None:
        """Do nothing; there is no lock to release."""
        return None

else:
    _HAS_FCNTL = True

    def _try_lock_exclusive(fh: Any) -> bool:
        """Attempt a non-blocking exclusive ``flock`` on ``fh``.

        Returns ``True`` when the lock was taken and ``False`` when the
        call would have blocked.
        """
        try:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            return False
        return True

    def _unlock(fh: Any) -> None:
        """Release the ``flock`` held through ``fh``."""
        fcntl.flock(fh.fileno(), fcntl.LOCK_UN)
@contextlib.contextmanager
def _locked(path: Path, timeout_s: float) -> Iterator[None]:
    """Hold an exclusive lock on ``path`` while the ``with`` body runs.

    The parent directory and the lock file itself are created when
    missing. Acquisition is retried every 20 ms until it succeeds or
    ``timeout_s`` seconds have passed.

    Raises:
        TimeoutError: the lock could not be taken before the deadline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    give_up_at = time.monotonic() + timeout_s
    with path.open("a+") as lock_fh:
        while True:
            if _try_lock_exclusive(lock_fh):
                break
            if time.monotonic() >= give_up_at:
                raise TimeoutError(f"could not acquire {path} within {timeout_s}s")
            time.sleep(0.02)
        try:
            yield
        finally:
            # Without fcntl nothing was locked, so there is nothing to undo.
            if _HAS_FCNTL:
                _unlock(lock_fh)
def _atomic_write_json(path: Path, payload: list[dict[str, Any]]) -> None:
    """Write ``payload`` as indented JSON to ``path`` via an atomic replace.

    The text is written and fsynced to a sibling ``<name>.tmp`` file, which
    is then moved over ``path`` with ``os.replace``. If the write, the
    fsync or the replace fails, the temporary file is removed again and
    the original exception propagates, so failed attempts do not leave
    stray ``*.tmp`` files next to the store.

    Raises:
        TypeError, ValueError: ``payload`` is not JSON-serialisable.
        OSError: the temporary file could not be written or moved.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    # Serialise before touching the disk so an unserialisable payload
    # never creates a temporary file in the first place.
    text = json.dumps(payload, indent=2)
    try:
        with tmp.open("w", encoding="utf-8") as fh:
            fh.write(text)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup; the temp file may not exist at all.
        with contextlib.suppress(OSError):
            tmp.unlink()
        raise
def _read_json_array(path: Path) -> list[Any]:
    """Load the JSON array stored at ``path``, degrading to ``[]`` on trouble.

    Returns an empty list when the file is missing, blank, unreadable,
    not valid UTF-8, not valid JSON, or when its top-level value is not a
    list. Unreadable and corrupt files are reported with a warning; a
    missing or blank file is not.
    """
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # A store that has never been written to is simply empty.
        return []
    except OSError as exc:
        _LOG.warning("skill-store read failed for %s: %s", path, exc)
        return []
    except UnicodeDecodeError as exc:
        # Invalid bytes are corruption just like malformed JSON; they must
        # not escape as an exception from a reader that promises a fallback.
        _LOG.warning("skill-store corrupt at %s (%s); returning empty", path, exc)
        return []
    if not raw.strip():
        return []
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        _LOG.warning("skill-store corrupt at %s (%s); returning empty", path, exc)
        return []
    return data if isinstance(data, list) else []
class Store:
    """Append-only JSON store for learned playbook entries + drift cards.

    Both collections live as JSON arrays inside ``self.dir``. Reads are
    cached per instance and keyed on the file's mtime; appends run under
    a lock on a sibling ``*.lock`` file and drop the matching cache.
    """

    def __init__(
        self,
        directory: Path | None = None,
        lock_timeout_s: float = DEFAULT_LOCK_TIMEOUT_S,
    ) -> None:
        """Configure the store.

        Args:
            directory: Folder holding the JSON files; ``DEFAULT_STORE_DIR``
                when ``None``.
            lock_timeout_s: Seconds an append waits for the lock.
        """
        self.dir = Path(directory) if directory is not None else DEFAULT_STORE_DIR
        self.lock_timeout_s = lock_timeout_s
        # Each cache holds ``(decoded items, mtime seen just before reading)``.
        self._playbook_cache: tuple[tuple[PlaybookEntry, ...], float] | None = None
        self._drift_cache: tuple[tuple[DriftAdaptationCard, ...], float] | None = None

    def playbook_path(self) -> Path:
        """Return the path of the playbook JSON file."""
        return self.dir / PLAYBOOK_FILENAME

    def drift_cards_path(self) -> Path:
        """Return the path of the drift-card JSON file."""
        return self.dir / DRIFT_CARDS_FILENAME

    def read_playbook(self) -> tuple[PlaybookEntry, ...]:
        """Return all decodable playbook entries, served from cache when fresh."""
        return self._read_cached(
            self.playbook_path(),
            cache_attr="_playbook_cache",
            decode=_entry_from_dict,
        )

    def read_drift_cards(self) -> tuple[DriftAdaptationCard, ...]:
        """Return all decodable drift cards, served from cache when fresh."""
        return self._read_cached(
            self.drift_cards_path(),
            cache_attr="_drift_cache",
            decode=lambda d: DriftAdaptationCard(**d),
        )

    def append_playbook(self, entry: PlaybookEntry) -> None:
        """Append ``entry`` to the playbook file and drop the playbook cache.

        Raises:
            TimeoutError: the lock was not acquired within ``lock_timeout_s``.
        """
        self._locked_append(
            self.playbook_path(),
            encode_new=_entry_to_dict,
            new_item=entry,
        )
        self._playbook_cache = None

    def append_drift_card(self, card: DriftAdaptationCard) -> None:
        """Append ``card`` to the drift-card file and drop the drift cache.

        Raises:
            TimeoutError: the lock was not acquired within ``lock_timeout_s``.
        """
        self._locked_append(
            self.drift_cards_path(),
            encode_new=asdict,
            new_item=card,
        )
        self._drift_cache = None

    def _read_cached(
        self,
        path: Path,
        *,
        cache_attr: str,
        decode: Callable[[dict[str, Any]], T],
    ) -> tuple[T, ...]:
        """Decode the array at ``path``, reusing the cache while mtime matches.

        Array elements that are not JSON objects, or that ``decode``
        rejects with ``TypeError``/``KeyError``/``ValueError``, are logged
        and skipped instead of aborting the whole read.
        """
        mtime = _safe_mtime(path)
        # ``getattr``/``setattr`` is intentional — the same implementation
        # services both the playbook and drift-card caches, whose Python
        # types differ. The cast below restores the precise
        # ``(tuple[T, ...], float) | None`` shape for mypy.
        cached = cast("tuple[tuple[T, ...], float] | None", getattr(self, cache_attr))
        if cached is not None and cached[1] == mtime:
            return cached[0]
        items: list[T] = []
        for d in _read_json_array(path):
            # Decoders assume a mapping; a stray scalar or list would
            # otherwise surface as an uncaught AttributeError.
            if not isinstance(d, dict):
                _LOG.warning(
                    "skipping malformed store entry %s: expected a JSON object", d
                )
                continue
            try:
                items.append(decode(d))
            except (TypeError, KeyError, ValueError) as exc:
                _LOG.warning("skipping malformed store entry %s: %s", d, exc)
        tup = tuple(items)
        setattr(self, cache_attr, (tup, mtime))
        return tup

    def _locked_append(
        self,
        path: Path,
        *,
        encode_new: Callable[[Any], dict[str, Any]],
        new_item: Any,
    ) -> None:
        """Append one encoded item to the array at ``path`` under the lock.

        The lock lives on ``<path>.lock``; the data file is re-read inside
        the critical section and rewritten through ``_atomic_write_json``.
        """
        self.dir.mkdir(parents=True, exist_ok=True)
        lock_path = path.with_suffix(path.suffix + ".lock")
        with _locked(lock_path, self.lock_timeout_s):
            existing = _read_json_array(path)
            existing.append(encode_new(new_item))
            _atomic_write_json(path, existing)
def _safe_mtime(path: Path) -> float:
    """Return the mtime of ``path``, or ``0.0`` when it cannot be stat'ed.

    Any ``OSError`` from ``stat`` (missing file, a parent that is not a
    directory, denied permission, ...) yields ``0.0`` rather than an
    exception, so the cached read path degrades the same way the JSON
    reader does.
    """
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0
def _entry_to_dict(e: PlaybookEntry) -> dict[str, Any]:
    """Serialise a playbook entry into a JSON-ready dict.

    ``tag_set`` is emitted as a sorted list; the remaining fields are
    copied over unchanged, in a fixed key order.
    """
    record: dict[str, Any] = {"tag_set": sorted(e.tag_set)}
    copied_fields = (
        "before_snippet",
        "after_snippet",
        "avg_speedup",
        "scenario_family",
        "source",
    )
    for field_name in copied_fields:
        record[field_name] = getattr(e, field_name)
    return record
def _entry_from_dict(d: dict[str, Any]) -> PlaybookEntry:
    """Build a ``PlaybookEntry`` from one decoded JSON object.

    A missing or empty ``tag_set`` becomes an empty frozenset and a
    missing ``source`` defaults to ``"learned"``.

    Raises:
        TypeError: ``d`` is not a dict, or ``avg_speedup`` cannot be
            converted by ``float``.
        KeyError: a required field is absent.
        ValueError: ``avg_speedup`` is not a valid number.
    """
    # Reject non-objects with TypeError so the caller's malformed-entry
    # handling applies instead of an AttributeError from ``d.get``.
    if not isinstance(d, dict):
        raise TypeError(
            f"playbook entry must be a JSON object, got {type(d).__name__}"
        )
    source: Literal["preseed", "learned"] = d.get("source", "learned")
    return PlaybookEntry(
        tag_set=frozenset(d.get("tag_set") or []),
        before_snippet=d["before_snippet"],
        after_snippet=d["after_snippet"],
        avg_speedup=float(d["avg_speedup"]),
        scenario_family=d["scenario_family"],
        source=source,
    )
def cleanup_stale_session_dirs(root: Path, ttl_hours: float) -> int:
    """Remove session subdirectories under *root* whose mtime is older than *ttl_hours*.

    Returns the number of directories that are actually gone afterwards.
    Removal is best-effort: a directory that could only be partly deleted,
    or that cannot be stat'ed, is logged and left out of the count, so a
    single bad entry cannot abort the sweep. Symbolic links are never
    followed or removed. Pass ``ttl_hours=0`` to disable (returns 0
    immediately); a *root* that is missing or not a directory also
    returns 0.
    """
    import shutil

    if ttl_hours <= 0 or not root.is_dir():
        return 0
    cutoff = time.time() - ttl_hours * 3600
    removed = 0
    for session_dir in root.iterdir():
        # ``is_dir`` follows symlinks, but ``rmtree`` refuses them; skip
        # links up front so they are neither touched nor miscounted.
        if session_dir.is_symlink() or not session_dir.is_dir():
            continue
        try:
            if session_dir.stat().st_mtime >= cutoff:
                continue
        except OSError as exc:
            _LOG.warning("cleanup_stale_session_dirs: skipping %s: %s", session_dir, exc)
            continue
        shutil.rmtree(session_dir, ignore_errors=True)
        # ``ignore_errors`` hides failures, so verify the outcome before
        # counting the directory as removed.
        if session_dir.exists():
            _LOG.warning(
                "cleanup_stale_session_dirs: could not fully remove %s", session_dir
            )
            continue
        removed += 1
    return removed
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers above are intentionally left out.
__all__ = ["DEFAULT_LOCK_TIMEOUT_S", "DEFAULT_STORE_DIR", "Store", "cleanup_stale_session_dirs"]