import json import subprocess from dataclasses import asdict, dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any @dataclass(frozen=True) class RunMetadata: run_id: str experiment_name: str hypothesis_ids: list[str] baseline_run_id: str | None route_id: str seed: int code_version: str data_version: str started_at: str status: str class LocalJsonlTracker: def __init__(self, base_dir: Path | str) -> None: self.base_dir = Path(base_dir) self.base_dir.mkdir(parents=True, exist_ok=True) self.runs_path = self.base_dir / "runs.jsonl" self.events_path = self.base_dir / "events.jsonl" def start(self, metadata: RunMetadata) -> None: payload = {"event_type": "run_started", "timestamp": _utc_now(), **asdict(metadata)} self._append(self.runs_path, payload) def log_event(self, run_id: str, event_type: str, payload: dict[str, Any]) -> None: self._append( self.events_path, { "timestamp": _utc_now(), "run_id": run_id, "event_type": event_type, "payload": payload, }, ) def log_metric(self, run_id: str, name: str, value: float, step: int | None = None, split: str | None = None) -> None: metric_payload: dict[str, Any] = {"name": name, "value": value} if step is not None: metric_payload["step"] = step if split is not None: metric_payload["split"] = split self.log_event(run_id, "metric", metric_payload) def log_params(self, run_id: str, params: dict[str, Any]) -> None: self.log_event(run_id, "params", params) def log_artifact(self, run_id: str, artifact_path: str, *, kind: str | None = None) -> None: payload: dict[str, Any] = {"artifact_path": artifact_path} if kind is not None: payload["kind"] = kind self.log_event(run_id, "artifact", payload) def log_environment(self, run_id: str, environment: dict[str, Any]) -> None: self.log_event(run_id, "environment", environment) def finish(self, run_id: str, status: str, summary: dict[str, Any] | None = None) -> None: self._append( self.runs_path, { "event_type": "run_finished", "timestamp": _utc_now(), "run_id": run_id, "status": status, "summary": summary or {}, }, ) def _append(self, path: Path, payload: dict[str, Any]) -> None: with path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, ensure_ascii=False) + "\n") def _utc_now() -> str: return datetime.now(timezone.utc).isoformat() def make_run_id(prefix: str) -> str: return f"{prefix}-{datetime.now(timezone.utc):%Y%m%d-%H%M%S}" def resolve_code_version(working_dir: Path | str) -> str: resolved = Path(working_dir) try: completed = subprocess.run( ["git", "-C", str(resolved), "rev-parse", "--short", "HEAD"], check=True, capture_output=True, text=True, ) short_sha = completed.stdout.strip() dirty = subprocess.run( ["git", "-C", str(resolved), "status", "--porcelain"], check=True, capture_output=True, text=True, ) return f"{short_sha}{'-dirty' if dirty.stdout.strip() else ''}" except Exception: return "workspace-dirty-or-unavailable" def collect_environment_snapshot() -> dict[str, Any]: package_versions = { "gradio": _package_version("gradio"), "edge_tts": _package_version("edge_tts"), "faster_whisper": _package_version("faster_whisper"), "funasr": _package_version("funasr"), "torch": _package_version("torch"), "torchaudio": _package_version("torchaudio"), } return { "python": subprocess.run( ["python", "-c", "import platform; print(platform.python_version())"], capture_output=True, text=True, check=False, ).stdout.strip() or None, "package_versions": package_versions, } def _package_version(module_name: str) -> str | None: try: module = __import__(module_name) except Exception: return None return getattr(module, "__version__", None)