VoiceDirector / utils /tracking.py
dsa2dsads's picture
demo: package VoiceDirector stage-1 integration app
c0c4a30 verified
import json
import subprocess
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class RunMetadata:
run_id: str
experiment_name: str
hypothesis_ids: list[str]
baseline_run_id: str | None
route_id: str
seed: int
code_version: str
data_version: str
started_at: str
status: str
class LocalJsonlTracker:
    """Append-only run tracker that writes JSON Lines files to a local directory.

    Run lifecycle records ("run_started" / "run_finished") are appended to
    ``runs.jsonl``; every other record (metrics, params, artifacts,
    environment, custom events) is appended to ``events.jsonl``. Each record
    is a single JSON object on its own line.
    """

    def __init__(self, base_dir: Path | str) -> None:
        """Create ``base_dir`` (and any missing parents) and derive the log paths."""
        root = Path(base_dir)
        root.mkdir(parents=True, exist_ok=True)
        self.base_dir = root
        self.runs_path = root / "runs.jsonl"
        self.events_path = root / "events.jsonl"

    def start(self, metadata: RunMetadata) -> None:
        """Append a "run_started" record carrying every field of ``metadata``."""
        record: dict[str, Any] = {
            "event_type": "run_started",
            "timestamp": _utc_now(),
        }
        # Metadata fields are flattened into the record after the two
        # bookkeeping keys, so they appear later in the serialized object.
        record.update(asdict(metadata))
        self._append(self.runs_path, record)

    def log_event(self, run_id: str, event_type: str, payload: dict[str, Any]) -> None:
        """Append one timestamped event for ``run_id`` to the events log.

        ``payload`` is stored as-is under the "payload" key.
        """
        record = {
            "timestamp": _utc_now(),
            "run_id": run_id,
            "event_type": event_type,
            "payload": payload,
        }
        self._append(self.events_path, record)

    def log_metric(self, run_id: str, name: str, value: float, step: int | None = None, split: str | None = None) -> None:
        """Record a "metric" event; ``step`` and ``split`` are included only when given."""
        details: dict[str, Any] = {"name": name, "value": value}
        optional_fields = {"step": step, "split": split}
        # Omit optional keys entirely rather than writing explicit nulls.
        details.update(
            {key: val for key, val in optional_fields.items() if val is not None}
        )
        self.log_event(run_id, "metric", details)

    def log_params(self, run_id: str, params: dict[str, Any]) -> None:
        """Record ``params`` as the payload of a "params" event."""
        self.log_event(run_id, "params", params)

    def log_artifact(self, run_id: str, artifact_path: str, *, kind: str | None = None) -> None:
        """Record an "artifact" event; ``kind`` is included only when given."""
        details: dict[str, Any] = (
            {"artifact_path": artifact_path}
            if kind is None
            else {"artifact_path": artifact_path, "kind": kind}
        )
        self.log_event(run_id, "artifact", details)

    def log_environment(self, run_id: str, environment: dict[str, Any]) -> None:
        """Record ``environment`` as the payload of an "environment" event."""
        self.log_event(run_id, "environment", environment)

    def finish(self, run_id: str, status: str, summary: dict[str, Any] | None = None) -> None:
        """Append a "run_finished" record with the final ``status``.

        A missing or empty ``summary`` is written as an empty object.
        """
        record = {
            "event_type": "run_finished",
            "timestamp": _utc_now(),
            "run_id": run_id,
            "status": status,
            "summary": summary if summary else {},
        }
        self._append(self.runs_path, record)

    def _append(self, path: Path, payload: dict[str, Any]) -> None:
        """Serialize ``payload`` as one JSON line and append it to ``path``.

        Non-ASCII characters are written verbatim (UTF-8) rather than escaped.
        """
        with path.open(mode="a", encoding="utf-8") as sink:
            serialized = json.dumps(payload, ensure_ascii=False)
            sink.write(f"{serialized}\n")
def _utc_now() -> str:
    """Return the current time in UTC as an ISO 8601 string with a UTC offset."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def make_run_id(prefix: str) -> str:
    """Build a run identifier of the form ``<prefix>-YYYYMMDD-HHMMSS`` (UTC).

    The timestamp has one-second resolution, so two calls with the same
    prefix within the same second yield the same identifier.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return f"{prefix}-{stamp}"
def resolve_code_version(working_dir: Path | str) -> str:
resolved = Path(working_dir)
try:
completed = subprocess.run(
["git", "-C", str(resolved), "rev-parse", "--short", "HEAD"],
check=True,
capture_output=True,
text=True,
)
short_sha = completed.stdout.strip()
dirty = subprocess.run(
["git", "-C", str(resolved), "status", "--porcelain"],
check=True,
capture_output=True,
text=True,
)
return f"{short_sha}{'-dirty' if dirty.stdout.strip() else ''}"
except Exception:
return "workspace-dirty-or-unavailable"
def collect_environment_snapshot() -> dict[str, Any]:
    """Collect the interpreter version and the versions of key packages.

    Returns:
        A dict with two keys:

        * ``"python"`` - version string of the running interpreter (or None
          if it cannot be determined).
        * ``"package_versions"`` - mapping of module name to the value
          reported by ``_package_version`` for that module.
    """
    # Imported locally so this function does not depend on a module-level
    # import being present.
    import platform

    tracked_modules = (
        "gradio",
        "edge_tts",
        "faster_whisper",
        "funasr",
        "torch",
        "torchaudio",
    )
    package_versions = {name: _package_version(name) for name in tracked_modules}

    # Query the running interpreter directly. Spawning a `python` executable
    # from PATH raises FileNotFoundError when no such executable exists and
    # may report a different interpreter than the one executing this code.
    python_version = platform.python_version() or None

    return {
        "python": python_version,
        "package_versions": package_versions,
    }
def _package_version(module_name: str) -> str | None:
try:
module = __import__(module_name)
except Exception:
return None
return getattr(module, "__version__", None)