| |
| """Monitor prod9 eval-then-train job and report English eval results. |
| |
| Polls HF job logs, extracts: |
| - Eval phase: BPB, PPL, ROUGE, BLEU |
| - Training phase: step, loss, bpb, tps, val_bpb |
| """ |
| from __future__ import annotations |
|
|
| import json, os, re, subprocess, sys |
| from pathlib import Path |
|
|
# Namespace passed to `hf jobs logs --namespace` when fetching job logs.
NAMESPACE = "GAInTech"
# File read by get_job_id(): `.logs/last_job_id.txt` located one level above
# the directory that contains this script.
JOB_ID_FILE = Path(__file__).resolve().parents[1] / ".logs" / "last_job_id.txt"
|
|
|
|
def get_job_id() -> str:
    """Return the job id stored in ``JOB_ID_FILE``.

    The file's text is returned with leading/trailing whitespace stripped.
    An empty string is returned when the file does not exist.
    """
    if not JOB_ID_FILE.exists():
        return ""
    contents = JOB_ID_FILE.read_text()
    return contents.strip()
|
|
|
|
def fetch_logs(job_id: str) -> str:
    """Fetch recent log output for *job_id* by running the ``hf`` CLI.

    Runs ``hf jobs logs --namespace NAMESPACE <job_id> --tail 200`` with a
    60 second timeout and returns the captured stdout.

    Failures are reported in-band rather than raised: the returned string
    starts with ``"[ERROR] "`` when the command cannot be run, times out, or
    exits non-zero without producing any stdout.

    Args:
        job_id: Identifier of the job whose logs should be fetched.

    Returns:
        The command's stdout, or an ``"[ERROR] ..."`` message on failure.
    """
    cmd = ["hf", "jobs", "logs", "--namespace", NAMESPACE, job_id, "--tail", "200"]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: executable missing / not runnable; SubprocessError: timeout;
        # ValueError: invalid argument or undecodable output.
        return f"[ERROR] {e}"

    # A failed CLI call with no stdout would otherwise look like "no logs
    # yet"; surface the exit code and stderr instead. Partial stdout from a
    # failing call is still returned so best-effort parsing keeps working.
    if proc.returncode != 0 and not proc.stdout.strip():
        detail = proc.stderr.strip() or "no output"
        return f"[ERROR] hf jobs logs exited with code {proc.returncode}: {detail}"
    return proc.stdout
|
|
|
|
# Number as printed by Python: plain decimal, scientific notation, or a
# non-finite value (nan / inf / infinity, any case), with optional sign.
_EVAL_NUMBER = (
    r"[-+]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?|(?i:nan|inf(?:inity)?))"
)
_BASELINE_RE = re.compile(
    rf"\[BASELINE\] bpb=({_EVAL_NUMBER}) ppl=({_EVAL_NUMBER})"
)
_ENGLISH_EVAL_RE = re.compile(
    rf"\[ENGLISH_EVAL\] ROUGE-1=({_EVAL_NUMBER}) ROUGE-2=({_EVAL_NUMBER}) "
    rf"ROUGE-L=({_EVAL_NUMBER}) BLEU=({_EVAL_NUMBER})"
)


def parse_eval_results(logs: str) -> dict | None:
    """Extract English eval metrics from log.

    Looks for the first ``[BASELINE] bpb=... ppl=...`` line and, optionally,
    the first ``[ENGLISH_EVAL] ROUGE-1=... ROUGE-2=... ROUGE-L=... BLEU=...``
    line. Values may be plain decimals, scientific notation, or nan/inf.

    Args:
        logs: Raw log text to search.

    Returns:
        ``None`` when no ``[BASELINE]`` line is present. Otherwise a dict with
        float ``bpb`` and ``ppl``, plus ``rouge1``, ``rouge2``, ``rougeL`` and
        ``bleu`` which are floats when the ``[ENGLISH_EVAL]`` line was found
        and ``None`` otherwise.
    """
    baseline = _BASELINE_RE.search(logs)
    if baseline is None:
        return None
    bpb, ppl = (float(value) for value in baseline.groups())

    results = {
        "bpb": bpb,
        "ppl": ppl,
        "rouge1": None,
        "rouge2": None,
        "rougeL": None,
        "bleu": None,
    }

    english = _ENGLISH_EVAL_RE.search(logs)
    if english is not None:
        # Group order in the pattern matches the key order below.
        keys = ("rouge1", "rouge2", "rougeL", "bleu")
        results.update(zip(keys, map(float, english.groups())))
    return results
|
|
|
|
# Number as printed by Python: plain decimal, scientific notation, or a
# non-finite value (nan / inf / infinity, any case), with optional sign.
_TRAIN_NUMBER = (
    r"[-+]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?|(?i:nan|inf(?:inity)?))"
)
# \b keeps keys such as "bpb=" from matching inside "val_bpb=", and the lazy
# gaps pick the first occurrence of each key after the previous one.
_TRAIN_LINE_RE = re.compile(
    rf"\bstep=(\d+).*?\bloss=({_TRAIN_NUMBER}).*?\bbpb=({_TRAIN_NUMBER})"
    r".*?\btps=(\d+)"
)


def parse_training_metrics(logs: str) -> list[dict]:
    """Extract step/loss/bpb/tps lines from training log.

    A line contributes an entry when it contains ``step=``, ``loss=``,
    ``bpb=`` and ``tps=`` in that order (other text may appear in between).
    Prefixed keys such as ``val_bpb=`` are not mistaken for ``bpb=``.

    Args:
        logs: Raw log text; scanned line by line.

    Returns:
        One dict per matching line, in log order, with ``step`` (int),
        ``loss`` (float), ``bpb`` (float) and ``tps`` (int). Empty list when
        nothing matches.
    """
    found = (_TRAIN_LINE_RE.search(line) for line in logs.splitlines())
    return [
        {
            "step": int(m.group(1)),
            "loss": float(m.group(2)),
            "bpb": float(m.group(3)),
            "tps": int(m.group(4)),
        }
        for m in found
        if m
    ]
|
|
|
|
def main() -> None:
    """Fetch the monitored job's logs and print a summary.

    Exits with status 1 (message on stderr) when no job id is available.
    Otherwise prints, to stdout:

    - the parsed baseline eval results as JSON, or a note that none were found;
    - the latest training metrics line and, when at least two were parsed,
      the bpb change between the last two;
    - the step of the last ``[ckpt] saved ... (step=N)`` line, if any.

    Log-fetch problems (an ``[ERROR]`` result or empty output) are reported on
    stderr; parsing still proceeds on whatever text was returned.
    """
    job_id = get_job_id()
    if not job_id:
        print("[monitor] no job_id found", file=sys.stderr)
        sys.exit(1)

    logs = fetch_logs(job_id)

    # fetch_logs signals failure in-band with an "[ERROR]" prefix. Surface it
    # so a failed fetch is not mistaken for "eval not finished yet". Parsing
    # continues because a genuine job log could also begin with "[ERROR]".
    if logs.startswith("[ERROR]"):
        first_line = logs.splitlines()[0]
        print(f"[monitor] log fetch may have failed: {first_line}", file=sys.stderr)
    elif not logs.strip():
        print("[monitor] no log output received", file=sys.stderr)

    # Eval phase.
    eval_results = parse_eval_results(logs)
    if eval_results:
        print("[EVAL_RESULTS] baseline eval found:")
        print(json.dumps(eval_results, indent=2))
    else:
        print("[monitor] eval phase not yet complete or not found in tail")

    # Training phase.
    metrics = parse_training_metrics(logs)
    if metrics:
        latest = metrics[-1]
        print(
            f"[TRAIN] latest step={latest['step']} loss={latest['loss']:.4f} "
            f"bpb={latest['bpb']:.4f} tps={latest['tps']}"
        )
        if len(metrics) >= 2:
            prev = metrics[-2]
            bpb_delta = latest['bpb'] - prev['bpb']
            print(f"[TRAIN] delta bpb={bpb_delta:+.4f} (lower=better)")

    # Checkpoints: report the step of the last one seen in the fetched text.
    ckpt_matches = re.findall(r"\[ckpt\] saved .* \(step=(\d+)\)", logs)
    if ckpt_matches:
        print(f"[CKPT] latest checkpoint at step={ckpt_matches[-1]}")
|
|
|
|
# Run the monitor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|