| |
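"""Poll the icarus112 HF Job being monitored and append a one-line metrics summary.

The newest RUNNING job is preferred; otherwise the most recently created job is
used. Each run appends one step / tok/s / bpb / loss line to the summary log.

No-bypass policy: pure read-only observation. Never touches the job's state.
"""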
| from __future__ import annotations |
|
|
| import datetime as _dt |
| import json |
| import os |
| import re |
| import sys |
| import urllib.error |
| import urllib.request |
| from pathlib import Path |
|
|
| |
| |
# Credentials: the token stored in ~/.hf_token wins; HF_TOKEN is the fallback.
_TOKEN_FILE = Path.home() / ".hf_token"
try:
    # Read directly instead of checking exists() first: avoids the check/use
    # race and also covers unreadable paths (permissions, directory, ...).
    _file_token = _TOKEN_FILE.read_text(encoding="utf-8").strip()
except (OSError, UnicodeDecodeError):
    _file_token = ""
# An empty or whitespace-only token file must not mask the environment token.
TOKEN = _file_token or os.environ.get("HF_TOKEN", "")

# Hugging Face namespace whose jobs are listed.
NAMESPACE = "GAInTech"

# Log locations: a ".logs" directory two levels above this file's own path
# (created on import if missing).
LOGDIR = Path(__file__).resolve().parents[1] / ".logs"
LOGDIR.mkdir(parents=True, exist_ok=True)
SUMMARY = LOGDIR / "hf_validation.log"  # one appended line per poll
RAW = LOGDIR / "hf_job_raw.log"  # latest raw log payload, overwritten each poll
|
|
|
|
def _get(url: str) -> str:
    """Fetch *url* with the bearer token and return the response body as text.

    Request failures are reported in-band rather than raised: an HTTP error
    status yields ``"__HTTP_<code>__"`` and any other exception raised while
    opening or reading yields ``"__ERR_<ExceptionName>__"``. The body is
    decoded as UTF-8 with undecodable bytes replaced.
    """
    auth_headers = {"Authorization": f"Bearer {TOKEN}"}
    request = urllib.request.Request(url, headers=auth_headers)
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            payload = response.read()
    except urllib.error.HTTPError as exc:
        return f"__HTTP_{exc.code}__"
    except Exception as exc:
        return f"__ERR_{type(exc).__name__}__"
    # Decoding with errors="replace" cannot fail, so it is safe outside the try.
    return payload.decode("utf-8", errors="replace")
|
|
|
|
def _pick_job(blob: str) -> tuple[str, str, str]:
    """Return (job_id, stage, flavor) for the job we want to monitor.

    *blob* is the JSON text of the job listing: either a bare list of job
    objects or an object holding that list under ``"jobs"``. The newest job
    (by ``createdAt``) whose stage is RUNNING is chosen; if none is running,
    the newest job overall is used.

    Returns ``("", "?", "?")`` when the text is not valid JSON, is not a
    list, or contains no job objects. A missing stage or flavor is reported
    as ``"?"``; ``"hardware"`` is used when ``"flavor"`` is absent.
    """
    nothing = ("", "?", "?")
    try:
        payload = json.loads(blob)
    except (TypeError, ValueError, RecursionError):
        return nothing
    if isinstance(payload, dict) and "jobs" in payload:
        payload = payload["jobs"]
    if not isinstance(payload, list):
        return nothing

    # Ignore entries that are not JSON objects instead of crashing on them.
    jobs = [entry for entry in payload if isinstance(entry, dict)]
    if not jobs:
        return nothing

    def _stage_of(job: dict) -> str:
        """Upper-cased stage of *job*, or "" when it has none."""
        status = job.get("status")
        if isinstance(status, dict):
            return str(status.get("stage") or "").upper()
        if isinstance(status, str):
            return status.upper()
        return ""

    # A null/missing createdAt sorts as oldest rather than breaking the
    # comparison (None vs str raises TypeError).
    jobs.sort(key=lambda job: str(job.get("createdAt") or ""), reverse=True)
    picked = next((job for job in jobs if _stage_of(job) == "RUNNING"), jobs[0])

    job_id = str(picked.get("id") or "")
    stage = _stage_of(picked) or "?"
    flavor = picked.get("flavor") or picked.get("hardware") or "?"
    return job_id, stage, str(flavor)
|
|
|
|
# A well-formed decimal number with optional exponent. Unlike a bare
# ``[\d.]+`` this rejects "...", stops at the first complete number in
# "1.2.3", and leaves a sentence-ending period out of the captured value.
_METRIC_NUMBER = r"((?:\d+(?:\.\d+)?|\.\d+)(?:[eE][+-]?\d+)?)"

# (summary key, compiled pattern) pairs; group 1 is the metric value.
_METRIC_PATTERNS = (
    ("step", re.compile(r"step[=:\s]+(\d+)", re.IGNORECASE)),
    ("tok/s", re.compile(r"(?:tok/?s|tps)[=:\s]+" + _METRIC_NUMBER, re.IGNORECASE)),
    ("bpb", re.compile(r"(?:val_)?bpb[=:\s]+" + _METRIC_NUMBER, re.IGNORECASE)),
    ("loss", re.compile(r"\bloss[=:\s]+" + _METRIC_NUMBER, re.IGNORECASE)),
)


def _parse_metrics(logs: str) -> dict[str, str]:
    """Extract the most recent step / tok/s / bpb / loss values from *logs*.

    Each metric is matched case-insensitively as ``<name>`` followed by any
    run of ``=``, ``:`` or whitespace and then a number. The last occurrence
    in the text wins. Values are returned as the matched strings, keyed by
    ``"step"``, ``"tok/s"``, ``"bpb"`` and ``"loss"``; metrics that never
    appear are omitted from the result.
    """
    found: dict[str, str] = {}
    for key, pattern in _METRIC_PATTERNS:
        values = pattern.findall(logs)
        if values:
            found[key] = values[-1]
    return found
|
|
|
|
def _append_summary(line: str) -> None:
    """Append *line* plus a newline to the summary log and close the file."""
    with SUMMARY.open("a", encoding="utf-8") as fh:
        fh.write(line + "\n")


def main() -> int:
    """Poll the monitored job once and append a one-line summary.

    Steps: list the namespace's jobs, pick one via ``_pick_job``, refresh its
    stage/flavor from the job detail endpoint, fetch its logs when the stage
    is RUNNING/COMPLETED/ERROR/ERRORED, save them to the raw log file, and
    append ``job=… flavor=… stage=… step=… tok/s=… bpb=… loss=…`` to the
    summary log (``?`` for any metric not found).

    Returns:
        Always 0. API failures are recorded in the summary log
        (``api_err`` / ``no_job``) rather than reported via the exit code.
    """
    ts = _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # 1) List jobs; _get reports failures as "__…__" sentinel strings.
    jobs_blob = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}")
    if jobs_blob.startswith("__"):
        _append_summary(f"[{ts}] api_err jobs={jobs_blob}")
        return 0

    jid, stage, flavor = _pick_job(jobs_blob)
    if not jid:
        _append_summary(f"[{ts}] no_job")
        return 0

    # 2) Refresh stage/flavor from the detail endpoint. This is best effort:
    #    an error sentinel or unexpected payload keeps the listing's values.
    detail = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}")
    try:
        dj = json.loads(detail)
    except (ValueError, RecursionError):
        dj = None
    if isinstance(dj, dict):
        status = dj.get("status")
        if isinstance(status, dict):
            stage = status.get("stage") or stage
        flavor = dj.get("flavor") or flavor

    # 3) Fetch logs only for stages that can have produced output.
    logs = ""
    if str(stage).upper() in {"RUNNING", "COMPLETED", "ERROR", "ERRORED"}:
        logs = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}/logs")
        # Explicit UTF-8: the text may contain U+FFFD or other non-ASCII
        # characters that a locale default encoding could refuse to write.
        RAW.write_text(logs, encoding="utf-8")

    metrics = _parse_metrics(logs) if logs and not logs.startswith("__") else {}

    # 4) Emit the summary line; absent metrics are shown as "?".
    parts = [f"job={jid}", f"flavor={flavor}", f"stage={stage}"]
    parts.extend(f"{k}={metrics.get(k, '?')}" for k in ("step", "tok/s", "bpb", "loss"))
    _append_summary(f"[{ts}] " + " ".join(parts))
    return 0
|
|
|
|
# Script entry point: run one poll and exit with main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())
|
|