File size: 6,017 Bytes
458c8e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# agent/runtime_ctx.py
from __future__ import annotations

from contextvars import ContextVar
from typing import Any, Dict, List, Optional, Union

# ---------------- Versioned dataset store ----------------
# Each new mutating step can create a new "version".
# You can address versions as: "current", "base", "prev", "@-1", "@-2", "@3" (0-based).
_VERSIONS_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar("VERSIONS", default=None)
_CUR_INDEX_CV: ContextVar[int] = ContextVar("CUR_INDEX", default=-1)
_VERS_META_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar("VERS_META", default=None)

# Legacy singletons (fallback across tasks)
_STORE: Dict[str, Any] = {
    "versions": [],            # list of df_payloads
    "version_meta": [],        # parallel list of metadata dicts
    "cur_index": -1,
    # kept for backward compat with old getters:
    "df_payload": None,        # alias of current
    "base_df_payload": None,   # alias of versions[0]
    "sota_bundled": None,
    "df_summary": None,
}

_SOTA_BUNDLED_CV: ContextVar[Optional[list]] = ContextVar("SOTA_BUNDLED", default=None)
_DF_SUMMARY_CV: ContextVar[Optional[Dict[str, Any]]] = ContextVar("DF_SUMMARY", default=None)


# -------- internal helpers --------
def _get_versions() -> List[Dict[str, Any]]:
    return _VERSIONS_CV.get() or _STORE["versions"]

def _get_meta() -> List[Dict[str, Any]]:
    return _VERS_META_CV.get() or _STORE["version_meta"]

def _set_versions(vers: List[Dict[str, Any]], meta: List[Dict[str, Any]], cur: int) -> None:
    _VERSIONS_CV.set(vers)
    _VERS_META_CV.set(meta)
    _CUR_INDEX_CV.set(cur)
    _STORE["versions"] = vers
    _STORE["version_meta"] = meta
    _STORE["cur_index"] = cur
    # keep legacy aliases in sync
    _STORE["df_payload"] = vers[cur] if (0 <= cur < len(vers)) else None
    _STORE["base_df_payload"] = vers[0] if vers else None


# =========================
# Init / Set / Annotate
# =========================
def init_dataset(p: Optional[Dict[str, Any]]) -> None:
    """Initialize version stack with a single BASE version."""
    vers = [] if p is None else [p]
    meta = [] if p is None else [dict(tag="base")]
    cur = -1 if p is None else 0
    _set_versions(vers, meta, cur)

def set_df_payload(p: Optional[Dict[str, Any]], *, new_version: bool = True) -> None:
    """
    Set CURRENT dataset.
    - new_version=True: truncate any forward history and append p (like a new commit).
    - new_version=False: replace the current version in place (no new snapshot).
    """
    vers = list(_get_versions())
    meta = list(_get_meta())
    cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"]

    if cur < 0 or not vers:
        # not initialized yet
        init_dataset(p)
        return

    if new_version:
        # drop any versions after current (no branching for simplicity)
        vers = vers[:cur + 1]
        meta = meta[:cur + 1]
        vers.append(p)
        meta.append({})
        cur = len(vers) - 1
    else:
        vers[cur] = p

    _set_versions(vers, meta, cur)

def annotate_current(**kv) -> None:
    """Attach metadata to the current version (e.g., step/params/stats)."""
    vers = list(_get_versions())
    meta = list(_get_meta())
    cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"]
    if 0 <= cur < len(meta):
        meta[cur] = {**meta[cur], **kv}
        _set_versions(vers, meta, cur)


# =========================
# Getters / Navigation
# =========================
def _resolve_index(spec: Union[str, int, None]) -> int:
    vers = _get_versions()
    cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"]
    if spec is None or spec == "current":
        return cur
    if spec == "base":
        return 0 if vers else -1
    if spec == "prev":
        return max(-1, cur - 1)
    if isinstance(spec, int):
        idx = spec if spec >= 0 else len(vers) + spec
        return idx
    # strings like "@-1", "@3"
    if isinstance(spec, str) and spec.startswith("@"):
        try:
            n = int(spec[1:])
        except Exception:
            return cur
        idx = n if n >= 0 else len(vers) + n
        return idx
    return cur

def get_df_payload(version: Union[str, int, None] = None) -> Optional[Dict[str, Any]]:
    """Return dataset payload for the requested version (default: current)."""
    vers = _get_versions()
    idx = _resolve_index(version)
    if 0 <= idx < len(vers):
        return vers[idx]
    # legacy fallback
    return _STORE["df_payload"]

def get_base_df_payload() -> Optional[Dict[str, Any]]:
    return get_df_payload("base")

def get_prev_df_payload() -> Optional[Dict[str, Any]]:
    return get_df_payload("prev")

def list_versions() -> Dict[str, Any]:
    """Lightweight overview for debugging/UI."""
    vers = _get_versions()
    meta = _get_meta()
    cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"]
    return {
        "count": len(vers),
        "current_index": cur,
        "has_base": bool(vers),
        "meta": meta,  # [{tag:..., step:..., params:...}, ...]
    }

def reset_current_to(version: Union[str, int]) -> None:
    """Move the current pointer to a prior version (no deletion)."""
    vers = _get_versions()
    meta = _get_meta()
    idx = _resolve_index(version)
    if 0 <= idx < len(vers):
        _set_versions(vers, meta, idx)

def reset_current_to_base() -> None:
    reset_current_to("base")


# =========================
# SOTA / Summary passthrough
# =========================
def set_sota_bundled(b: Optional[list]) -> None:
    _SOTA_BUNDLED_CV.set(b)
    _STORE["sota_bundled"] = b

def get_sota_bundled() -> Optional[list]:
    return _SOTA_BUNDLED_CV.get() or _STORE["sota_bundled"]

def set_df_summary(s: Optional[Dict[str, Any]]) -> None:
    _DF_SUMMARY_CV.set(s)
    _STORE["df_summary"] = s

def get_df_summary() -> Optional[Dict[str, Any]]:
    return _DF_SUMMARY_CV.get() or _STORE["df_summary"]