#!/usr/bin/env python3
"""Poll the most recent HF Job in ``NAMESPACE`` and log a one-line summary.

The summary line (job id, flavor, stage, step, tok/s, bpb, loss) is appended
to ``SUMMARY``; the raw log text of the job is written to ``RAW``.
The namespace was formerly ``icarus112`` (see the note on ``NAMESPACE``).

No-bypass policy: pure read-only observation. Never touches the job's state.
"""
from __future__ import annotations

import datetime as _dt
import json
import os
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path

# Prefer ~/.hf_token file over env (env may have a stale/expired token from
# the Claude shell snapshot). Falls back to env if file missing.
_TOKEN_FILE = Path.home() / ".hf_token"
if _TOKEN_FILE.exists():
    TOKEN = _TOKEN_FILE.read_text().strip()
else:
    TOKEN = os.environ.get("HF_TOKEN", "")

NAMESPACE = "GAInTech"
# Legacy namespace reference: icarus112 (pre-2026-05 rename)

LOGDIR = Path(__file__).resolve().parents[1] / ".logs"
LOGDIR.mkdir(parents=True, exist_ok=True)
SUMMARY = LOGDIR / "hf_validation.log"
RAW = LOGDIR / "hf_job_raw.log"

# Exact shape of the strings `_get` returns on failure.  Matching the whole
# string (instead of a bare "__" prefix) keeps real payloads that merely start
# with two underscores from being mistaken for an error.
_SENTINEL_RE = re.compile(r"__(?:HTTP_\d+|ERR_\w+)__")

# Stages for which the logs endpoint is queried.
_LOG_STAGES = frozenset({"RUNNING", "COMPLETED", "ERROR", "ERRORED"})

# A decimal number with an optional exponent.  Unlike `[\d.]+` this does not
# swallow a sentence-ending period ("0.693.") or a bare ellipsis ("...").
_NUM = r"((?:\d+(?:\.\d+)?|\.\d+)(?:[eE][-+]?\d+)?)"

# Training patterns emitted by hydra/training.py:
#   step=  tok/s=  tps=  val_bpb=  bpb=
# Loss is kept as a tertiary signal.  Order here is the order of the keys in
# the dict returned by `_parse_metrics`.
_METRIC_PATTERNS = (
    ("step", re.compile(r"step[=:\s]+(\d+)", re.IGNORECASE)),
    ("tok/s", re.compile(r"(?:tok/?s|tps)[=:\s]+" + _NUM, re.IGNORECASE)),
    ("bpb", re.compile(r"(?:val_)?bpb[=:\s]+" + _NUM, re.IGNORECASE)),
    ("loss", re.compile(r"\bloss[=:\s]+" + _NUM, re.IGNORECASE)),
)


def _is_error_sentinel(blob: str) -> bool:
    """Return True if *blob* is one of the failure markers produced by `_get`."""
    return _SENTINEL_RE.fullmatch(blob) is not None


def _append_summary(line: str) -> None:
    """Append *line* plus a newline to ``SUMMARY``, closing the file afterwards."""
    with SUMMARY.open("a", encoding="utf-8") as fh:
        fh.write(line + "\n")


def _get(url: str) -> str:
    """GET *url* with the bearer token and return the body as text.

    The body is decoded as UTF-8 with undecodable bytes replaced.  Failures
    are reported in-band rather than raised: an HTTP error status yields
    ``__HTTP_<code>__`` and any other exception yields
    ``__ERR_<ExceptionTypeName>__``.
    """
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {TOKEN}"})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        return f"__HTTP_{e.code}__"
    except Exception as e:  # deliberate catch-all: the poller must not crash
        return f"__ERR_{type(e).__name__}__"


def _stage_of(job: dict) -> str:
    """Return the upper-cased ``status.stage`` of *job*, or "" if unavailable."""
    status = job.get("status")
    if not isinstance(status, dict):
        return ""
    # `or ""` keeps a null stage from being rendered as the string "NONE".
    return str(status.get("stage") or "").upper()


def _pick_job(blob: str) -> tuple[str, str, str]:
    """Return (job_id, stage, flavor) for the job we want to monitor.

    *blob* is the JSON text of the job listing: either a list of job objects
    or an object holding that list under ``"jobs"``.  The newest job (by
    ``createdAt``) whose stage is RUNNING wins; if none is running, the newest
    job overall is used.  Unparseable or empty input yields ``("", "?", "?")``;
    a missing stage or flavor is reported as ``"?"``.
    """
    nothing = ("", "?", "?")
    try:
        data = json.loads(blob)
    except ValueError:
        return nothing
    if isinstance(data, dict) and "jobs" in data:
        data = data["jobs"]
    if not isinstance(data, list):
        return nothing

    # Ignore entries that are not JSON objects; they cannot describe a job.
    jobs = [j for j in data if isinstance(j, dict)]
    if not jobs:
        return nothing

    # Sort by createdAt descending — newest first.  Coerce to str so a null or
    # missing timestamp sorts last instead of raising on a None/str comparison.
    jobs.sort(key=lambda j: str(j.get("createdAt") or ""), reverse=True)
    picked = next((j for j in jobs if _stage_of(j) == "RUNNING"), jobs[0])

    jid = picked.get("id") or ""
    stage = _stage_of(picked) or "?"
    flavor = picked.get("flavor") or picked.get("hardware") or "?"
    return str(jid), stage, str(flavor)


def _parse_metrics(logs: str) -> dict[str, str]:
    """Extract the last reported value of each known metric from *logs*.

    Returns a dict with any of the keys ``step``, ``tok/s``, ``bpb`` and
    ``loss`` that occur in the text; values are the matched numeric strings
    (not converted).  Metrics that never appear are omitted.
    """
    out: dict[str, str] = {}
    for key, pattern in _METRIC_PATTERNS:
        hits = pattern.findall(logs)
        if hits:
            out[key] = hits[-1]  # the most recent occurrence wins
    return out


def main() -> int:
    """Poll once and append one summary line to ``SUMMARY``; always returns 0."""
    ts = _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # 1. Find the most recent job (namespace-scoped endpoint).
    jobs_blob = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}")
    if _is_error_sentinel(jobs_blob):
        _append_summary(f"[{ts}] api_err jobs={jobs_blob}")
        return 0

    jid, stage, flavor = _pick_job(jobs_blob)
    if not jid:
        _append_summary(f"[{ts}] no_job")
        return 0

    # 2. Re-query the single job for fresh stage (list endpoint can lag).
    #    Best-effort: on any failure keep the values from the listing.
    detail = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}")
    if not _is_error_sentinel(detail):
        try:
            dj = json.loads(detail)
        except ValueError:
            dj = None
        if isinstance(dj, dict):
            status = dj.get("status")
            if isinstance(status, dict):
                stage = status.get("stage") or stage
            flavor = dj.get("flavor") or flavor

    # 3. Pull logs only for running or finished stages; other stages are
    #    skipped and all metrics are reported as "?".
    logs = ""
    if str(stage).upper() in _LOG_STAGES:
        logs = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}/logs")
        # Explicit UTF-8: the text may contain replacement characters that a
        # locale-default encoding cannot represent.
        RAW.write_text(logs, encoding="utf-8")

    if logs and not _is_error_sentinel(logs):
        metrics = _parse_metrics(logs)
    else:
        metrics = {}

    parts = [f"job={jid}", f"flavor={flavor}", f"stage={stage}"]
    parts.extend(
        f"{k}={metrics.get(k, '?')}" for k in ("step", "tok/s", "bpb", "loss")
    )
    _append_summary(f"[{ts}] " + " ".join(parts))
    return 0


if __name__ == "__main__":
    sys.exit(main())