File size: 4,474 Bytes
22741d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c475135
 
22741d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
"""Poll the most recent GAInTech HF Job and append a one-line tps/bpb summary.

No-bypass policy: pure read-only observation. Never touches the job's state.
"""
from __future__ import annotations

import datetime as _dt
import json
import os
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path

# Prefer the ~/.hf_token file over the environment (env may have a stale/expired
# token from the Claude shell snapshot). Fall back to $HF_TOKEN when the file is
# missing, unreadable, or contains only whitespace.
_TOKEN_FILE = Path.home() / ".hf_token"
try:
    TOKEN = _TOKEN_FILE.read_text(encoding="utf-8").strip()
except (OSError, UnicodeDecodeError):
    # Missing/unreadable/garbled token file: treat as "no file token".
    TOKEN = ""
if not TOKEN:
    # Strip here as well: stray whitespace/newlines do not belong in a header.
    TOKEN = os.environ.get("HF_TOKEN", "").strip()
NAMESPACE = "GAInTech"
# Legacy namespace reference: icarus112 (pre-2026-05 rename)
# Log files live in a ".logs" directory one level above this script's directory;
# it is created at import time if it does not exist yet.
LOGDIR = Path(__file__).resolve().parents[1] / ".logs"
LOGDIR.mkdir(parents=True, exist_ok=True)
SUMMARY = LOGDIR / "hf_validation.log"  # one appended line per poll
RAW = LOGDIR / "hf_job_raw.log"  # overwritten with the latest fetched logs


def _get(url: str) -> str:
    """Fetch *url* and return the response body as text; never raises.

    The module-level ``TOKEN`` is sent as a bearer token when it is non-empty.
    The body is decoded as UTF-8 with undecodable bytes replaced.

    Failures are reported in-band as sentinel strings so callers can test
    ``result.startswith("__")``:

    * ``__HTTP_<code>__`` when the server answers with an HTTP error status;
    * ``__ERR_<ExceptionName>__`` for any other failure (bad URL, timeout,
      connection error, ...).
    """
    # An empty token would otherwise go out as a bare "Bearer " header.
    headers = {"Authorization": f"Bearer {TOKEN}"} if TOKEN else {}
    try:
        # Request() itself raises ValueError on a malformed URL, so it has to
        # live inside the try for the "never raises" contract to hold.
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=30) as resp:
            return resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        return f"__HTTP_{exc.code}__"
    except Exception as exc:  # deliberate boundary: the poller must not crash
        return f"__ERR_{type(exc).__name__}__"


def _pick_job(blob: str) -> tuple[str, str, str]:
    """Return (job_id, stage, flavor) for the job we want to monitor.

    *blob* is the JSON text of a job listing: either a list of job objects or
    an object with a ``"jobs"`` list. The newest job (by ``createdAt``) whose
    stage is RUNNING is preferred; if none is running, the newest job overall
    is used.

    Returns ``("", "?", "?")`` when the blob is not valid JSON or contains no
    job objects. Missing stage/flavor values are reported as ``"?"``.
    """
    nothing = ("", "?", "?")
    try:
        data = json.loads(blob)
    except (TypeError, ValueError, RecursionError):
        return nothing
    if isinstance(data, dict) and "jobs" in data:
        data = data["jobs"]
    if not isinstance(data, list):
        return nothing

    # Ignore entries that are not JSON objects instead of crashing on .get().
    jobs = [j for j in data if isinstance(j, dict)]
    if not jobs:
        return nothing

    def _stage(j: dict) -> str:
        status = j.get("status")
        if isinstance(status, dict):
            status = status.get("stage")
        # A missing/null stage yields "" rather than the string "NONE".
        return str(status or "").upper()

    def _created(j: dict) -> str:
        # A null or non-string createdAt sorts as oldest; mixing None and str
        # in the sort key would raise TypeError.
        created = j.get("createdAt")
        return created if isinstance(created, str) else ""

    # Sort by createdAt descending — newest first.
    jobs.sort(key=_created, reverse=True)
    picked = next((j for j in jobs if _stage(j) == "RUNNING"), jobs[0])

    jid = str(picked.get("id") or "")
    st = _stage(picked) or "?"
    flavor = picked.get("flavor") or picked.get("hardware") or "?"
    return jid, st, str(flavor)


# Unsigned decimal with optional fraction and exponent: "12", "1.5", ".5",
# "1e-05". Unlike a bare [\d.]+ class it never swallows a sentence-ending
# period ("1.23." -> "1.23") and never matches a lone ".".
_NUMBER = r"((?:\d+(?:\.\d+)?|\.\d+)(?:[eE][+-]?\d+)?)"

# (summary key, compiled pattern) in the order the keys are reported.
# Training patterns emitted by hydra/training.py:
#   step=<int>   tok/s=<num>   tps=<num>   val_bpb=<num>   bpb=<num>
_METRIC_PATTERNS = (
    ("step", re.compile(r"step[=:\s]+(\d+)", re.IGNORECASE)),
    ("tok/s", re.compile(r"(?:tok/?s|tps)[=:\s]+" + _NUMBER, re.IGNORECASE)),
    ("bpb", re.compile(r"(?:val_)?bpb[=:\s]+" + _NUMBER, re.IGNORECASE)),
    # Loss as a tertiary signal
    ("loss", re.compile(r"\bloss[=:\s]+" + _NUMBER, re.IGNORECASE)),
)


def _parse_metrics(logs: str) -> dict[str, str]:
    """Extract the most recent step / tok/s / bpb / loss values from *logs*.

    Each metric is matched case-insensitively as ``<name>`` followed by one or
    more of ``=``, ``:`` or whitespace and then a number. The LAST occurrence
    in the text wins. Values are returned as the matched strings (not parsed
    to numbers); metrics that never appear are omitted from the result.
    """
    out: dict[str, str] = {}
    for key, pattern in _METRIC_PATTERNS:
        hits = pattern.findall(logs)
        if hits:
            out[key] = hits[-1]
    return out


def _append_summary(line: str) -> None:
    """Append *line* plus a newline to the summary log and close the file."""
    with SUMMARY.open("a", encoding="utf-8") as fh:
        fh.write(line + "\n")


def main() -> int:
    """Poll the namespace's jobs once and append a one-line status summary.

    Steps: list jobs, pick one via ``_pick_job``, re-query it for a fresh
    stage, fetch its logs (for running or finished jobs), parse metrics, and
    append ``[timestamp] job=... flavor=... stage=... step=... tok/s=...
    bpb=... loss=...`` to the summary log. Metrics that could not be parsed
    are written as ``?``. The fetched logs (or the fetch-error sentinel)
    overwrite the raw log file.

    Always returns 0; API failures are recorded in the summary log instead of
    being raised.
    """
    ts = _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # 1. Find the most recent job (namespace-scoped endpoint).
    jobs_blob = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}")
    if jobs_blob.startswith("__"):
        _append_summary(f"[{ts}] api_err jobs={jobs_blob}")
        return 0

    jid, stage, flavor = _pick_job(jobs_blob)
    if not jid:
        _append_summary(f"[{ts}] no_job")
        return 0

    # 2. Re-query the single job for fresh stage (list endpoint can lag).
    #    Best effort: on a fetch error or unexpected payload the values from
    #    the list endpoint are kept.
    detail = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}")
    try:
        dj = json.loads(detail)
    except ValueError:
        dj = None
    if isinstance(dj, dict):
        status = dj.get("status")
        if isinstance(status, dict):
            stage = status.get("stage") or stage
        flavor = dj.get("flavor") or flavor

    # 3. Pull logs only once the job has started (running or finished);
    #    earlier stages have no metrics to parse.
    logs = ""
    if str(stage).upper() in {"RUNNING", "COMPLETED", "ERROR", "ERRORED"}:
        logs = _get(f"https://huggingface.co/api/jobs/{NAMESPACE}/{jid}/logs")
        # Explicit UTF-8: the logs may contain characters (e.g. U+FFFD from
        # lossy decoding) that a non-UTF-8 locale encoding cannot represent.
        RAW.write_text(logs, encoding="utf-8")

    metrics = _parse_metrics(logs) if logs and not logs.startswith("__") else {}

    parts = [f"job={jid}", f"flavor={flavor}", f"stage={stage}"]
    parts.extend(f"{k}={metrics.get(k, '?')}" for k in ("step", "tok/s", "bpb", "loss"))
    _append_summary(f"[{ts}] " + " ".join(parts))
    return 0


# Script entry point: run one poll and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())