File size: 8,418 Bytes
5850885
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""JSON-backed playbook and drift-card store with file locking.

Each store file is a JSON array. Appends go through a single locked
read-modify-write cycle that writes to a sibling ``*.tmp`` file and
atomically ``os.replace``s it onto the target path, so a crash can only
leave either the old array or the new one — never a truncated file.

The lock is held on a dedicated ``*.lock`` file via ``fcntl.flock`` with
a caller-configurable timeout (default 5s). We never lock the data file
itself: that way an ``os.replace`` inside the critical section can't
race against a reader holding a shared lock on the old inode.

Reads are cached by mtime so hot-path episodes don't re-parse the file
on every ``reset()``. Corrupt trailers (from a pre-atomic-write era or
a partial disk write) log a warning and fall back to empty — we prefer
a running trainer over one that dies because of a bad card.
"""

from __future__ import annotations

import contextlib
import json
import os
import time
from collections.abc import Callable, Iterator
from dataclasses import asdict
from pathlib import Path
from typing import Any, Literal, TypeVar, cast

from skill_library.entries import DriftAdaptationCard, PlaybookEntry
from utilities.logger import get_module_logger

_LOG = get_module_logger(__name__)

# Store location used when ``Store`` is given no directory (a relative path,
# so it resolves against the current working directory).
DEFAULT_STORE_DIR = Path("outputs", "skill_library")
# File names of the two JSON arrays kept inside the store directory.
PLAYBOOK_FILENAME = "playbook.json"
DRIFT_CARDS_FILENAME = "drift_cards.json"
# Seconds an append waits for the per-file lock before giving up.
DEFAULT_LOCK_TIMEOUT_S: float = 5.0

# Element type produced by a ``decode`` callback in ``Store._read_cached``.
T = TypeVar("T")


try:
    import fcntl
except ImportError:
    # No ``fcntl`` module (it is Unix-only): locking degrades to a no-op, so
    # concurrent appends from several processes are NOT serialised here.
    _HAS_FCNTL = False

    def _try_lock_exclusive(fh: Any) -> bool:
        """Report success unconditionally; there is nothing to lock with."""
        return True

    def _unlock(fh: Any) -> None:
        """Nothing to release in the no-``fcntl`` fallback."""
        return None

else:
    _HAS_FCNTL = True

    def _try_lock_exclusive(fh: Any) -> bool:
        """Make one non-blocking attempt to take an exclusive ``flock`` on *fh*.

        Returns ``False`` when the lock is currently held elsewhere, ``True``
        once it has been acquired.
        """
        try:
            fcntl.flock(fh.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            return False
        return True

    def _unlock(fh: Any) -> None:
        """Release the ``flock`` held on *fh*."""
        fcntl.flock(fh.fileno(), fcntl.LOCK_UN)


@contextlib.contextmanager
def _locked(
    path: Path,
    timeout_s: float,
    *,
    poll_interval_s: float = 0.02,
) -> Iterator[None]:
    """Hold an exclusive lock on *path* for the duration of the ``with`` body.

    The file and its parent directories are created if missing. Acquisition
    is attempted without blocking and retried every *poll_interval_s*
    seconds until it succeeds or *timeout_s* elapses. At least one attempt
    is always made, so ``timeout_s <= 0`` means "try exactly once".

    Args:
        path: Lock file to open (in append mode) and lock.
        timeout_s: Maximum seconds to keep retrying.
        poll_interval_s: Pause between attempts; never sleeps past the deadline.

    Raises:
        TimeoutError: the lock could not be acquired within *timeout_s*.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    deadline = time.monotonic() + timeout_s
    with path.open("a+") as fh:
        while not _try_lock_exclusive(fh):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"could not acquire {path} within {timeout_s}s")
            # Clamp to the remaining budget so the timeout is honoured closely
            # (and a final attempt happens right at the deadline); never pass
            # a negative value to ``time.sleep``.
            time.sleep(max(0.0, min(poll_interval_s, remaining)))
        try:
            yield
        finally:
            if _HAS_FCNTL:
                _unlock(fh)


def _atomic_write_json(path: Path, payload: list[dict[str, Any]]) -> None:
    """Replace *path* with *payload* serialised as indented JSON.

    The text goes to a sibling ``<name>.tmp`` file, which is flushed and
    fsync'd and then moved over *path* with ``os.replace``. Readers see
    either the previous file or the complete new one. If any step fails,
    the temporary file is removed and the original exception propagates.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    # Serialise first: an unencodable payload fails before any file is touched.
    text = json.dumps(payload, indent=2)
    try:
        with tmp.open("w", encoding="utf-8") as fh:
            fh.write(text)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp, path)
    except BaseException:
        # Don't leave a stale ``.tmp`` behind on failure (or interrupt).
        with contextlib.suppress(OSError):
            tmp.unlink()
        raise


def _read_json_array(path: Path) -> list[Any]:
    """Load *path* as a JSON array, degrading to ``[]`` on any problem.

    A missing or blank file yields ``[]`` quietly. An unreadable file, a
    file that is not valid UTF-8, malformed JSON, or JSON whose top level
    is not an array also yields ``[]``, but logs a warning first. That way
    corruption is visible without stopping the caller.
    """
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return []
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError: catch it
        # explicitly so a garbled byte can't crash the reader.
        _LOG.warning("skill-store read failed for %s: %s", path, exc)
        return []
    if not raw.strip():
        return []
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        _LOG.warning("skill-store corrupt at %s (%s); returning empty", path, exc)
        return []
    if not isinstance(data, list):
        _LOG.warning(
            "skill-store %s holds a JSON %s, not an array; returning empty",
            path,
            type(data).__name__,
        )
        return []
    return data


class Store:
    """Append-only JSON store for learned playbook entries + drift cards.

    Two JSON-array files live under ``self.dir``: one of ``PlaybookEntry``
    records and one of ``DriftAdaptationCard`` records. Reads are cached per
    instance and re-parsed only when the file's stat signature changes.
    Appends take a per-file ``.lock`` and rewrite the whole array.
    """

    def __init__(
        self,
        directory: Path | None = None,
        lock_timeout_s: float = DEFAULT_LOCK_TIMEOUT_S,
    ) -> None:
        """Create a store rooted at *directory* (``DEFAULT_STORE_DIR`` if ``None``).

        ``lock_timeout_s`` bounds how long an append waits for the file lock
        before ``TimeoutError`` is raised.
        """
        self.dir = Path(directory) if directory is not None else DEFAULT_STORE_DIR
        self.lock_timeout_s = lock_timeout_s
        # Each cache is ``(decoded items, stat signature)``; ``None`` = not loaded.
        self._playbook_cache: (
            tuple[tuple[PlaybookEntry, ...], tuple[int, int, int] | None] | None
        ) = None
        self._drift_cache: (
            tuple[tuple[DriftAdaptationCard, ...], tuple[int, int, int] | None] | None
        ) = None

    def playbook_path(self) -> Path:
        """Return the path of the playbook JSON file."""
        return self.dir / PLAYBOOK_FILENAME

    def drift_cards_path(self) -> Path:
        """Return the path of the drift-card JSON file."""
        return self.dir / DRIFT_CARDS_FILENAME

    def read_playbook(self) -> tuple[PlaybookEntry, ...]:
        """Return every decodable playbook entry (cached until the file changes)."""
        return self._read_cached(
            self.playbook_path(),
            cache_attr="_playbook_cache",
            decode=_entry_from_dict,
        )

    def read_drift_cards(self) -> tuple[DriftAdaptationCard, ...]:
        """Return every decodable drift card (cached until the file changes)."""
        return self._read_cached(
            self.drift_cards_path(),
            cache_attr="_drift_cache",
            decode=lambda d: DriftAdaptationCard(**d),
        )

    def append_playbook(self, entry: PlaybookEntry) -> None:
        """Persist *entry* at the end of the playbook file."""
        self._locked_append(
            self.playbook_path(),
            encode_new=_entry_to_dict,
            new_item=entry,
        )
        self._playbook_cache = None

    def append_drift_card(self, card: DriftAdaptationCard) -> None:
        """Persist *card* at the end of the drift-card file."""
        self._locked_append(
            self.drift_cards_path(),
            encode_new=asdict,
            new_item=card,
        )
        self._drift_cache = None

    def _read_cached(
        self,
        path: Path,
        *,
        cache_attr: str,
        decode: Callable[[dict[str, Any]], T],
    ) -> tuple[T, ...]:
        """Decode each object in the JSON array at *path*, reusing the cache if unchanged.

        Elements that are not JSON objects, or that *decode* rejects with
        ``TypeError``/``KeyError``/``ValueError``, are logged and skipped.
        """
        signature = self._stat_signature(path)
        # ``getattr``/``setattr`` is intentional — the same implementation
        # services both the playbook and drift-card caches, whose Python
        # types differ. The cast below restores the precise cache shape
        # for mypy.
        cached = cast(
            "tuple[tuple[T, ...], tuple[int, int, int] | None] | None",
            getattr(self, cache_attr),
        )
        if cached is not None and cached[1] == signature:
            return cached[0]
        items: list[T] = []
        for d in _read_json_array(path):
            if not isinstance(d, dict):
                # ``decode`` expects a mapping; e.g. ``_entry_from_dict`` would
                # raise AttributeError on ``d.get``, which is not caught below.
                _LOG.warning("skipping malformed store entry %s: not a JSON object", d)
                continue
            try:
                items.append(decode(d))
            except (TypeError, KeyError, ValueError) as exc:
                _LOG.warning("skipping malformed store entry %s: %s", d, exc)
        tup = tuple(items)
        setattr(self, cache_attr, (tup, signature))
        return tup

    @staticmethod
    def _stat_signature(path: Path) -> tuple[int, int, int] | None:
        """Return ``(inode, mtime_ns, size)`` for *path*, or ``None`` if it is missing.

        Appends write a fresh temp file and ``os.replace`` it over *path*, so
        the inode differs from the file it replaces. Including the inode and
        size catches changes that a float mtime alone can miss when two
        writes land within one timestamp tick.
        """
        try:
            st = path.stat()
        except FileNotFoundError:
            return None
        return (st.st_ino, st.st_mtime_ns, st.st_size)

    def _locked_append(
        self,
        path: Path,
        *,
        encode_new: Callable[[Any], dict[str, Any]],
        new_item: Any,
    ) -> None:
        """Append ``encode_new(new_item)`` to the JSON array at *path* under its lock.

        Raises:
            TimeoutError: the ``<path>.lock`` file could not be locked in time.
        """
        self.dir.mkdir(parents=True, exist_ok=True)
        lock_path = path.with_suffix(path.suffix + ".lock")
        # Encode outside the critical section: a bad item fails before locking.
        encoded = encode_new(new_item)
        with _locked(lock_path, self.lock_timeout_s):
            existing = _read_json_array(path)
            if not existing:
                # ``[]`` may hide an unreadable/corrupt file; preserve it rather
                # than overwrite every stored entry with this single new one.
                self._quarantine_unparsed(path)
            existing.append(encoded)
            _atomic_write_json(path, existing)

    @staticmethod
    def _quarantine_unparsed(path: Path) -> None:
        """Rename *path* aside if it holds content that did not parse to a non-empty array.

        A missing file, a blank file, and a literal ``[]`` are left alone.
        Anything else reaching here is moved to ``<name>.corrupt-<ns>``, where
        it can be inspected or recovered.
        """
        try:
            raw: bytes | None = path.read_bytes()
        except FileNotFoundError:
            return
        except OSError:
            raw = None  # unreadable, but renaming it may still succeed
        if raw is not None and raw.strip() in (b"", b"[]"):
            return
        backup = path.with_name(f"{path.name}.corrupt-{time.time_ns()}")
        os.replace(path, backup)
        _LOG.warning(
            "skill-store %s could not be parsed; moved to %s before append",
            path,
            backup,
        )


def _safe_mtime(path: Path) -> float:
    """Return the modification time of *path*, or ``0.0`` if it does not exist.

    Only a missing file is tolerated; any other ``stat`` error propagates.
    """
    with contextlib.suppress(FileNotFoundError):
        return path.stat().st_mtime
    return 0.0


def _entry_to_dict(e: PlaybookEntry) -> dict[str, Any]:
    """Serialise *e* into a JSON-ready dict.

    The tag set is emitted as a sorted list; the remaining fields are copied
    verbatim, in a fixed key order.
    """
    scalar_fields = (
        "before_snippet",
        "after_snippet",
        "avg_speedup",
        "scenario_family",
        "source",
    )
    return {
        "tag_set": sorted(e.tag_set),
        **{name: getattr(e, name) for name in scalar_fields},
    }


def _entry_from_dict(d: dict[str, Any]) -> PlaybookEntry:
    """Rebuild a ``PlaybookEntry`` from the dict form written by ``_entry_to_dict``.

    A missing or null ``tag_set`` becomes an empty set. ``source`` defaults
    to ``"learned"``.

    Raises:
        KeyError: a required field is missing.
        ValueError: ``source`` is not ``"preseed"`` or ``"learned"``, or
            ``avg_speedup`` is not a numeric string/number.
        TypeError: a field has a type the conversion cannot handle.
    """
    source: Literal["preseed", "learned"] = d.get("source", "learned")
    # Enforce the Literal at runtime: a hand-edited or corrupt file must not
    # smuggle an unknown/null source into the typed entry.
    if source not in ("preseed", "learned"):
        raise ValueError(f"unknown playbook entry source {source!r}")
    return PlaybookEntry(
        tag_set=frozenset(d.get("tag_set") or []),
        before_snippet=d["before_snippet"],
        after_snippet=d["after_snippet"],
        avg_speedup=float(d["avg_speedup"]),
        scenario_family=d["scenario_family"],
        source=source,
    )


def cleanup_stale_session_dirs(root: Path, ttl_hours: float) -> int:
    """Remove session subdirectories under *root* whose mtime is older than *ttl_hours*.

    Only real directories directly inside *root* are considered. Plain files
    and symlinks, including symlinks to directories, are left alone.

    Returns the number of directories actually removed. Errors on individual
    subdirectories are logged and skipped, so a single bad entry cannot abort
    the sweep. A directory that is only partially deleted is logged and not
    counted. Pass ``ttl_hours=0`` (or a negative value) to disable; this
    returns 0 immediately. A missing or non-directory *root* also returns 0.
    """
    import shutil

    if ttl_hours <= 0 or not root.is_dir():
        return 0
    cutoff = time.time() - ttl_hours * 3600
    removed = 0
    for session_dir in root.iterdir():
        # ``is_dir`` follows symlinks, but ``rmtree`` refuses to delete a
        # symlink (silently, under ignore_errors) — skip links explicitly.
        if session_dir.is_symlink() or not session_dir.is_dir():
            continue
        try:
            if session_dir.stat().st_mtime >= cutoff:
                continue
        except OSError as exc:
            _LOG.warning("cleanup_stale_session_dirs: skipping %s: %s", session_dir, exc)
            continue
        # Best effort: delete as much as possible, then check what is left
        # so the returned count reflects directories that are really gone.
        shutil.rmtree(session_dir, ignore_errors=True)
        if session_dir.exists():
            _LOG.warning("cleanup_stale_session_dirs: could not fully remove %s", session_dir)
        else:
            removed += 1
    return removed


# Public API of this module; everything else is an implementation detail.
__all__ = [
    "DEFAULT_LOCK_TIMEOUT_S",
    "DEFAULT_STORE_DIR",
    "Store",
    "cleanup_stale_session_dirs",
]