"""Feather training step benchmark — local CPU/GPU smoke plus JSON TPS manifest.

Usage (CPU smoke):
    HYDRA_BATCH_SIZE=1 HYDRA_TOTAL_BATCH=1024 HYDRA_N_LAYER=2 HYDRA_D_MODEL=128 \
    HYDRA_GPU_BF16_TFLOPS=0.1 HYDRA_CPU_THREADS=4 \
    python scripts/benchmark_step.py --steps 3 --manifest-out /tmp/bench.json

Usage (GPU / A10G-like subset):
    HYDRA_BATCH_SIZE=4 HYDRA_TOTAL_BATCH=32768 HYDRA_N_LAYER=6 HYDRA_D_MODEL=384 \
    HYDRA_GPU_BF16_TFLOPS=125.0 \
    python scripts/benchmark_step.py --seq-len 2048 --steps 20 --manifest-out bench.json

The step loop preserves CE-loss training semantics (model(x, y) + backward). The
data is still synthetic, so the results are smoke/relative-TPS evidence, not
corpus-quality evidence. Set HYDRA_PROFILE_FORWARD=0 for TPS rows; profiling
synchronizes, and the emitted manifest marks such rows as attribution-only.
"""
from __future__ import annotations

import argparse
import json
import math
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Any

# CD is the parent of the directory that contains this file. The process changes
# into it and puts it first on sys.path so that the project-local imports below
# (hydra, prepare, harness) resolve regardless of the launch directory. This must
# run before those imports, which is why they are not grouped at the very top.
CD = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
os.chdir(CD)
sys.path.insert(0, CD)

import torch

# Required project imports: report the failure with a benchmark-tagged line, then
# re-raise so the original traceback is still shown.
try:
    from hydra.config import (
        SEED, D_MODEL, N_LAYER, D_STATE, HEADDIM, N_HEADS, EXPAND,
        DEVICE_BATCH_SIZE, TOTAL_BATCH_SIZE, GPU_BF16_PEAK_FLOPS,
        ADAM_BETAS,
    )
    from hydra.model import PostSemClawModel
    from hydra.config import PostSemClawConfig
    from prepare import Tokenizer
except Exception as e:
    print(f"[benchmark] import failed: {e}")
    raise

# Optional manifest normaliser: when the harness package cannot be imported the
# name is bound to None and _write_manifest emits the manifest unmodified.
try:
    from harness.tps_manifest_validity import normalize_tps_manifest
except Exception:
    normalize_tps_manifest = None
|
|
|
|
# Environment variables that _env_echo() copies into the manifest's "env" section
# when they are set, so a manifest records the toggles it was produced under.
ENV_ECHO_KEYS = [
    "HYDRA_PROFILE_FORWARD",
    "HYDRA_MODEL_COMPILE",
    "HYDRA_MUON_COMPILE",
    "HYDRA_FUSED_DEVICE_STEP",
    "HYDRA_FORCE_HTM_CPU",
    "HYDRA_HTM_FUSED",
    "HYDRA_HTM_BATCHED_FUSED",
    "HYDRA_FUSED_SDR_PROJECT",
    "HYDRA_DISABLE_FUSED_SDR_TRITON",
    "HYDRA_USE_NEMOTRON",
    "HYDRA_TARGET_SHARDS",
    "HYDRA_TOKEN_CACHE_GB",
    "HYDRA_DISABLE_TOKEN_CACHE",
]
|
|
|
|
def _truthy_env(name: str) -> bool:
    """Return True when environment variable *name* holds an affirmative flag.

    An unset variable is treated as "0". The value is stripped of surrounding
    whitespace and lower-cased; only "1", "true", "yes" and "on" count as true.
    """
    raw = os.environ.get(name, "0")
    return raw.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
def _git_sha() -> str:
    """Return the abbreviated HEAD commit (``--short=12``) of the checkout at ``CD``.

    Best effort only: any failure (git not installed, not a repository, non-zero
    exit status, or the 5 second timeout) yields the literal string "unknown"
    rather than raising, so a manifest can always be produced.
    """
    command = ["git", "rev-parse", "--short=12", "HEAD"]
    try:
        completed = subprocess.run(
            command,
            cwd=CD,
            text=True,
            capture_output=True,
            check=True,
            timeout=5,
        )
    except Exception:
        return "unknown"
    return completed.stdout.strip()
|
|
|
|
def _warmup(model, x, y, autocast_ctx, n: int = 2):
    """Run *n* untimed forward/backward passes and discard the gradients.

    Each pass calls ``model(x, y)`` inside ``autocast_ctx``, back-propagates the
    returned loss, and then clears the gradients with ``set_to_none=True``. No
    optimizer step is taken, so parameters are left unchanged by this function.

    Args:
        model: callable returning a loss with ``.backward()``; must expose
            ``zero_grad(set_to_none=...)``.
        x, y: inputs and targets forwarded to the model unchanged.
        autocast_ctx: a re-enterable context manager wrapping the forward pass.
        n: number of warm-up passes; ``n <= 0`` does nothing.
    """
    # NOTE(review): nesting mirrors the timed loop in main() — forward under the
    # autocast context, backward outside it, gradients cleared after every pass.
    for _ in range(n):
        with autocast_ctx:
            loss = model(x, y)
        loss.backward()
        model.zero_grad(set_to_none=True)
|
|
|
|
def _env_echo() -> dict[str, str]:
    """Return the subset of ``ENV_ECHO_KEYS`` that is set in the environment.

    Keys appear in ``ENV_ECHO_KEYS`` order with their raw string values; variables
    that are not set are omitted (a variable set to "" is still included).
    """
    echoed: dict[str, str] = {}
    for key in ENV_ECHO_KEYS:
        value = os.environ.get(key)
        if value is not None:
            echoed[key] = value
    return echoed
|
|
|
|
def _write_manifest(path: str | None, manifest: dict[str, Any]) -> None:
    """Emit the benchmark manifest to a file, or to stdout when no path is given.

    The manifest is first passed through ``normalize_tps_manifest`` when that
    optional hook is available (it is ``None`` otherwise).

    Args:
        path: destination file. When falsy (``None`` or ""), the manifest is
            printed as a single compact JSON line prefixed with
            "[benchmark_manifest] " instead of being written to disk.
        manifest: JSON-serialisable mapping to emit; keys are sorted on output.
    """
    if normalize_tps_manifest is not None:
        manifest = normalize_tps_manifest(manifest)
    if not path:
        print("[benchmark_manifest] " + json.dumps(manifest, sort_keys=True), flush=True)
        return
    # Serialise before touching the filesystem so an unserialisable manifest
    # fails without leaving freshly created directories behind.
    text = json.dumps(manifest, indent=2, sort_keys=True) + "\n"
    out = Path(path)
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(text, encoding="utf-8")
    print(f"[benchmark] manifest written: {out}", flush=True)
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the benchmark's command-line options from ``sys.argv``.

    Most options take their default from an environment variable (``BENCH_*``,
    ``HYDRA_SEQUENCE_LEN``, ``FEATHER_*``, ``HERMES_KANBAN_TASK``) so the script
    can be driven from either flags or the environment. ``--metric-role`` defaults
    to "profile" when ``HYDRA_PROFILE_FORWARD`` is truthy and to "tps" otherwise.

    Returns:
        The populated ``argparse.Namespace``.

    Raises:
        ValueError: if an integer-valued environment default is not a valid int.
    """

    def env_int(name: str, fallback: str) -> int:
        # Integer default taken from the environment, else from *fallback*.
        return int(os.environ.get(name, fallback))

    env = os.environ
    p = argparse.ArgumentParser(description="Feather CE step TPS/profiling smoke benchmark")
    p.add_argument("--steps", type=int, default=env_int("BENCH_STEPS", "20"))
    p.add_argument("--warmup", type=int, default=env_int("BENCH_WARMUP_STEPS", "3"))
    # BENCH_SEQ_LEN wins over HYDRA_SEQUENCE_LEN, which wins over the literal 512.
    p.add_argument(
        "--seq-len",
        type=int,
        default=env_int("BENCH_SEQ_LEN", env.get("HYDRA_SEQUENCE_LEN", "512")),
    )
    p.add_argument("--engram-columns", type=int, default=env_int("BENCH_ENGRAM_COLUMNS", "4096"))
    p.add_argument("--engram-key-dim", type=int, default=env_int("BENCH_ENGRAM_KEY_DIM", "64"))
    p.add_argument("--engram-layer-idx", type=int, default=env_int("BENCH_ENGRAM_LAYER_IDX", "1"))
    p.add_argument(
        "--vocab-size",
        type=int,
        default=env_int("BENCH_VOCAB_SIZE", "0"),
        help="Synthetic vocab size for tokenizer-free smoke runs.",
    )
    p.add_argument("--manifest-out", default=env.get("BENCH_MANIFEST_OUT"))
    p.add_argument("--task-id", default=env.get("HERMES_KANBAN_TASK", ""))
    p.add_argument("--run-id", default=env.get("FEATHER_RUN_ID", "local-benchmark-step"))
    p.add_argument(
        "--runtime-profile",
        default=env.get("FEATHER_HF_RUNTIME_PROFILE", "local_synthetic_step"),
    )
    p.add_argument(
        "--metric-role",
        choices=["tps", "profile"],
        default=("profile" if _truthy_env("HYDRA_PROFILE_FORWARD") else "tps"),
    )
    p.add_argument(
        "--active-duplicate-jobs",
        type=int,
        default=None,
        help="Set after HF duplicate-active-job preflight; omitted locally.",
    )
    return p.parse_args()
|
|
|
|
def _tps_window(step_durations: list[float], tokens_per_step: int, fallback: float) -> dict[str, float]:
    """Summarise per-step throughput as median / p90 / max tokens per second."""
    per_step = sorted(tokens_per_step / d for d in step_durations if d > 0)
    if not per_step:
        # No usable per-step timing: report the aggregate figure everywhere.
        return {"median": fallback, "p90": fallback, "max": fallback}
    return {
        "median": per_step[len(per_step) // 2],
        "p90": per_step[int((len(per_step) - 1) * 0.9)],
        "max": per_step[-1],
    }


def main():
    """Run the synthetic training-step benchmark and emit a TPS manifest.

    Builds a ``PostSemClawModel`` from the hydra config constants and CLI options,
    runs ``--warmup`` untimed passes, then times ``--steps`` optimizer steps (each
    made of ``TOTAL_BATCH_SIZE // (DEVICE_BATCH_SIZE * seq_len)`` accumulation
    micro-steps) on random token batches. It prints a one-line summary and hands
    the manifest to ``_write_manifest``.

    Raises:
        ValueError: if ``--steps`` or ``--seq-len`` is below 1, or if
            ``TOTAL_BATCH_SIZE`` is not a positive multiple of the tokens
            processed per forward/backward pass.
    """
    args = parse_args()
    # Both would otherwise surface later as ZeroDivisionError or nonsense TPS.
    if args.steps < 1:
        raise ValueError(f"--steps must be >= 1, got {args.steps}")
    if args.seq_len < 1:
        raise ValueError(f"--seq-len must be >= 1, got {args.seq_len}")

    torch.manual_seed(SEED)
    device_str = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device_str)
    if device_str == "cuda":
        torch.cuda.manual_seed(SEED)
        torch.set_float32_matmul_precision("high")
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
    else:
        _cpu_threads = int(os.environ.get("HYDRA_CPU_THREADS", str(min(os.cpu_count() or 4, 8))))
        torch.set_num_threads(_cpu_threads)
        print(f"[CPU] torch.set_num_threads={_cpu_threads}")

    # A positive --vocab-size skips loading the tokenizer entirely.
    if args.vocab_size > 0:
        vocab_size = args.vocab_size
        print(f"[benchmark] using synthetic vocab_size={vocab_size}", flush=True)
    else:
        tokenizer = Tokenizer.from_directory()
        vocab_size = tokenizer.get_vocab_size()

    config = PostSemClawConfig(
        sequence_len=args.seq_len,
        vocab_size=vocab_size,
        n_layer=N_LAYER,
        d_model=D_MODEL,
        d_state=D_STATE,
        headdim=HEADDIM,
        n_heads=N_HEADS,
        expand=EXPAND,
        engram_n_columns=args.engram_columns,
        engram_key_dim=args.engram_key_dim,
        engram_layer_idx=args.engram_layer_idx,
    )

    # Validate the batch geometry explicitly (an assert would vanish under -O)
    # and before the model is built, so a bad configuration fails fast.
    tokens_per_fwdbwd = DEVICE_BATCH_SIZE * config.sequence_len
    if (
        tokens_per_fwdbwd <= 0
        or TOTAL_BATCH_SIZE <= 0
        or TOTAL_BATCH_SIZE % tokens_per_fwdbwd != 0
    ):
        raise ValueError(
            f"TOTAL_BATCH_SIZE={TOTAL_BATCH_SIZE} must be a positive multiple of "
            f"DEVICE_BATCH_SIZE * sequence_len = {tokens_per_fwdbwd}"
        )
    grad_accum_steps = TOTAL_BATCH_SIZE // tokens_per_fwdbwd

    model = PostSemClawModel(config).to(device)
    model.train()

    # Optimizer construction is best effort: on failure the benchmark still
    # measures forward/backward, just without optimizer.step().
    try:
        optimizer = model.setup_optimizer(
            unembedding_lr=0.005,
            embedding_lr=1.0,
            scalar_lr=0.5,
            adam_betas=ADAM_BETAS,
            matrix_lr=0.12,
            weight_decay=0.01,
        )
        use_optimizer = True
    except Exception as e:
        print(f"[benchmark] optimizer setup skipped: {e}")
        optimizer = None
        use_optimizer = False

    # Synthetic batch: random token ids, reused for every micro-step.
    torch.manual_seed(SEED + 1)
    batch_shape = (DEVICE_BATCH_SIZE, config.sequence_len)
    x = torch.randint(0, vocab_size, batch_shape, device=device)
    y = torch.randint(0, vocab_size, batch_shape, device=device)

    # bf16 autocast on CUDA only; on CPU the context is constructed disabled.
    autocast_ctx = torch.amp.autocast(
        device_type=device_str, dtype=torch.bfloat16, enabled=(device_str == "cuda")
    )

    # NOTE(review): _warmup never calls optimizer.step(), so any first-call
    # optimizer overhead may land in the first timed step — confirm if it matters.
    _warmup(model, x, y, autocast_ctx, n=args.warmup)
    if device_str == "cuda":
        torch.cuda.synchronize()
        torch.cuda.reset_peak_memory_stats()

    step_durations: list[float] = []
    last_loss_tensor = None
    # perf_counter is monotonic and high resolution; time.time() can jump when
    # the wall clock is adjusted.
    t0 = time.perf_counter()
    for _step in range(args.steps):
        s0 = time.perf_counter()
        for _ in range(grad_accum_steps):
            with autocast_ctx:
                loss = model(x, y)
            # Keep the loss on device: copying it to the host here would force
            # a device sync on every micro-step inside the timed window.
            last_loss_tensor = loss.detach()
            loss.backward()
        if use_optimizer:
            optimizer.step()
        model.zero_grad(set_to_none=True)
        if device_str == "cuda":
            # One sync per step so the per-step duration covers queued work.
            torch.cuda.synchronize()
        step_durations.append(time.perf_counter() - s0)
    dt = time.perf_counter() - t0

    last_loss = float(last_loss_tensor.cpu()) if last_loss_tensor is not None else None
    tok_per_sec = int(args.steps * TOTAL_BATCH_SIZE / dt)
    ms_per_step = dt * 1000 / args.steps
    vram_mib = 0.0
    if device_str == "cuda":
        vram_mib = torch.cuda.max_memory_allocated() / 1024 / 1024

    tps_window = _tps_window(step_durations, TOTAL_BATCH_SIZE, tok_per_sec)
    # Estimated as 6 x parameter count x tokens/s.
    # NOTE(review): GPU_BF16_PEAK_FLOPS is scaled by 1e12 here, i.e. treated as
    # TFLOPS — presumably fed by HYDRA_GPU_BF16_TFLOPS; confirm in hydra.config.
    n_params = sum(p.numel() for p in model.parameters())
    achieved_flops = 6.0 * n_params * tok_per_sec
    mfu = achieved_flops / (GPU_BF16_PEAK_FLOPS * 1e12) if GPU_BF16_PEAK_FLOPS else 0.0

    print(
        f"steps={args.steps} tok/s={tok_per_sec} ms/step={ms_per_step:.1f} "
        f"total_batch={TOTAL_BATCH_SIZE} device_batch={DEVICE_BATCH_SIZE} "
        f"accum={grad_accum_steps} seq_len={config.sequence_len} "
        f"n_layer={N_LAYER} d_model={D_MODEL} device={device_str} "
        f"vram_mib={vram_mib:.0f} mfu={mfu:.4f} loss={last_loss}",
        flush=True,
    )

    duplicate_check = {"performed": False, "reason": "local_synthetic_benchmark"}
    if args.active_duplicate_jobs is not None:
        duplicate_check = {"performed": True, "active_matching_jobs": args.active_duplicate_jobs}
    on_cuda = device_str == "cuda"
    manifest = {
        "task_id": args.task_id,
        "run_id": args.run_id,
        "git_sha": _git_sha(),
        "metric_role": args.metric_role,
        "hardware": {
            "flavor": device_str,
            "cuda_arch": torch.cuda.get_device_name(0) if on_cuda else "cpu",
        },
        "runtime_profile": args.runtime_profile,
        "no_paid_launch_without_gate": True,
        "duplicate_active_job_check": duplicate_check,
        "env": _env_echo(),
        "model": {
            "sequence_len": config.sequence_len,
            "n_layer": N_LAYER,
            "d_model": D_MODEL,
            "engram_n_columns": args.engram_columns,
            "engram_key_dim": args.engram_key_dim,
        },
        "receipts": {
            "profile_forward": _truthy_env("HYDRA_PROFILE_FORWARD") or args.metric_role == "profile",
            "htm_gpu_verified": (not _truthy_env("HYDRA_FORCE_HTM_CPU")) and on_cuda,
            "training_tps_window": tps_window,
            "flavor_verified": device_str,
        },
        "metrics": {
            "steps": args.steps,
            "tok_per_sec": tok_per_sec,
            "ms_per_step": ms_per_step,
            "vram_mib": vram_mib,
            "mfu_estimate": mfu,
            "last_loss": last_loss,
        },
    }
    _write_manifest(args.manifest_out, manifest)


if __name__ == "__main__":
    main()
|
|