File size: 6,017 Bytes
458c8e2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# agent/runtime_ctx.py
from __future__ import annotations
from contextvars import ContextVar
from typing import Any, Dict, List, Optional, Union
# ---------------- Versioned dataset store ----------------
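# Each mutating step can create a new "version".
# Versions are addressed as "current", "base", "prev", or by index: an int or an "@N" string.
# N >= 0 is a 0-based position; N < 0 counts back from the end of the version list ("@-1" = last entry).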
# Context-local state. Each variable starts at an "unset" default
# (None, or -1 for the index) until something is stored in the context.
_VERSIONS_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar(
    "VERSIONS",
    default=None,
)
_CUR_INDEX_CV: ContextVar[int] = ContextVar(
    "CUR_INDEX",
    default=-1,
)
_VERS_META_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar(
    "VERS_META",
    default=None,
)
_SOTA_BUNDLED_CV: ContextVar[Optional[list]] = ContextVar(
    "SOTA_BUNDLED",
    default=None,
)
_DF_SUMMARY_CV: ContextVar[Optional[Dict[str, Any]]] = ContextVar(
    "DF_SUMMARY",
    default=None,
)

# Legacy module-level singleton, used as a fallback across tasks.
_STORE: Dict[str, Any] = dict(
    versions=[],           # list of df_payloads
    version_meta=[],       # metadata dicts, parallel to "versions"
    cur_index=-1,          # index of the current version; -1 means none
    # Aliases kept for backward compatibility with old getters:
    df_payload=None,       # alias of the current version
    base_df_payload=None,  # alias of versions[0]
    sota_bundled=None,
    df_summary=None,
)
# -------- internal helpers --------
def _get_versions() -> List[Dict[str, Any]]:
    """Return the version list visible to the caller.

    The context-local list wins when it is set and non-empty; otherwise the
    list held in the legacy ``_STORE`` is returned.
    """
    ctx_versions = _VERSIONS_CV.get()
    if ctx_versions:
        return ctx_versions
    return _STORE["versions"]
def _get_meta() -> List[Dict[str, Any]]:
    """Return the per-version metadata list visible to the caller.

    The context-local list wins when it is set and non-empty; otherwise the
    list held in the legacy ``_STORE`` is returned.
    """
    ctx_meta = _VERS_META_CV.get()
    if ctx_meta:
        return ctx_meta
    return _STORE["version_meta"]
def _set_versions(vers: List[Dict[str, Any]], meta: List[Dict[str, Any]], cur: int) -> None:
    """Store the (versions, metadata, current index) triple.

    The triple is written to the context variables and to the legacy
    ``_STORE``, and the legacy payload aliases are then recomputed from it.
    """
    # Context-local copy of the state.
    _VERSIONS_CV.set(vers)
    _VERS_META_CV.set(meta)
    _CUR_INDEX_CV.set(cur)

    # Module-level copy of the same state.
    _STORE.update(versions=vers, version_meta=meta, cur_index=cur)

    # Legacy aliases: "df_payload" is None when ``cur`` is out of range,
    # "base_df_payload" is None when there are no versions.
    cur_is_valid = 0 <= cur < len(vers)
    _STORE.update(
        df_payload=vers[cur] if cur_is_valid else None,
        base_df_payload=vers[0] if vers else None,
    )
# =========================
# Init / Set / Annotate
# =========================
def init_dataset(p: Optional[Dict[str, Any]]) -> None:
    """Reset the version stack so it holds only ``p`` as the BASE version.

    Passing ``None`` leaves the stack empty with a current index of -1.
    """
    if p is None:
        _set_versions([], [], -1)
    else:
        # The single base version is tagged so it can be recognised in the metadata.
        _set_versions([p], [{"tag": "base"}], 0)
def set_df_payload(p: Optional[Dict[str, Any]], *, new_version: bool = True) -> None:
    """
    Set CURRENT dataset.
    - new_version=True: truncate any forward history and append p (like a new commit).
      The new version starts with empty metadata.
    - new_version=False: replace the current version in place (no new snapshot).

    If no version exists yet, this delegates to ``init_dataset(p)`` and
    ``new_version`` is ignored.
    """
    # Work on copies; the updated lists are published via _set_versions.
    vers = list(_get_versions())
    meta = list(_get_meta())

    # _CUR_INDEX_CV defaults to -1 (never None), so an "is not None" test can
    # never trigger the fallback. Treat a negative index as "unset in this
    # context" and read the index from the legacy store, as the version list
    # itself does. Otherwise an unset context would wipe existing history.
    cur = _CUR_INDEX_CV.get()
    if cur is None or cur < 0:
        cur = _STORE["cur_index"]

    if cur < 0 or not vers:
        # not initialized yet
        init_dataset(p)
        return

    if new_version:
        # drop any versions after current (no branching for simplicity)
        vers = vers[:cur + 1]
        meta = meta[:cur + 1]
        vers.append(p)
        meta.append({})
        cur = len(vers) - 1
    else:
        vers[cur] = p
    _set_versions(vers, meta, cur)
def annotate_current(**kv) -> None:
    """Attach metadata to the current version (e.g., step/params/stats).

    The keyword arguments are merged into the current version's metadata
    dict; keys that already exist are overwritten. When there is no current
    version, nothing is written.
    """
    # _CUR_INDEX_CV defaults to -1 (never None), so an "is not None" test can
    # never trigger the fallback. Treat a negative index as "unset in this
    # context" and read the index from the legacy store instead.
    cur = _CUR_INDEX_CV.get()
    if cur is None or cur < 0:
        cur = _STORE["cur_index"]

    meta = list(_get_meta())
    if not 0 <= cur < len(meta):
        # Nothing to annotate. Returning here avoids re-publishing an invalid
        # index, which would reset the legacy "df_payload" alias to None.
        return

    meta[cur] = {**meta[cur], **kv}
    _set_versions(list(_get_versions()), meta, cur)
# =========================
# Getters / Navigation
# =========================
def _resolve_index(spec: Union[str, int, None]) -> int:
    """Translate a version specifier into an index into the version list.

    Accepted specifiers:
    - ``None`` or ``"current"``: the current index.
    - ``"base"``: 0, or -1 when there are no versions.
    - ``"prev"``: one before the current index, never lower than -1.
    - an int, or a string ``"@N"``: N >= 0 is returned as-is; N < 0 counts
      back from the end of the version list.
    - anything else, including a malformed ``"@..."``: the current index.

    The result is NOT range-checked; callers must check it against the
    version list before indexing.
    """
    vers = _get_versions()

    # _CUR_INDEX_CV defaults to -1 (never None), so an "is not None" test can
    # never trigger the fallback. Treat a negative index as "unset in this
    # context" and read the index from the legacy store instead.
    cur = _CUR_INDEX_CV.get()
    if cur is None or cur < 0:
        cur = _STORE["cur_index"]

    if spec is None or spec == "current":
        return cur
    if spec == "base":
        return 0 if vers else -1
    if spec == "prev":
        return max(-1, cur - 1)

    if isinstance(spec, int):
        n = spec
    elif isinstance(spec, str) and spec.startswith("@"):
        # strings like "@-1", "@3"
        try:
            n = int(spec[1:])
        except ValueError:
            # int() on a str only raises ValueError; fall back to current.
            return cur
    else:
        return cur

    return n if n >= 0 else len(vers) + n
def get_df_payload(version: Union[str, int, None] = None) -> Optional[Dict[str, Any]]:
    """Look up the dataset payload for ``version`` (default: current).

    When the resolved index is outside the version list, the legacy
    ``_STORE["df_payload"]`` alias is returned instead.
    """
    versions = _get_versions()
    position = _resolve_index(version)
    if position < 0 or position >= len(versions):
        # legacy fallback
        return _STORE["df_payload"]
    return versions[position]
def get_base_df_payload() -> Optional[Dict[str, Any]]:
    """Shortcut for ``get_df_payload("base")``."""
    base_payload = get_df_payload(version="base")
    return base_payload
def get_prev_df_payload() -> Optional[Dict[str, Any]]:
    """Shortcut for ``get_df_payload("prev")``."""
    prev_payload = get_df_payload(version="prev")
    return prev_payload
def list_versions() -> Dict[str, Any]:
    """Lightweight overview for debugging/UI.

    Returns a dict with:
    - ``count``: number of stored versions.
    - ``current_index``: index of the current version (-1 when none).
    - ``has_base``: True when at least one version exists.
    - ``meta``: the metadata list itself (not a copy), parallel to the
      versions, e.g. ``[{tag:..., step:..., params:...}, ...]``.
    """
    vers = _get_versions()
    meta = _get_meta()

    # _CUR_INDEX_CV defaults to -1 (never None), so an "is not None" test can
    # never trigger the fallback. Treat a negative index as "unset in this
    # context" and read the index from the legacy store, so the reported
    # index matches the version list it is reported with.
    cur = _CUR_INDEX_CV.get()
    if cur is None or cur < 0:
        cur = _STORE["cur_index"]

    return {
        "count": len(vers),
        "current_index": cur,
        "has_base": bool(vers),
        "meta": meta,
    }
def reset_current_to(version: Union[str, int]) -> None:
    """Point "current" at an existing version without deleting anything.

    A specifier that resolves outside the version list is ignored.
    """
    versions = _get_versions()
    metadata = _get_meta()
    target = _resolve_index(version)
    if target < 0 or target >= len(versions):
        return
    # The same lists are re-published; only the current index changes.
    _set_versions(versions, metadata, target)
def reset_current_to_base() -> None:
    """Shortcut for ``reset_current_to("base")``."""
    reset_current_to(version="base")
# =========================
# SOTA / Summary passthrough
# =========================
def set_sota_bundled(b: Optional[list]) -> None:
    """Record ``b`` in both the legacy ``_STORE`` and the context variable."""
    _STORE.update(sota_bundled=b)
    _SOTA_BUNDLED_CV.set(b)
def get_sota_bundled() -> Optional[list]:
    """Return the context-local value, or the legacy ``_STORE`` value when
    the context-local one is unset or empty."""
    ctx_value = _SOTA_BUNDLED_CV.get()
    if ctx_value:
        return ctx_value
    return _STORE["sota_bundled"]
def set_df_summary(s: Optional[Dict[str, Any]]) -> None:
    """Record ``s`` in both the legacy ``_STORE`` and the context variable."""
    _STORE.update(df_summary=s)
    _DF_SUMMARY_CV.set(s)
def get_df_summary() -> Optional[Dict[str, Any]]:
    """Return the context-local summary, or the legacy ``_STORE`` summary
    when the context-local one is unset or empty."""
    ctx_value = _DF_SUMMARY_CV.get()
    if ctx_value:
        return ctx_value
    return _STORE["df_summary"]
|