#!/usr/bin/env python3
"""Poll the most recent GAInTech HF Job and log a one-line metrics summary.

The namespace was formerly icarus112. Each run appends step, tok/s, bpb and
loss to .logs/hf_validation.log and saves the fetched job logs alongside it.
No-bypass policy: pure read-only observation. Never touches the job's state.
"""
from __future__ import annotations
import datetime as _dt
import json
import os
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path
# Prefer the ~/.hf_token file over the environment: the env var may carry a
# stale/expired token inherited from the Claude shell snapshot. Fall back to
# HF_TOKEN when the file is missing, unreadable, or blank, so an empty file
# does not silently produce an empty bearer token.
_TOKEN_FILE = Path.home() / ".hf_token"
try:
    TOKEN = _TOKEN_FILE.read_text(encoding="utf-8").strip()
except (OSError, UnicodeDecodeError):
    TOKEN = ""
if not TOKEN:
    TOKEN = os.environ.get("HF_TOKEN", "").strip()

NAMESPACE = "GAInTech"
# Legacy namespace reference: icarus112 (pre-2026-05 rename)

# Logs go to "<parent of this script's directory>/.logs"; the directory is
# created at import time if it does not exist yet.
LOGDIR = Path(__file__).resolve().parents[1] / ".logs"
LOGDIR.mkdir(parents=True, exist_ok=True)
SUMMARY = LOGDIR / "hf_validation.log"  # one appended line per poll
RAW = LOGDIR / "hf_job_raw.log"  # overwritten with the latest fetched job logs
def _get(url: str) -> str:
    """Fetch ``url`` with the bearer token and return the body as text.

    The response bytes are decoded as UTF-8 with undecodable bytes replaced.
    Failures while opening or reading the response are reported in-band
    instead of raised: an HTTP error status yields ``"__HTTP_<code>__"`` and
    any other exception yields ``"__ERR_<ExceptionClassName>__"``.
    """
    request = urllib.request.Request(url)
    request.add_header("Authorization", f"Bearer {TOKEN}")
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            payload = response.read()
        return payload.decode("utf-8", errors="replace")
    except urllib.error.HTTPError as http_exc:
        # HTTPError must be handled before the generic clause below, since
        # it is itself an Exception subclass.
        return f"__HTTP_{http_exc.code}__"
    except Exception as exc:
        return f"__ERR_{type(exc).__name__}__"
def _pick_job(blob: str) -> tuple[str, str, str]:
    """Return (job_id, stage, flavor) for the job we want to monitor.

    ``blob`` is the JSON text of a job listing: either a bare list of job
    objects or an object wrapping that list under a ``"jobs"`` key. The
    newest job (by ``createdAt``) whose stage is RUNNING wins; when nothing
    is running, the newest job of any stage is used.

    Returns ``("", "?", "?")`` when the blob is not valid JSON or holds no
    job objects. Malformed entries (non-dict items, a null ``createdAt``, a
    non-dict ``status``) are tolerated instead of raising, so a single bad
    record cannot abort the poll. ``stage`` is upper-cased; a missing stage
    or flavor is reported as ``"?"``.
    """
    nothing = ("", "?", "?")
    try:
        data = json.loads(blob)
    except (TypeError, ValueError, RecursionError):
        return nothing
    if isinstance(data, dict) and "jobs" in data:
        data = data["jobs"]
    if not isinstance(data, list):
        return nothing
    # Drop anything that is not a job object before touching its keys.
    jobs = [j for j in data if isinstance(j, dict)]
    if not jobs:
        return nothing

    def _stage(j: dict) -> str:
        status = j.get("status")
        stage = status.get("stage") if isinstance(status, dict) else None
        # `or ""` keeps a null stage from being rendered as the text "NONE".
        return str(stage or "").upper()

    # Newest first. Keys are coerced to str so a null/absent createdAt sorts
    # last instead of raising on a None-vs-str comparison.
    # NOTE(review): assumes createdAt values sort chronologically as strings
    # (e.g. ISO-8601 timestamps) -- confirm against the API.
    jobs.sort(key=lambda j: str(j.get("createdAt") or ""), reverse=True)
    running = [j for j in jobs if _stage(j) == "RUNNING"]
    picked = running[0] if running else jobs[0]
    jid = str(picked.get("id") or "")
    flavor = picked.get("flavor") or picked.get("hardware") or "?"
    return jid, _stage(picked) or "?", str(flavor)
def _parse_metrics(logs: str) -> dict[str, str]:
    """Extract the most recent training metrics from raw job log text.

    Looks for the patterns emitted by hydra/training.py:
        step=<int> tok/s=<num> tps=<num> val_bpb=<num> bpb=<num>
    plus ``loss`` as a tertiary signal. Each label may be followed by any
    mix of ``=``, ``:`` and whitespace; matching is case-insensitive.

    Returns a dict holding any of the keys ``"step"``, ``"tok/s"``,
    ``"bpb"`` and ``"loss"``, each mapped to the text of the LAST occurrence
    in ``logs``. Keys with no match are omitted; values are left as strings.
    """
    # A real number: digits with an optional fraction (or a bare ".5"), plus
    # an optional exponent. Unlike a loose "[\d.]+", this cannot match "..."
    # after a label, does not swallow a sentence-ending period, and keeps
    # scientific notation such as 1.5e-05 intact.
    number = r"((?:\d+(?:\.\d+)?|\.\d+)(?:[eE][-+]?\d+)?)"
    patterns = (
        ("step", r"step[=:\s]+(\d+)"),
        ("tok/s", r"(?:tok/?s|tps)[=:\s]+" + number),
        ("bpb", r"(?:val_)?bpb[=:\s]+" + number),
        ("loss", r"\bloss[=:\s]+" + number),
    )
    out: dict[str, str] = {}
    for key, pattern in patterns:
        hits = re.findall(pattern, logs, re.IGNORECASE)
        if hits:
            out[key] = hits[-1]
    return out
def _append_summary(line: str) -> None:
    """Append ``line`` to the summary log and close the file right away."""
    with SUMMARY.open("a", encoding="utf-8") as fh:
        fh.write(line)


def main() -> int:
    """Poll the namespace once and append a one-line summary to SUMMARY.

    Steps: list the namespace's jobs, pick one via ``_pick_job``, re-query
    that job for a fresh stage/flavor, fetch its logs when the stage
    warrants it (saving them to RAW), then append a timestamped line with
    job id, flavor, stage and the parsed step/tok-s/bpb/loss values
    (``?`` for any metric not found).

    Returns 0 in every handled case; API failures are recorded in the
    summary log (``api_err`` / ``no_job``) instead of being raised.
    """
    ts = _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # 1. Find the most recent job (namespace-scoped endpoint).
    jobs_blob = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}")
    if jobs_blob.startswith("__"):  # _get reports failures as "__...__"
        _append_summary(f"[{ts}] api_err jobs={jobs_blob}\n")
        return 0
    jid, stage, flavor = _pick_job(jobs_blob)
    if not jid:
        _append_summary(f"[{ts}] no_job\n")
        return 0

    # 2. Re-query the single job for fresh stage (list endpoint can lag).
    #    Best effort: on any failure keep the values from the listing.
    detail = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}")
    try:
        dj = json.loads(detail)
    except ValueError:  # error sentinel from _get, or a non-JSON body
        dj = None
    if isinstance(dj, dict):
        status = dj.get("status")
        if isinstance(status, dict):
            stage = status.get("stage") or stage
        flavor = dj.get("flavor") or flavor

    # 3. Pull logs once the job is running or has reached a terminal stage;
    #    any other stage is treated as having no metrics to parse.
    logs = ""
    if str(stage).upper() in {"RUNNING", "COMPLETED", "ERROR", "ERRORED"}:
        logs = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}/logs")
        # Explicit encoding: job output may contain non-ASCII text that the
        # locale's default codec cannot write.
        RAW.write_text(logs, encoding="utf-8")

    metrics = _parse_metrics(logs) if logs and not logs.startswith("__") else {}
    parts = [f"job={jid}", f"flavor={flavor}", f"stage={stage}"]
    parts.extend(
        f"{k}={metrics.get(k, '?')}" for k in ("step", "tok/s", "bpb", "loss")
    )
    _append_summary(f"[{ts}] " + " ".join(parts) + "\n")
    return 0
# Script entry point: run one poll and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|