|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from contextvars import ContextVar |
|
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_VERSIONS_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar("VERSIONS", default=None) |
|
|
_CUR_INDEX_CV: ContextVar[int] = ContextVar("CUR_INDEX", default=-1) |
|
|
_VERS_META_CV: ContextVar[Optional[List[Dict[str, Any]]]] = ContextVar("VERS_META", default=None) |
|
|
|
|
|
|
|
|
_STORE: Dict[str, Any] = { |
|
|
"versions": [], |
|
|
"version_meta": [], |
|
|
"cur_index": -1, |
|
|
|
|
|
"df_payload": None, |
|
|
"base_df_payload": None, |
|
|
"sota_bundled": None, |
|
|
"df_summary": None, |
|
|
} |
|
|
|
|
|
_SOTA_BUNDLED_CV: ContextVar[Optional[list]] = ContextVar("SOTA_BUNDLED", default=None) |
|
|
_DF_SUMMARY_CV: ContextVar[Optional[Dict[str, Any]]] = ContextVar("DF_SUMMARY", default=None) |
|
|
|
|
|
|
|
|
|
|
|
def _get_versions() -> List[Dict[str, Any]]: |
|
|
return _VERSIONS_CV.get() or _STORE["versions"] |
|
|
|
|
|
def _get_meta() -> List[Dict[str, Any]]: |
|
|
return _VERS_META_CV.get() or _STORE["version_meta"] |
|
|
|
|
|
def _set_versions(vers: List[Dict[str, Any]], meta: List[Dict[str, Any]], cur: int) -> None: |
|
|
_VERSIONS_CV.set(vers) |
|
|
_VERS_META_CV.set(meta) |
|
|
_CUR_INDEX_CV.set(cur) |
|
|
_STORE["versions"] = vers |
|
|
_STORE["version_meta"] = meta |
|
|
_STORE["cur_index"] = cur |
|
|
|
|
|
_STORE["df_payload"] = vers[cur] if (0 <= cur < len(vers)) else None |
|
|
_STORE["base_df_payload"] = vers[0] if vers else None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def init_dataset(p: Optional[Dict[str, Any]]) -> None: |
|
|
"""Initialize version stack with a single BASE version.""" |
|
|
vers = [] if p is None else [p] |
|
|
meta = [] if p is None else [dict(tag="base")] |
|
|
cur = -1 if p is None else 0 |
|
|
_set_versions(vers, meta, cur) |
|
|
|
|
|
def set_df_payload(p: Optional[Dict[str, Any]], *, new_version: bool = True) -> None: |
|
|
""" |
|
|
Set CURRENT dataset. |
|
|
- new_version=True: truncate any forward history and append p (like a new commit). |
|
|
- new_version=False: replace the current version in place (no new snapshot). |
|
|
""" |
|
|
vers = list(_get_versions()) |
|
|
meta = list(_get_meta()) |
|
|
cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
|
|
|
|
|
if cur < 0 or not vers: |
|
|
|
|
|
init_dataset(p) |
|
|
return |
|
|
|
|
|
if new_version: |
|
|
|
|
|
vers = vers[:cur + 1] |
|
|
meta = meta[:cur + 1] |
|
|
vers.append(p) |
|
|
meta.append({}) |
|
|
cur = len(vers) - 1 |
|
|
else: |
|
|
vers[cur] = p |
|
|
|
|
|
_set_versions(vers, meta, cur) |
|
|
|
|
|
def annotate_current(**kv) -> None: |
|
|
"""Attach metadata to the current version (e.g., step/params/stats).""" |
|
|
vers = list(_get_versions()) |
|
|
meta = list(_get_meta()) |
|
|
cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
|
|
if 0 <= cur < len(meta): |
|
|
meta[cur] = {**meta[cur], **kv} |
|
|
_set_versions(vers, meta, cur) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _resolve_index(spec: Union[str, int, None]) -> int: |
|
|
vers = _get_versions() |
|
|
cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
|
|
if spec is None or spec == "current": |
|
|
return cur |
|
|
if spec == "base": |
|
|
return 0 if vers else -1 |
|
|
if spec == "prev": |
|
|
return max(-1, cur - 1) |
|
|
if isinstance(spec, int): |
|
|
idx = spec if spec >= 0 else len(vers) + spec |
|
|
return idx |
|
|
|
|
|
if isinstance(spec, str) and spec.startswith("@"): |
|
|
try: |
|
|
n = int(spec[1:]) |
|
|
except Exception: |
|
|
return cur |
|
|
idx = n if n >= 0 else len(vers) + n |
|
|
return idx |
|
|
return cur |
|
|
|
|
|
def get_df_payload(version: Union[str, int, None] = None) -> Optional[Dict[str, Any]]: |
|
|
"""Return dataset payload for the requested version (default: current).""" |
|
|
vers = _get_versions() |
|
|
idx = _resolve_index(version) |
|
|
if 0 <= idx < len(vers): |
|
|
return vers[idx] |
|
|
|
|
|
return _STORE["df_payload"] |
|
|
|
|
|
def get_base_df_payload() -> Optional[Dict[str, Any]]: |
|
|
return get_df_payload("base") |
|
|
|
|
|
def get_prev_df_payload() -> Optional[Dict[str, Any]]: |
|
|
return get_df_payload("prev") |
|
|
|
|
|
def list_versions() -> Dict[str, Any]: |
|
|
"""Lightweight overview for debugging/UI.""" |
|
|
vers = _get_versions() |
|
|
meta = _get_meta() |
|
|
cur = _CUR_INDEX_CV.get() if _CUR_INDEX_CV.get() is not None else _STORE["cur_index"] |
|
|
return { |
|
|
"count": len(vers), |
|
|
"current_index": cur, |
|
|
"has_base": bool(vers), |
|
|
"meta": meta, |
|
|
} |
|
|
|
|
|
def reset_current_to(version: Union[str, int]) -> None: |
|
|
"""Move the current pointer to a prior version (no deletion).""" |
|
|
vers = _get_versions() |
|
|
meta = _get_meta() |
|
|
idx = _resolve_index(version) |
|
|
if 0 <= idx < len(vers): |
|
|
_set_versions(vers, meta, idx) |
|
|
|
|
|
def reset_current_to_base() -> None: |
|
|
reset_current_to("base") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_sota_bundled(b: Optional[list]) -> None: |
|
|
_SOTA_BUNDLED_CV.set(b) |
|
|
_STORE["sota_bundled"] = b |
|
|
|
|
|
def get_sota_bundled() -> Optional[list]: |
|
|
return _SOTA_BUNDLED_CV.get() or _STORE["sota_bundled"] |
|
|
|
|
|
def set_df_summary(s: Optional[Dict[str, Any]]) -> None: |
|
|
_DF_SUMMARY_CV.set(s) |
|
|
_STORE["df_summary"] = s |
|
|
|
|
|
def get_df_summary() -> Optional[Dict[str, Any]]: |
|
|
return _DF_SUMMARY_CV.get() or _STORE["df_summary"] |
|
|
|