#!/usr/bin/env python3 """ Derived from Andrej Karpathy's nanochat project. MIT License Copyright (c) 2025 Andrej Karpathy Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. """ from __future__ import annotations import argparse import csv from dataclasses import dataclass import json import math from pathlib import Path import statistics import numpy as np @dataclass(frozen=True) class CurvePoint: dropout: float mean_val_loss: float std_val_loss: float n: int @dataclass(frozen=True) class CalibrationCell: source: str run_dir: Path run_mode: str model_name: str n_layer: int n_head: int n_embd: int parameters: int unique_tokens: int sampled_tokens: int best_grid_dropout: float best_quad_dropout: float target_dropout: float best_val_loss: float best_val_std: float boundary_optimum: bool bracketed_optimum: bool weight: float curve: tuple[CurvePoint, ...] @property def x_model_pressure(self) -> float: return math.log10(self.parameters / self.unique_tokens) @property def x_sample_pressure(self) -> float: return math.log10(self.sampled_tokens / self.unique_tokens) FEATURE_SETS = { "base": ( ("A", "log10(P / U)"), ("B", "log10(C / U)"), ("C0", "1"), ), "interaction": ( ("A", "log10(P / U)"), ("B", "log10(C / U)"), ("D", "log10(P / U) * log10(C / U)"), ("C0", "1"), ), "quadratic": ( ("A", "log10(P / U)"), ("B", "log10(C / U)"), ("Qp", "log10(P / U)^2"), ("Qc", "log10(C / U)^2"), ("C0", "1"), ), "full_quadratic": ( ("A", "log10(P / U)"), ("B", "log10(C / U)"), ("D", "log10(P / U) * log10(C / U)"), ("Qp", "log10(P / U)^2"), ("Qc", "log10(C / U)^2"), ("C0", "1"), ), } def feature_vector(cell: CalibrationCell, feature_set: str) -> list[float]: x_model = cell.x_model_pressure x_sample = cell.x_sample_pressure if feature_set == "base": return [x_model, x_sample, 1.0] if feature_set == "interaction": return [x_model, x_sample, x_model * x_sample, 1.0] if feature_set == "quadratic": return [x_model, x_sample, x_model * x_model, x_sample * x_sample, 1.0] if feature_set == "full_quadratic": return [ x_model, x_sample, x_model * x_sample, x_model * x_model, x_sample * x_sample, 1.0, ] raise ValueError(f"unknown feature set: {feature_set}") def coefficient_names(feature_set: str) -> list[str]: return [name for name, _ in FEATURE_SETS[feature_set]] def formula_terms(feature_set: str, coef: np.ndarray) -> list[str]: terms: list[str] = [] for value, (_, label) in zip(coef, FEATURE_SETS[feature_set]): if label == "1": terms.append(f"{value:+.6f}") else: terms.append(f"{value:+.6f} * {label}") return terms def predict_dropout(cell: CalibrationCell, coef: np.ndarray, feature_set: str) -> float: return float(np.array(feature_vector(cell, feature_set), dtype=np.float64) @ coef) def parse_curve(raw: str) -> list[CurvePoint]: data = json.loads(raw) points: list[CurvePoint] = [] if isinstance(data, dict): for dropout, value in data.items(): points.append( CurvePoint( dropout=float(dropout), mean_val_loss=float(value), std_val_loss=0.0, n=1, ) ) else: for item in data: mean_val_loss = item.get("mean_val_loss") if mean_val_loss is None: mean_val_loss = item.get("val_loss") if mean_val_loss is None: mean_val_loss = item.get("eval_loss") if mean_val_loss is None: raise ValueError(f"curve point has no validation-loss field: {item}") points.append( CurvePoint( dropout=float(item["dropout"]), mean_val_loss=float(mean_val_loss), std_val_loss=float(item.get("std_val_loss", 0.0)), n=int(item.get("n", 1)), ) ) return sorted(points, key=lambda point: point.dropout) def quadratic_minimum(points: list[CurvePoint]) -> tuple[float, bool]: if not points: raise ValueError("cannot estimate optimum from an empty curve") best_index = min(range(len(points)), key=lambda index: points[index].mean_val_loss) if best_index == 0 or best_index == len(points) - 1: return points[best_index].dropout, False left, mid, right = points[best_index - 1 : best_index + 2] x1, y1 = left.dropout, left.mean_val_loss x2, y2 = mid.dropout, mid.mean_val_loss x3, y3 = right.dropout, right.mean_val_loss denominator = (x1 - x2) * (x1 - x3) * (x2 - x3) if abs(denominator) < 1e-12: return mid.dropout, False a = (x3 * (y2 - y1) + x2 * (y1 - y3) + x1 * (y3 - y2)) / denominator b = ( x3 * x3 * (y1 - y2) + x2 * x2 * (y3 - y1) + x1 * x1 * (y2 - y3) ) / denominator if a <= 0.0: return mid.dropout, False optimum = -b / (2.0 * a) low = min(x1, x3) high = max(x1, x3) return max(low, min(high, optimum)), True def load_metrics_by_cell(run_dir: Path) -> dict[tuple[str, int], int]: metrics_path = run_dir / "metrics.jsonl" if not metrics_path.exists(): return {} sampled_tokens: dict[tuple[str, int], int] = {} for line in metrics_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue row = json.loads(line) model = str(row["model_name"]) token_limit = int(row["token_limit"]) key = (model, token_limit) sampled_tokens[key] = max( int(row.get("tokens_seen", 0)), sampled_tokens.get(key, 0), ) return sampled_tokens def source_label(run_dir: Path) -> str: parts = run_dir.parts if len(parts) >= 3: return "/".join(parts[-3:]) return str(run_dir) def load_cells(run_dirs: list[Path], target: str, weighting: str) -> list[CalibrationCell]: cells: list[CalibrationCell] = [] for run_dir in run_dirs: selection_path = run_dir / "model_selection.csv" if not selection_path.exists(): raise FileNotFoundError(f"missing model_selection.csv under {run_dir}") sampled_by_cell = load_metrics_by_cell(run_dir) with selection_path.open(newline="", encoding="utf-8") as handle: for row in csv.DictReader(handle): curve = parse_curve(row["curve_json"]) if not curve: continue best_grid = float(row["best_dropout"]) best_quad, bracketed = quadratic_minimum(curve) target_dropout = best_quad if target == "quad" else best_grid model_name = row["model_name"] unique_tokens = int(float(row["token_limit"])) sampled_tokens = sampled_by_cell.get((model_name, unique_tokens), 0) if not sampled_tokens: sampled_tokens = max( unique_tokens, int(float(row.get("tokens_seen", unique_tokens))), ) rates = [point.dropout for point in curve] boundary = best_grid in {min(rates), max(rates)} best = min(curve, key=lambda point: point.mean_val_loss) loss_span = max(point.mean_val_loss for point in curve) - best.mean_val_loss weight = 1.0 if weighting == "heuristic": if boundary: weight *= 0.30 if not bracketed: weight *= 0.50 if loss_span < 0.02: weight *= 0.50 if best.std_val_loss > 0.0: weight *= 1.0 / (1.0 + 20.0 * best.std_val_loss) cells.append( CalibrationCell( source=source_label(run_dir), run_dir=run_dir, run_mode=row["run_mode"], model_name=model_name, n_layer=int(float(row["n_layer"])), n_head=int(float(row["n_head"])), n_embd=int(float(row["n_embd"])), parameters=int(float(row["parameters"])), unique_tokens=unique_tokens, sampled_tokens=sampled_tokens, best_grid_dropout=best_grid, best_quad_dropout=best_quad, target_dropout=target_dropout, best_val_loss=float(row["best_val_loss"]), best_val_std=float(row["best_val_std"]), boundary_optimum=boundary, bracketed_optimum=bracketed, weight=max(weight, 0.05), curve=tuple(curve), ) ) return cells def fit_coefficients( cells: list[CalibrationCell], feature_set: str, ) -> tuple[np.ndarray, dict[str, float]]: feature_count = len(FEATURE_SETS[feature_set]) if len(cells) < feature_count: raise ValueError( f"need at least {feature_count} cells to fit feature set {feature_set}" ) x = np.array( [feature_vector(cell, feature_set) for cell in cells], dtype=np.float64, ) y = np.array([cell.target_dropout for cell in cells], dtype=np.float64) weights = np.array([cell.weight for cell in cells], dtype=np.float64) sqrt_w = np.sqrt(weights) coef, *_ = np.linalg.lstsq(x * sqrt_w[:, None], y * sqrt_w, rcond=None) pred = x @ coef errors = pred - y metrics = { "n": float(len(cells)), "rmse": float(np.sqrt(np.mean(errors * errors))), "mae": float(np.mean(np.abs(errors))), "bias": float(np.mean(errors)), "weighted_rmse": float(np.sqrt(np.average(errors * errors, weights=weights))), "weighted_mae": float(np.average(np.abs(errors), weights=weights)), } return coef, metrics def grouped_cv( cells: list[CalibrationCell], key_name: str, feature_set: str, ) -> dict[str, float]: groups: dict[str, list[CalibrationCell]] = {} for cell in cells: if key_name == "model": key = cell.model_name elif key_name == "prefix": key = str(cell.unique_tokens) elif key_name == "source": key = cell.source else: raise ValueError(f"unknown cv key: {key_name}") groups.setdefault(key, []).append(cell) errors: list[float] = [] for key, held_out in groups.items(): train = [cell for cell in cells if cell not in held_out] if len(train) < len(FEATURE_SETS[feature_set]): continue coef, _ = fit_coefficients(train, feature_set) for cell in held_out: prediction = predict_dropout(cell, coef, feature_set) errors.append(prediction - cell.target_dropout) if not errors: return {"n": 0.0, "rmse": float("nan"), "mae": float("nan"), "bias": float("nan")} return { "n": float(len(errors)), "rmse": math.sqrt(statistics.fmean(error * error for error in errors)), "mae": statistics.fmean(abs(error) for error in errors), "bias": statistics.fmean(errors), } def suggest_fine_rates( cell: CalibrationCell, min_rate: float, max_rate: float, spacing: float, ) -> list[float]: center = cell.target_dropout candidates = { round(max(min_rate, min(max_rate, center + delta)), 3) for delta in (-2 * spacing, -spacing, 0.0, spacing, 2 * spacing) } existing = {round(point.dropout, 3) for point in cell.curve} return sorted(rate for rate in candidates if rate not in existing) def write_cells( path: Path, cells: list[CalibrationCell], coef: np.ndarray, feature_set: str, ) -> None: fieldnames = [ "source", "run_dir", "run_mode", "model_name", "n_layer", "n_head", "n_embd", "parameters", "unique_tokens", "sampled_tokens", "x_model_pressure", "x_sample_pressure", "best_grid_dropout", "best_quad_dropout", "target_dropout", "predicted_dropout", "residual", "best_val_loss", "best_val_std", "boundary_optimum", "bracketed_optimum", "weight", ] with path.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() for cell in cells: prediction = predict_dropout(cell, coef, feature_set) writer.writerow( { "source": cell.source, "run_dir": str(cell.run_dir), "run_mode": cell.run_mode, "model_name": cell.model_name, "n_layer": cell.n_layer, "n_head": cell.n_head, "n_embd": cell.n_embd, "parameters": cell.parameters, "unique_tokens": cell.unique_tokens, "sampled_tokens": cell.sampled_tokens, "x_model_pressure": cell.x_model_pressure, "x_sample_pressure": cell.x_sample_pressure, "best_grid_dropout": cell.best_grid_dropout, "best_quad_dropout": cell.best_quad_dropout, "target_dropout": cell.target_dropout, "predicted_dropout": prediction, "residual": prediction - cell.target_dropout, "best_val_loss": cell.best_val_loss, "best_val_std": cell.best_val_std, "boundary_optimum": cell.boundary_optimum, "bracketed_optimum": cell.bracketed_optimum, "weight": cell.weight, } ) def write_suggestions( path: Path, cells: list[CalibrationCell], min_rate: float, max_rate: float, spacing: float, ) -> None: fieldnames = [ "source", "model_name", "unique_tokens", "target_dropout", "boundary_optimum", "bracketed_optimum", "suggested_rates", ] with path.open("w", newline="", encoding="utf-8") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() for cell in cells: writer.writerow( { "source": cell.source, "model_name": cell.model_name, "unique_tokens": cell.unique_tokens, "target_dropout": f"{cell.target_dropout:.4f}", "boundary_optimum": cell.boundary_optimum, "bracketed_optimum": cell.bracketed_optimum, "suggested_rates": " ".join( f"{rate:.3f}" for rate in suggest_fine_rates(cell, min_rate, max_rate, spacing) ), } ) def write_report( path: Path, cells: list[CalibrationCell], coef: np.ndarray, metrics: dict[str, float], cv: dict[str, dict[str, float]], target: str, feature_set: str, ) -> None: lines = [ "# Dropout Coefficient Fit Diagnostics", "", f"Target: `{target}`", f"Feature set: `{feature_set}`", "", "## Coefficients", "", "| Coefficient | Term | Value |", "|---|---|---:|", ] for value, (name, label) in zip(coef, FEATURE_SETS[feature_set]): lines.append(f"| `{name}` | `{label}` | {value:.6f} |") terms = formula_terms(feature_set, coef) lines.extend( [ "", "Formula:", "", "```text", "p = clamp(p_min, p_max,", " " + "\n ".join(terms) + ")", "```", "", "## Fit Metrics", "", "| Metric | Value |", "|---|---:|", ] ) for key in ["n", "rmse", "mae", "bias", "weighted_rmse", "weighted_mae"]: lines.append(f"| `{key}` | {metrics[key]:.6f} |") lines.extend(["", "## Cross-Validation", "", "| Holdout | n | RMSE | MAE | Bias |", "|---|---:|---:|---:|---:|"]) for key, item in cv.items(): lines.append( f"| `{key}` | {item['n']:.0f} | {item['rmse']:.6f} | " f"{item['mae']:.6f} | {item['bias']:.6f} |" ) lines.extend( [ "", "## Calibration Cells", "", "| Source | Model | Params | Unique | Sampled | Grid p | Quad p | Target p | Pred p | Residual | Weight | Bracketed | Boundary |", "|---|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---|---|", ] ) for cell in sorted(cells, key=lambda c: (c.source, c.parameters, c.unique_tokens)): prediction = predict_dropout(cell, coef, feature_set) lines.append( "| " f"`{cell.source}` | `{cell.model_name}` | {cell.parameters:,} | " f"{cell.unique_tokens:,} | {cell.sampled_tokens:,} | " f"{cell.best_grid_dropout:.3f} | {cell.best_quad_dropout:.3f} | " f"{cell.target_dropout:.3f} | {prediction:.3f} | " f"{prediction - cell.target_dropout:+.3f} | {cell.weight:.3f} | " f"{cell.bracketed_optimum} | {cell.boundary_optimum} |" ) path.write_text("\n".join(lines) + "\n", encoding="utf-8") def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Fit dropout pressure coefficients from saved static sweep outputs." ) parser.add_argument("--run-dirs", nargs="+", type=Path, required=True) parser.add_argument("--output-dir", type=Path, required=True) parser.add_argument("--target", choices=["grid", "quad"], default="quad") parser.add_argument( "--weighting", choices=["none", "heuristic"], default="heuristic", help="Use equal weights or downweight boundary/flat/noisy optima.", ) parser.add_argument("--min-rate", type=float, default=0.0) parser.add_argument("--max-rate", type=float, default=0.65) parser.add_argument("--fine-spacing", type=float, default=0.02) parser.add_argument( "--feature-set", choices=sorted(FEATURE_SETS), default="base", help="Feature family used to map pressure variables to dropout.", ) return parser def main() -> None: args = build_parser().parse_args() args.output_dir.mkdir(parents=True, exist_ok=True) cells = load_cells(args.run_dirs, args.target, args.weighting) coef, metrics = fit_coefficients(cells, args.feature_set) cv = { "leave_model": grouped_cv(cells, "model", args.feature_set), "leave_prefix": grouped_cv(cells, "prefix", args.feature_set), } if len({cell.source for cell in cells}) > 1: cv["leave_source"] = grouped_cv(cells, "source", args.feature_set) coefficient_values = { name: float(value) for name, value in zip(coefficient_names(args.feature_set), coef) } coefficients = { "target": args.target, "feature_set": args.feature_set, "formula": "p = " + " ".join(formula_terms(args.feature_set, coef)), "weighting": args.weighting, "coefficients": coefficient_values, "metrics": metrics, "cv": cv, "run_dirs": [str(path) for path in args.run_dirs], } coefficients.update(coefficient_values) (args.output_dir / "coefficients.json").write_text( json.dumps(coefficients, indent=2), encoding="utf-8", ) write_cells(args.output_dir / "calibration_cells.csv", cells, coef, args.feature_set) write_suggestions( args.output_dir / "next_dropout_suggestions.csv", cells, args.min_rate, args.max_rate, args.fine_spacing, ) write_report( args.output_dir / "fit_diagnostics.md", cells, coef, metrics, cv, args.target, args.feature_set, ) print( json.dumps( { "output_dir": str(args.output_dir), "cells": len(cells), "feature_set": args.feature_set, "coefficients": coefficient_values, "rmse": metrics["rmse"], "mae": metrics["mae"], }, indent=2, ) ) if __name__ == "__main__": main()