"""Feather training step benchmark — local CPU/GPU smoke plus JSON TPS manifest. Usage (CPU smoke): HYDRA_BATCH_SIZE=1 HYDRA_TOTAL_BATCH=1024 HYDRA_N_LAYER=2 HYDRA_D_MODEL=128 \ HYDRA_GPU_BF16_TFLOPS=0.1 HYDRA_CPU_THREADS=4 \ python scripts/benchmark_step.py --steps 3 --manifest-out /tmp/bench.json Usage (GPU / A10G-like subset): HYDRA_BATCH_SIZE=4 HYDRA_TOTAL_BATCH=32768 HYDRA_N_LAYER=6 HYDRA_D_MODEL=384 \ HYDRA_GPU_BF16_TFLOPS=125.0 \ python scripts/benchmark_step.py --seq-len 2048 --steps 20 --manifest-out bench.json The step loop preserves CE-loss training semantics (model(x, y) + backward). It is still synthetic data and therefore smoke/relative-TPS evidence, not corpus quality evidence. Set HYDRA_PROFILE_FORWARD=0 for TPS rows; profiling synchronizes and the emitted manifest marks such rows attribution-only. """ from __future__ import annotations import argparse import json import math import os import subprocess import sys import time from pathlib import Path from typing import Any CD = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) os.chdir(CD) sys.path.insert(0, CD) import torch try: from hydra.config import ( SEED, D_MODEL, N_LAYER, D_STATE, HEADDIM, N_HEADS, EXPAND, DEVICE_BATCH_SIZE, TOTAL_BATCH_SIZE, GPU_BF16_PEAK_FLOPS, ADAM_BETAS, ) from hydra.model import PostSemClawModel from hydra.config import PostSemClawConfig from prepare import Tokenizer except Exception as e: print(f"[benchmark] import failed: {e}") raise try: from harness.tps_manifest_validity import normalize_tps_manifest except Exception: normalize_tps_manifest = None ENV_ECHO_KEYS = [ "HYDRA_PROFILE_FORWARD", "HYDRA_MODEL_COMPILE", "HYDRA_MUON_COMPILE", "HYDRA_FUSED_DEVICE_STEP", "HYDRA_FORCE_HTM_CPU", "HYDRA_HTM_FUSED", "HYDRA_HTM_BATCHED_FUSED", "HYDRA_FUSED_SDR_PROJECT", "HYDRA_DISABLE_FUSED_SDR_TRITON", "HYDRA_USE_NEMOTRON", "HYDRA_TARGET_SHARDS", "HYDRA_TOKEN_CACHE_GB", "HYDRA_DISABLE_TOKEN_CACHE", ] def _truthy_env(name: str) -> bool: return os.environ.get(name, "0").strip().lower() in {"1", "true", "yes", "on"} def _git_sha() -> str: try: return subprocess.run( ["git", "rev-parse", "--short=12", "HEAD"], cwd=CD, text=True, capture_output=True, check=True, timeout=5, ).stdout.strip() except Exception: return "unknown" def _warmup(model, x, y, autocast_ctx, n: int = 2): for _ in range(n): with autocast_ctx: loss = model(x, y) loss.backward() model.zero_grad(set_to_none=True) def _env_echo() -> dict[str, str]: return {key: os.environ[key] for key in ENV_ECHO_KEYS if key in os.environ} def _write_manifest(path: str | None, manifest: dict[str, Any]) -> None: if normalize_tps_manifest is not None: manifest = normalize_tps_manifest(manifest) text = json.dumps(manifest, indent=2, sort_keys=True) + "\n" if not path: print("[benchmark_manifest] " + json.dumps(manifest, sort_keys=True), flush=True) return out = Path(path) out.parent.mkdir(parents=True, exist_ok=True) out.write_text(text, encoding="utf-8") print(f"[benchmark] manifest written: {out}", flush=True) def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Feather CE step TPS/profiling smoke benchmark") p.add_argument("--steps", type=int, default=int(os.environ.get("BENCH_STEPS", "20"))) p.add_argument("--warmup", type=int, default=int(os.environ.get("BENCH_WARMUP_STEPS", "3"))) p.add_argument("--seq-len", type=int, default=int(os.environ.get("BENCH_SEQ_LEN", os.environ.get("HYDRA_SEQUENCE_LEN", "512")))) p.add_argument("--engram-columns", type=int, default=int(os.environ.get("BENCH_ENGRAM_COLUMNS", "4096"))) p.add_argument("--engram-key-dim", type=int, default=int(os.environ.get("BENCH_ENGRAM_KEY_DIM", "64"))) p.add_argument("--engram-layer-idx", type=int, default=int(os.environ.get("BENCH_ENGRAM_LAYER_IDX", "1"))) p.add_argument("--vocab-size", type=int, default=int(os.environ.get("BENCH_VOCAB_SIZE", "0")), help="Synthetic vocab size for tokenizer-free smoke runs.") p.add_argument("--manifest-out", default=os.environ.get("BENCH_MANIFEST_OUT")) p.add_argument("--task-id", default=os.environ.get("HERMES_KANBAN_TASK", "")) p.add_argument("--run-id", default=os.environ.get("FEATHER_RUN_ID", "local-benchmark-step")) p.add_argument("--runtime-profile", default=os.environ.get("FEATHER_HF_RUNTIME_PROFILE", "local_synthetic_step")) p.add_argument("--metric-role", choices=["tps", "profile"], default=("profile" if _truthy_env("HYDRA_PROFILE_FORWARD") else "tps")) p.add_argument("--active-duplicate-jobs", type=int, default=None, help="Set after HF duplicate-active-job preflight; omitted locally.") return p.parse_args() def main(): args = parse_args() torch.manual_seed(SEED) device_str = "cuda" if torch.cuda.is_available() else "cpu" device = torch.device(device_str) if device_str == "cuda": torch.cuda.manual_seed(SEED) torch.set_float32_matmul_precision("high") torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True else: _cpu_threads = int(os.environ.get("HYDRA_CPU_THREADS", str(min(os.cpu_count() or 4, 8)))) torch.set_num_threads(_cpu_threads) print(f"[CPU] torch.set_num_threads={_cpu_threads}") if args.vocab_size > 0: vocab_size = args.vocab_size print(f"[benchmark] using synthetic vocab_size={vocab_size}", flush=True) else: tokenizer = Tokenizer.from_directory() vocab_size = tokenizer.get_vocab_size() config = PostSemClawConfig( sequence_len=args.seq_len, vocab_size=vocab_size, n_layer=N_LAYER, d_model=D_MODEL, d_state=D_STATE, headdim=HEADDIM, n_heads=N_HEADS, expand=EXPAND, engram_n_columns=args.engram_columns, engram_key_dim=args.engram_key_dim, engram_layer_idx=args.engram_layer_idx, ) model = PostSemClawModel(config).to(device) model.train() tokens_per_fwdbwd = DEVICE_BATCH_SIZE * config.sequence_len assert TOTAL_BATCH_SIZE % tokens_per_fwdbwd == 0 grad_accum_steps = TOTAL_BATCH_SIZE // tokens_per_fwdbwd try: optimizer = model.setup_optimizer( unembedding_lr=0.005, embedding_lr=1.0, scalar_lr=0.5, adam_betas=ADAM_BETAS, matrix_lr=0.12, weight_decay=0.01, ) use_optimizer = True except Exception as e: print(f"[benchmark] optimizer setup skipped: {e}") optimizer = None use_optimizer = False torch.manual_seed(SEED + 1) x = torch.randint(0, vocab_size, (DEVICE_BATCH_SIZE, config.sequence_len), device=device) y = torch.randint(0, vocab_size, (DEVICE_BATCH_SIZE, config.sequence_len), device=device) autocast_ctx = torch.amp.autocast( device_type=device_str, dtype=torch.bfloat16, enabled=(device_str == "cuda") ) _warmup(model, x, y, autocast_ctx, n=args.warmup) if device_str == "cuda": torch.cuda.synchronize() torch.cuda.reset_peak_memory_stats() step_durations: list[float] = [] t0 = time.time() last_loss = None for _step in range(args.steps): s0 = time.time() for _ in range(grad_accum_steps): with autocast_ctx: loss = model(x, y) last_loss = float(loss.detach().cpu()) loss.backward() if use_optimizer: optimizer.step() model.zero_grad(set_to_none=True) if device_str == "cuda": torch.cuda.synchronize() step_durations.append(time.time() - s0) dt = time.time() - t0 tok_per_sec = int(args.steps * TOTAL_BATCH_SIZE / dt) ms_per_step = dt * 1000 / args.steps vram_mib = 0.0 if device_str == "cuda": vram_mib = torch.cuda.max_memory_allocated() / 1024 / 1024 sorted_step_tps = sorted(TOTAL_BATCH_SIZE / d for d in step_durations if d > 0) median_tps = sorted_step_tps[len(sorted_step_tps) // 2] if sorted_step_tps else tok_per_sec p90_tps = sorted_step_tps[int((len(sorted_step_tps) - 1) * 0.9)] if sorted_step_tps else tok_per_sec max_tps = max(sorted_step_tps) if sorted_step_tps else tok_per_sec achieved_flops = 6.0 * sum(p.numel() for p in model.parameters()) * tok_per_sec mfu = achieved_flops / (GPU_BF16_PEAK_FLOPS * 1e12) if GPU_BF16_PEAK_FLOPS else 0.0 print( f"steps={args.steps} tok/s={tok_per_sec} ms/step={ms_per_step:.1f} " f"total_batch={TOTAL_BATCH_SIZE} device_batch={DEVICE_BATCH_SIZE} " f"accum={grad_accum_steps} seq_len={config.sequence_len} " f"n_layer={N_LAYER} d_model={D_MODEL} device={device_str} " f"vram_mib={vram_mib:.0f} mfu={mfu:.4f} loss={last_loss}", flush=True, ) duplicate_check = {"performed": False, "reason": "local_synthetic_benchmark"} if args.active_duplicate_jobs is not None: duplicate_check = {"performed": True, "active_matching_jobs": args.active_duplicate_jobs} manifest = { "task_id": args.task_id, "run_id": args.run_id, "git_sha": _git_sha(), "metric_role": args.metric_role, "hardware": {"flavor": device_str, "cuda_arch": torch.cuda.get_device_name(0) if device_str == "cuda" else "cpu"}, "runtime_profile": args.runtime_profile, "no_paid_launch_without_gate": True, "duplicate_active_job_check": duplicate_check, "env": _env_echo(), "model": { "sequence_len": config.sequence_len, "n_layer": N_LAYER, "d_model": D_MODEL, "engram_n_columns": args.engram_columns, "engram_key_dim": args.engram_key_dim, }, "receipts": { "profile_forward": _truthy_env("HYDRA_PROFILE_FORWARD") or args.metric_role == "profile", "htm_gpu_verified": _truthy_env("HYDRA_FORCE_HTM_CPU") is False and device_str == "cuda", "training_tps_window": {"median": median_tps, "p90": p90_tps, "max": max_tps}, "flavor_verified": device_str, }, "metrics": { "steps": args.steps, "tok_per_sec": tok_per_sec, "ms_per_step": ms_per_step, "vram_mib": vram_mib, "mfu_estimate": mfu, "last_loss": last_loss, }, } _write_manifest(args.manifest_out, manifest) if __name__ == "__main__": main()