# pawn/logging.py — "Consolidate all training logging through MetricsLogger" (commit b103659)
"""Unified JSONL metrics logging for PAWN training pipelines.
Every training script — pretraining, multi-model, adapters — uses MetricsLogger
to ensure consistent, information-rich records. One JSON object per line.
Guaranteed baseline fields on every record:
type, step, timestamp, elapsed
Config records additionally include:
hostname, git_hash, git_tag, slug (if set)
Train/val records additionally include:
mem/* (system + GPU), lr (if provided)
"""
import json
import math
import os
import socket
import subprocess
import time
from datetime import datetime
from pathlib import Path
import psutil
try:
import torch
except ImportError:
torch = None
# ---------------------------------------------------------------------------
# Git info (cached on first call)
# ---------------------------------------------------------------------------
_git_info: dict[str, str | None] | None = None
def _get_git_info() -> dict[str, str | None]:
global _git_info
if _git_info is not None:
return _git_info
git_hash = os.environ.get("PAWN_GIT_HASH")
git_tag = os.environ.get("PAWN_GIT_TAG")
if not git_hash:
try:
git_hash = subprocess.check_output(
["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL, text=True
).strip()
except Exception:
pass
if not git_tag:
try:
git_tag = subprocess.check_output(
["git", "tag", "--points-at", "HEAD"], stderr=subprocess.DEVNULL, text=True
).strip() or None
except Exception:
pass
_git_info = {"git_hash": git_hash, "git_tag": git_tag}
return _git_info
# ---------------------------------------------------------------------------
# Slug generator
# ---------------------------------------------------------------------------
_ADJECTIVES = [
"amber", "bold", "calm", "deft", "eager", "fair", "grim", "hale",
"keen", "lush", "mild", "neat", "pale", "quick", "rare", "sly",
"taut", "vast", "warm", "zesty", "brisk", "crisp", "dense", "fleet",
"grand", "hardy", "jolly", "lucid", "noble", "prime", "stark", "vivid",
]
_ANIMALS = [
"puma", "lynx", "hawk", "wolf", "bear", "deer", "fox", "owl",
"pike", "wren", "crane", "otter", "raven", "cobra", "heron", "bison",
"finch", "marten", "osprey", "falcon", "badger", "salmon", "condor",
"coyote", "ferret", "jackal", "marmot", "parrot", "turtle", "walrus",
]
def random_slug() -> str:
import random
return f"{random.choice(_ADJECTIVES)}-{random.choice(_ANIMALS)}"
# ---------------------------------------------------------------------------
# JSON helpers
# ---------------------------------------------------------------------------
def _sanitize(obj: object) -> object:
"""Replace NaN/Inf with None for valid JSON."""
if isinstance(obj, float) and (math.isnan(obj) or math.isinf(obj)):
return None
if isinstance(obj, dict):
return {k: _sanitize(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_sanitize(v) for v in obj]
return obj
def _json_default(obj: object) -> object:
"""JSON serializer for types not natively supported."""
if torch is not None and isinstance(obj, torch.Tensor):
return obj.item() if obj.numel() == 1 else obj.tolist()
if hasattr(obj, "item"): # numpy scalar
return obj.item()
if isinstance(obj, Path):
return str(obj)
return str(obj)
# ---------------------------------------------------------------------------
# MetricsLogger
# ---------------------------------------------------------------------------
class MetricsLogger:
    """JSONL metrics logger with rich baseline fields on every record.

    One JSON object is written per line to ``<run_dir>/metrics.jsonl``.
    Every record carries ``type``, ``timestamp``, and ``elapsed``; train/val
    records also carry ``step`` and ``mem/*`` resource stats.

    Usage::

        logger = MetricsLogger("logs", run_prefix="film", device="cuda")
        logger.log_config(run_type="film", model=cfg.__dict__, training={...})

        # Step-based training (pretraining)
        logger.log_train(step=100, lr=3e-4, loss=3.5, accuracy=0.06,
                         step_time=0.34, games_per_sec=750)
        logger.log_val(step=100, loss=3.6, accuracy=0.05, patience=2)

        # Epoch-based training (adapters)
        logger.log_train(step=500, epoch=3, lr=1e-4, train_loss=2.1,
                         val_loss=2.3, val_top1=0.12, epoch_time_s=45)

        logger.close()
    """

    def __init__(
        self,
        log_dir: str | Path,
        run_prefix: str = "run",
        device: str = "cpu",
        slug: str | None = None,
        suffix: str = "",
    ):
        """Create a logger for a new run.

        Args:
            log_dir: Parent directory for all runs.
            run_prefix: Prefix for the run directory name.
            device: Device string for GPU memory stats ("cpu" disables them).
            slug: Human-readable slug (e.g. "zesty-osprey"). Auto-generated if None.
            suffix: Extra suffix for run directory name (e.g. variant name).
        """
        self.slug = slug or random_slug()
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Directory name: <prefix>_<timestamp>[_<suffix>]_<slug>
        parts = [run_prefix, ts]
        if suffix:
            parts.append(suffix)
        parts.append(self.slug)
        self.run_dir = Path(log_dir) / "_".join(parts)
        self.run_dir.mkdir(parents=True, exist_ok=True)
        self.metrics_path = self.run_dir / "metrics.jsonl"
        # Append mode so a resumed run keeps earlier records; explicit UTF-8
        # so output is platform-independent (fix: was locale-dependent).
        self._file = open(self.metrics_path, "a", encoding="utf-8")
        self._proc = psutil.Process()
        self._device = device
        self._start_time = time.time()

    @property
    def path(self) -> Path:
        """Path to the metrics.jsonl file for this run."""
        return self.metrics_path

    # -----------------------------------------------------------------------
    # Config
    # -----------------------------------------------------------------------
    def log_config(self, **kwargs: object) -> None:
        """Log a config record. Called once at start of training.

        All keyword arguments are included in the record. Common fields:
        run_type, model, training, param_count, variant, etc.

        The slug, hostname, timestamp, and git info are always appended and
        take precedence over same-named kwargs.
        """
        record: dict[str, object] = {"type": "config"}
        record.update(kwargs)
        record["slug"] = self.slug
        record["hostname"] = socket.gethostname()
        record["timestamp"] = datetime.now().isoformat()
        record.update(_get_git_info())
        self._write(record)

    # -----------------------------------------------------------------------
    # Config file (JSON, for checkpoint bundling)
    # -----------------------------------------------------------------------
    def write_config_json(self, **kwargs: object) -> Path:
        """Write config.json alongside metrics.jsonl. Returns the path."""
        data: dict[str, object] = {}
        data.update(kwargs)
        data["slug"] = self.slug
        data.update(_get_git_info())
        path = self.run_dir / "config.json"
        with open(path, "w") as f:
            json.dump(data, f, indent=2, default=_json_default)
        return path

    # -----------------------------------------------------------------------
    # Train records
    # -----------------------------------------------------------------------
    def log_train(
        self,
        step: int,
        epoch: int | None = None,
        **metrics: object,
    ) -> None:
        """Log a training metrics record.

        Standard fields (pass as kwargs):
            lr, grad_norm, loss, accuracy, step_time, games_per_sec,
            train_loss, train_top1, etc.
        Adapter-specific fields are passed through as-is:
            film/gamma_norm_L0, lora/B_norm_q, adapter/up_norm, etc.

        Args:
            step: Global optimizer step (always recorded).
            epoch: Optional epoch number for epoch-based training.
        """
        record: dict[str, object] = {"type": "train", "step": step}
        if epoch is not None:
            record["epoch"] = epoch
        record.update(metrics)
        self._add_baseline(record)
        self._write(record)

    # -----------------------------------------------------------------------
    # Validation records
    # -----------------------------------------------------------------------
    def log_val(
        self,
        step: int,
        epoch: int | None = None,
        **metrics: object,
    ) -> None:
        """Log a validation metrics record.

        Standard fields (pass as kwargs):
            loss (or val/loss), accuracy, top5_accuracy,
            patience, best_val_loss, best_val_step, etc.

        Args:
            step: Global optimizer step (always recorded).
            epoch: Optional epoch number for epoch-based training.
        """
        record: dict[str, object] = {"type": "val", "step": step}
        if epoch is not None:
            record["epoch"] = epoch
        record.update(metrics)
        self._add_baseline(record)
        self._write(record)

    # -----------------------------------------------------------------------
    # Generic log (backward compat)
    # -----------------------------------------------------------------------
    def log(
        self,
        metrics: dict,
        step: int | None = None,
        epoch: int | None = None,
        record_type: str = "train",
        include_resources: bool = True,
    ) -> None:
        """Log a generic metrics record. Prefer log_train/log_val for new code.

        Args:
            metrics: Arbitrary metric fields merged into the record.
            step: Optional global step.
            epoch: Optional epoch number.
            record_type: Value for the record's "type" field.
            include_resources: If False, skip the mem/* resource stats.
        """
        record: dict[str, object] = {"type": record_type}
        if step is not None:
            record["step"] = step
        if epoch is not None:
            record["epoch"] = epoch
        record.update(metrics)
        self._add_baseline(record, include_resources=include_resources)
        self._write(record)

    # -----------------------------------------------------------------------
    # Internals
    # -----------------------------------------------------------------------
    def _add_baseline(self, record: dict, include_resources: bool = True) -> None:
        """Add timestamp, elapsed (seconds since init), and resource stats."""
        record["timestamp"] = datetime.now().isoformat()
        record["elapsed"] = round(time.time() - self._start_time, 3)
        if include_resources:
            self._add_resource_stats(record)

    def _add_resource_stats(self, record: dict) -> None:
        """Add memory and CPU stats (and GPU memory if on a CUDA device)."""
        mem_info = self._proc.memory_info()
        sys_mem = psutil.virtual_memory()
        # Summed per-core percentages: can exceed 100 on multi-core hosts.
        per_cpu = psutil.cpu_percent(interval=None, percpu=True)
        record["mem/system_rss_gb"] = round(mem_info.rss / (1024**3), 3)
        record["mem/system_used_gb"] = round(sys_mem.used / (1024**3), 3)
        record["mem/system_total_gb"] = round(sys_mem.total / (1024**3), 3)
        record["mem/cpu_percent"] = round(sum(per_cpu) if per_cpu else 0.0, 1)
        if self._device != "cpu" and torch is not None and torch.cuda.is_available():
            record["mem/gpu_peak_gb"] = round(torch.cuda.max_memory_allocated() / (1024**3), 3)
            record["mem/gpu_reserved_gb"] = round(torch.cuda.memory_reserved() / (1024**3), 3)
            record["mem/gpu_current_gb"] = round(torch.cuda.memory_allocated() / (1024**3), 3)
            # Reset so gpu_peak_gb measures the peak since the previous record.
            torch.cuda.reset_peak_memory_stats()

    def _write(self, record: dict) -> None:
        """Write a record, sanitizing NaN/Inf to null, and flush immediately."""
        self._file.write(json.dumps(_sanitize(record), default=_json_default) + "\n")
        self._file.flush()

    def close(self) -> None:
        """Close the metrics file. Safe to call more than once."""
        if self._file and not self._file.closed:
            self._file.close()

    def __enter__(self) -> "MetricsLogger":
        return self

    def __exit__(self, *args: object) -> None:
        self.close()