feather-a10g-large-runtime / overlay /scripts /cron_validate_hf_job.py
icarus112's picture
Update Feather a10g-large training runtime image
c475135 verified
#!/usr/bin/env python3
"""Poll the most recent icarus112 HF Job and write one-line tps/bpb summary.
No-bypass policy: pure read-only observation. Never touches the job's state.
"""
from __future__ import annotations
import datetime as _dt
import json
import os
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path
# Prefer ~/.hf_token file over env (env may have a stale/expired token from
# the Claude shell snapshot). Falls back to env if file missing.
_TOKEN_FILE = Path.home() / ".hf_token"
if _TOKEN_FILE.exists():
TOKEN = _TOKEN_FILE.read_text().strip()
else:
TOKEN = os.environ.get("HF_TOKEN", "")
NAMESPACE = "GAInTech"
# Legacy namespace reference: icarus112 (pre-2026-05 rename)
LOGDIR = Path(__file__).resolve().parents[1] / ".logs"
LOGDIR.mkdir(parents=True, exist_ok=True)
SUMMARY = LOGDIR / "hf_validation.log"
RAW = LOGDIR / "hf_job_raw.log"
def _get(url: str) -> str:
req = urllib.request.Request(url, headers={"Authorization": f"Bearer {TOKEN}"})
try:
with urllib.request.urlopen(req, timeout=30) as r:
return r.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
return f"__HTTP_{e.code}__"
except Exception as e:
return f"__ERR_{type(e).__name__}__"
def _pick_job(blob: str) -> tuple[str, str, str]:
"""Return (job_id, stage, flavor) for the job we want to monitor."""
try:
data = json.loads(blob)
except Exception:
return ("", "?", "?")
if isinstance(data, dict) and "jobs" in data:
data = data["jobs"]
if not isinstance(data, list) or not data:
return ("", "?", "?")
def _stage(j: dict) -> str:
return str((j.get("status") or {}).get("stage", "")).upper()
# Sort by createdAt descending — newest first.
data = sorted(data, key=lambda j: j.get("createdAt", ""), reverse=True)
running = [j for j in data if _stage(j) == "RUNNING"]
picked = running[0] if running else data[0]
jid = picked.get("id") or ""
st = _stage(picked) or "?"
flavor = picked.get("flavor") or picked.get("hardware") or "?"
return jid, st, str(flavor)
def _parse_metrics(logs: str) -> dict[str, str]:
out: dict[str, str] = {}
# Training patterns emitted by hydra/training.py:
# step=<int> tok/s=<num> tps=<num> val_bpb=<num> bpb=<num>
last_step = re.findall(r"step[=:\s]+(\d+)", logs, re.IGNORECASE)
if last_step:
out["step"] = last_step[-1]
last_tps = re.findall(r"(?:tok/?s|tps)[=:\s]+([\d.]+)", logs, re.IGNORECASE)
if last_tps:
out["tok/s"] = last_tps[-1]
last_bpb = re.findall(r"(?:val_)?bpb[=:\s]+([\d.]+)", logs, re.IGNORECASE)
if last_bpb:
out["bpb"] = last_bpb[-1]
# Loss as a tertiary signal
last_loss = re.findall(r"\bloss[=:\s]+([\d.]+)", logs, re.IGNORECASE)
if last_loss:
out["loss"] = last_loss[-1]
return out
def main() -> int:
ts = _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# 1. Find the most recent job (namespace-scoped endpoint).
jobs_blob = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}")
if jobs_blob.startswith("__"):
SUMMARY.open("a").write(f"[{ts}] api_err jobs={jobs_blob}\n")
return 0
jid, stage, flavor = _pick_job(jobs_blob)
if not jid:
SUMMARY.open("a").write(f"[{ts}] no_job\n")
return 0
# 2. Re-query the single job for fresh stage (list endpoint can lag).
detail = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}")
try:
dj = json.loads(detail)
stage = (dj.get("status") or {}).get("stage", stage) or stage
flavor = dj.get("flavor") or flavor
except Exception:
pass
# 3. Pull logs only if the job is live (otherwise no metrics to parse).
logs = ""
if str(stage).upper() in {"RUNNING", "COMPLETED", "ERROR", "ERRORED"}:
logs = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}/logs")
RAW.write_text(logs)
metrics = _parse_metrics(logs) if logs and not logs.startswith("__") else {}
parts = [f"job={jid}", f"flavor={flavor}", f"stage={stage}"]
for k in ("step", "tok/s", "bpb", "loss"):
if k in metrics:
parts.append(f"{k}={metrics[k]}")
else:
parts.append(f"{k}=?")
SUMMARY.open("a").write(f"[{ts}] " + " ".join(parts) + "\n")
return 0
if __name__ == "__main__":
sys.exit(main())