File size: 4,503 Bytes
c0c4a30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import json
import subprocess
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class RunMetadata:
    """Immutable record describing one run at the moment it starts.

    ``LocalJsonlTracker.start`` flattens an instance with ``dataclasses.asdict``
    and writes it as a single JSON line, so every field value has to be
    JSON-serialisable.

    ``frozen=True`` blocks attribute reassignment only: ``hypothesis_ids`` is a
    plain list and can still be mutated in place, and because of that list
    field ``hash()`` on an instance raises ``TypeError``.

    NOTE(review): the per-field notes below are inferred from names and types;
    nothing in this module validates them -- confirm against the callers.
    """

    run_id: str  # identifier of the run; presumably built with make_run_id -- TODO confirm
    experiment_name: str  # name of the experiment this run belongs to
    hypothesis_ids: list[str]  # presumably ids of the hypotheses under test -- TODO confirm
    baseline_run_id: str | None  # presumably the run to compare against; None when absent
    route_id: str  # NOTE(review): meaning not visible in this file -- confirm
    seed: int  # presumably the random seed used by the run -- TODO confirm
    code_version: str  # presumably the value returned by resolve_code_version -- TODO confirm
    data_version: str  # NOTE(review): format not visible in this file -- confirm
    started_at: str  # presumably an ISO 8601 timestamp string -- TODO confirm
    status: str  # NOTE(review): allowed values are not defined in this file -- confirm


class LocalJsonlTracker:
    """Append-only run tracker that stores records as JSON Lines on local disk.

    Run lifecycle records (``run_started`` / ``run_finished``) are appended to
    ``runs.jsonl``; every other record (metrics, params, artifacts, environment
    and custom events) is appended to ``events.jsonl``. Both files sit directly
    inside ``base_dir``.
    """

    def __init__(self, base_dir: Path | str) -> None:
        """Create ``base_dir`` (including parents) if needed and derive the log paths.

        Args:
            base_dir: Directory that will hold ``runs.jsonl`` and ``events.jsonl``.
        """
        self.base_dir = root = Path(base_dir)
        root.mkdir(parents=True, exist_ok=True)
        self.runs_path = root / "runs.jsonl"
        self.events_path = root / "events.jsonl"

    def start(self, metadata: RunMetadata) -> None:
        """Append a ``run_started`` record carrying every field of ``metadata``."""
        record: dict[str, Any] = {"event_type": "run_started", "timestamp": _utc_now()}
        # Metadata fields are merged after the fixed keys, exactly like a
        # trailing ``**asdict(metadata)`` in a dict literal.
        record.update(asdict(metadata))
        self._append(self.runs_path, record)

    def log_event(self, run_id: str, event_type: str, payload: dict[str, Any]) -> None:
        """Append one timestamped event for ``run_id`` to the events file.

        Args:
            run_id: Run the event belongs to.
            event_type: Label stored under ``"event_type"``.
            payload: Event body, stored unchanged under ``"payload"``.
        """
        record = {
            "timestamp": _utc_now(),
            "run_id": run_id,
            "event_type": event_type,
            "payload": payload,
        }
        self._append(self.events_path, record)

    def log_metric(
        self,
        run_id: str,
        name: str,
        value: float,
        step: int | None = None,
        split: str | None = None,
    ) -> None:
        """Log a ``metric`` event; ``step`` and ``split`` are included only when given."""
        optional_fields = {"step": step, "split": split}
        metric_payload: dict[str, Any] = {"name": name, "value": value}
        metric_payload.update(
            {key: given for key, given in optional_fields.items() if given is not None}
        )
        self.log_event(run_id, "metric", metric_payload)

    def log_params(self, run_id: str, params: dict[str, Any]) -> None:
        """Log ``params`` as the payload of a ``params`` event."""
        self.log_event(run_id, "params", params)

    def log_artifact(self, run_id: str, artifact_path: str, *, kind: str | None = None) -> None:
        """Log an ``artifact`` event; ``kind`` is included only when it is not ``None``."""
        if kind is None:
            details: dict[str, Any] = {"artifact_path": artifact_path}
        else:
            details = {"artifact_path": artifact_path, "kind": kind}
        self.log_event(run_id, "artifact", details)

    def log_environment(self, run_id: str, environment: dict[str, Any]) -> None:
        """Log ``environment`` as the payload of an ``environment`` event."""
        self.log_event(run_id, "environment", environment)

    def finish(self, run_id: str, status: str, summary: dict[str, Any] | None = None) -> None:
        """Append a ``run_finished`` record to the runs file.

        Args:
            run_id: Run being closed.
            status: Final status stored under ``"status"``.
            summary: Optional summary dict; a missing or empty value is stored as ``{}``.
        """
        closing_record = {
            "event_type": "run_finished",
            "timestamp": _utc_now(),
            "run_id": run_id,
            "status": status,
            "summary": summary if summary else {},
        }
        self._append(self.runs_path, closing_record)

    def _append(self, path: Path, payload: dict[str, Any]) -> None:
        """Serialise ``payload`` as one UTF-8 JSON line and append it to ``path``."""
        with path.open(mode="a", encoding="utf-8") as sink:
            # Serialise after opening so the file is created even when encoding fails.
            serialized = json.dumps(payload, ensure_ascii=False)
            sink.write(f"{serialized}\n")


def _utc_now() -> str:
    """Return the current time in UTC as an ISO 8601 string (``+00:00`` offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()


def make_run_id(prefix: str) -> str:
    """Build a run id of the form ``<prefix>-YYYYMMDD-HHMMSS`` from the current UTC time.

    The timestamp has one-second resolution, so two calls with the same prefix
    inside the same second return the same id.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return "-".join((prefix, stamp))


def resolve_code_version(working_dir: Path | str) -> str:
    resolved = Path(working_dir)
    try:
        completed = subprocess.run(
            ["git", "-C", str(resolved), "rev-parse", "--short", "HEAD"],
            check=True,
            capture_output=True,
            text=True,
        )
        short_sha = completed.stdout.strip()
        dirty = subprocess.run(
            ["git", "-C", str(resolved), "status", "--porcelain"],
            check=True,
            capture_output=True,
            text=True,
        )
        return f"{short_sha}{'-dirty' if dirty.stdout.strip() else ''}"
    except Exception:
        return "workspace-dirty-or-unavailable"


def collect_environment_snapshot() -> dict[str, Any]:
    """Describe the Python environment this process is running in.

    Returns:
        A dict with two keys:

        * ``"python"`` -- version string of the interpreter executing this
          code (``None`` only if the platform reports an empty string).
        * ``"package_versions"`` -- mapping of module name to the value
          returned by ``_package_version`` for it, i.e. the module's
          ``__version__`` or ``None`` when the module cannot be imported or
          exposes no version.
    """
    # Local import keeps this function self-contained without touching the
    # module-level import block.
    import platform

    tracked_modules = (
        "gradio",
        "edge_tts",
        "faster_whisper",
        "funasr",
        "torch",
        "torchaudio",
    )
    package_versions = {name: _package_version(name) for name in tracked_modules}
    return {
        # Ask the running interpreter directly. Spawning a "python" executable
        # reports whichever interpreter is first on PATH (not necessarily this
        # one) and raises FileNotFoundError when no such executable exists.
        "python": platform.python_version() or None,
        "package_versions": package_versions,
    }


def _package_version(module_name: str) -> str | None:
    try:
        module = __import__(module_name)
    except Exception:
        return None
    return getattr(module, "__version__", None)