# icarus112's picture
# Update Feather a10g-large training runtime image
# c475135 verified
# Raw
# History Blame Contribute Delete
# 10.8 kB
"""Feather training step benchmark — local CPU/GPU smoke plus JSON TPS manifest.
Usage (CPU smoke):
HYDRA_BATCH_SIZE=1 HYDRA_TOTAL_BATCH=1024 HYDRA_N_LAYER=2 HYDRA_D_MODEL=128 \
HYDRA_GPU_BF16_TFLOPS=0.1 HYDRA_CPU_THREADS=4 \
python scripts/benchmark_step.py --steps 3 --manifest-out /tmp/bench.json
Usage (GPU / A10G-like subset):
HYDRA_BATCH_SIZE=4 HYDRA_TOTAL_BATCH=32768 HYDRA_N_LAYER=6 HYDRA_D_MODEL=384 \
HYDRA_GPU_BF16_TFLOPS=125.0 \
python scripts/benchmark_step.py --seq-len 2048 --steps 20 --manifest-out bench.json
The step loop preserves CE-loss training semantics (model(x, y) + backward). It
is still synthetic data and therefore smoke/relative-TPS evidence, not corpus
quality evidence. Set HYDRA_PROFILE_FORWARD=0 for TPS rows; profiling synchronizes
and the emitted manifest marks such rows attribution-only.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Any
# Directory one level above the directory that contains this script
# (presumably the repository root, given the `scripts/benchmark_step.py`
# invocation shown in the module docstring -- confirm).
CD = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Make that directory both the working directory and the first entry on the
# import search path before the project imports below are attempted.
os.chdir(CD)
sys.path.insert(0, CD)
import torch
try:
from hydra.config import (
SEED, D_MODEL, N_LAYER, D_STATE, HEADDIM, N_HEADS, EXPAND,
DEVICE_BATCH_SIZE, TOTAL_BATCH_SIZE, GPU_BF16_PEAK_FLOPS,
ADAM_BETAS,
)
from hydra.model import PostSemClawModel
from hydra.config import PostSemClawConfig
from prepare import Tokenizer
except Exception as e:
print(f"[benchmark] import failed: {e}")
raise
try:
from harness.tps_manifest_validity import normalize_tps_manifest
except Exception:
normalize_tps_manifest = None
# Environment variables that are copied verbatim into the manifest's "env"
# section; only the ones actually set in the process environment are echoed.
ENV_ECHO_KEYS = [
    "HYDRA_PROFILE_FORWARD",
    "HYDRA_MODEL_COMPILE",
    "HYDRA_MUON_COMPILE",
    "HYDRA_FUSED_DEVICE_STEP",
    "HYDRA_FORCE_HTM_CPU",
    "HYDRA_HTM_FUSED",
    "HYDRA_HTM_BATCHED_FUSED",
    "HYDRA_FUSED_SDR_PROJECT",
    "HYDRA_DISABLE_FUSED_SDR_TRITON",
    "HYDRA_USE_NEMOTRON",
    "HYDRA_TARGET_SHARDS",
    "HYDRA_TOKEN_CACHE_GB",
    "HYDRA_DISABLE_TOKEN_CACHE",
]
def _truthy_env(name: str) -> bool:
    """Report whether environment variable *name* holds an affirmative flag.

    The value is stripped of surrounding whitespace and lower-cased before
    comparison; ``1``, ``true``, ``yes`` and ``on`` count as enabled. An unset
    variable is treated as ``"0"`` and therefore as disabled.
    """
    raw_value = os.environ.get(name, "0")
    normalized = raw_value.strip().lower()
    return normalized in ("1", "true", "yes", "on")
def _git_sha() -> str:
    """Return the abbreviated (``--short=12``) HEAD commit hash of the checkout at ``CD``.

    This is best effort: any failure while invoking git (binary missing, not a
    repository, non-zero exit, the 5 second timeout, ...) yields the literal
    string ``"unknown"`` instead of propagating an exception.
    """
    git_cmd = ["git", "rev-parse", "--short=12", "HEAD"]
    try:
        completed = subprocess.run(
            git_cmd,
            cwd=CD,
            check=True,
            capture_output=True,
            text=True,
            timeout=5,
        )
        sha = completed.stdout.strip()
    except Exception:
        # Deliberately broad: the hash is informational manifest metadata only.
        sha = "unknown"
    return sha
def _warmup(model, x, y, autocast_ctx, n: int = 2):
    """Run *n* untimed forward/backward passes so later timing starts warm.

    Args:
        model: Callable invoked as ``model(x, y)``; the result must expose
            ``backward()``. The model must also provide
            ``zero_grad(set_to_none=...)``.
        x: First positional argument forwarded to ``model``.
        y: Second positional argument forwarded to ``model``.
        autocast_ctx: Re-enterable context manager wrapped around each forward
            call.
        n: Number of warmup iterations; values ``<= 0`` perform none.

    No optimizer step is taken; gradients are dropped after every iteration.
    """
    for _iteration in range(n):
        # Only the forward pass runs under the autocast context.
        with autocast_ctx:
            warm_loss = model(x, y)
        warm_loss.backward()
        # Drop the gradients right away so warmup leaves nothing accumulated.
        model.zero_grad(set_to_none=True)
def _env_echo() -> dict[str, str]:
    """Snapshot the ``ENV_ECHO_KEYS`` variables that are set in the environment.

    Returns:
        A mapping from variable name to its current value, in
        ``ENV_ECHO_KEYS`` order; names that are not set are left out.
    """
    echoed: dict[str, str] = {}
    for key in ENV_ECHO_KEYS:
        value = os.environ.get(key)
        if value is not None:
            echoed[key] = value
    return echoed
def _write_manifest(path: str | None, manifest: dict[str, Any]) -> None:
    """Emit the benchmark manifest to a file, or to stdout when no path is given.

    Args:
        path: Destination file. When falsy (``None`` or ``""``) the manifest is
            printed as a single ``[benchmark_manifest]``-prefixed JSON line.
        manifest: Manifest dictionary. It is first passed through
            ``normalize_tps_manifest`` when that optional helper was
            importable.

    When writing to a file, missing parent directories are created and the
    JSON is pretty-printed (2-space indent, sorted keys, trailing newline).
    """
    normalizer = normalize_tps_manifest
    payload = manifest if normalizer is None else normalizer(manifest)

    if path:
        # Serialize before touching the filesystem so a serialization error
        # leaves no directories behind.
        pretty = json.dumps(payload, indent=2, sort_keys=True) + "\n"
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(pretty, encoding="utf-8")
        print(f"[benchmark] manifest written: {target}", flush=True)
    else:
        print("[benchmark_manifest] " + json.dumps(payload, sort_keys=True), flush=True)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the benchmark command line and parse it.

    Most options take their default from an environment variable (``BENCH_*``,
    ``HYDRA_SEQUENCE_LEN``, ``FEATHER_*``, ``HERMES_KANBAN_TASK``), so the
    script can be driven entirely through the environment.

    Args:
        argv: Argument list to parse. ``None`` (the default) parses
            ``sys.argv[1:]``, which is what the script entry point uses;
            passing an explicit list lets other code and tests call this
            without touching ``sys.argv``.

    Returns:
        The parsed namespace.

    Raises:
        SystemExit: via ``argparse`` on malformed arguments, and when
            ``--steps`` or ``--seq-len`` is smaller than 1.
        ValueError: if an integer default read from the environment is not a
            valid integer.
    """
    env = os.environ
    p = argparse.ArgumentParser(description="Feather CE step TPS/profiling smoke benchmark")
    p.add_argument("--steps", type=int, default=int(env.get("BENCH_STEPS", "20")))
    p.add_argument("--warmup", type=int, default=int(env.get("BENCH_WARMUP_STEPS", "3")))
    p.add_argument(
        "--seq-len",
        type=int,
        # BENCH_SEQ_LEN wins over HYDRA_SEQUENCE_LEN, which wins over 512.
        default=int(env.get("BENCH_SEQ_LEN", env.get("HYDRA_SEQUENCE_LEN", "512"))),
    )
    p.add_argument("--engram-columns", type=int, default=int(env.get("BENCH_ENGRAM_COLUMNS", "4096")))
    p.add_argument("--engram-key-dim", type=int, default=int(env.get("BENCH_ENGRAM_KEY_DIM", "64")))
    p.add_argument("--engram-layer-idx", type=int, default=int(env.get("BENCH_ENGRAM_LAYER_IDX", "1")))
    p.add_argument(
        "--vocab-size",
        type=int,
        default=int(env.get("BENCH_VOCAB_SIZE", "0")),
        help="Synthetic vocab size for tokenizer-free smoke runs.",
    )
    p.add_argument("--manifest-out", default=env.get("BENCH_MANIFEST_OUT"))
    p.add_argument("--task-id", default=env.get("HERMES_KANBAN_TASK", ""))
    p.add_argument("--run-id", default=env.get("FEATHER_RUN_ID", "local-benchmark-step"))
    p.add_argument("--runtime-profile", default=env.get("FEATHER_HF_RUNTIME_PROFILE", "local_synthetic_step"))
    p.add_argument(
        "--metric-role",
        choices=["tps", "profile"],
        # Profiling runs are labelled "profile" unless explicitly overridden.
        default=("profile" if _truthy_env("HYDRA_PROFILE_FORWARD") else "tps"),
    )
    p.add_argument(
        "--active-duplicate-jobs",
        type=int,
        default=None,
        help="Set after HF duplicate-active-job preflight; omitted locally.",
    )
    args = p.parse_args(argv)

    # Fail fast with a usage error: the benchmark divides by the step count and
    # by DEVICE_BATCH_SIZE * seq_len, so zero would otherwise surface later as
    # a ZeroDivisionError after the model has already been built.
    if args.steps < 1:
        p.error("--steps must be >= 1")
    if args.seq_len < 1:
        p.error("--seq-len must be >= 1")
    return args
def main():
    """Run the synthetic training-step benchmark and emit its TPS manifest.

    Flow:
        1. Parse the CLI and seed torch; use CUDA when available, else CPU.
        2. Build a ``PostSemClawModel`` from the hydra config constants plus
           the CLI sequence length / engram options.
        3. Run untimed warmup passes, then time ``args.steps`` steps. Each step
           is ``grad_accum_steps`` forward/backward micro-steps on one fixed
           random batch, followed by an optimizer step when the optimizer
           could be set up.
        4. Print a one-line summary and pass the manifest to
           ``_write_manifest``.

    Raises:
        ValueError: if ``TOTAL_BATCH_SIZE`` is not a multiple of the tokens
            processed per micro-step (``DEVICE_BATCH_SIZE * sequence_len``).
    """
    args = parse_args()

    # ---- device / determinism -------------------------------------------
    torch.manual_seed(SEED)
    device_str = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device_str)
    if device_str == "cuda":
        torch.cuda.manual_seed(SEED)
        torch.set_float32_matmul_precision("high")
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
    else:
        # Default thread count: the machine's CPU count, capped at 8.
        _cpu_threads = int(os.environ.get("HYDRA_CPU_THREADS", str(min(os.cpu_count() or 4, 8))))
        torch.set_num_threads(_cpu_threads)
        print(f"[CPU] torch.set_num_threads={_cpu_threads}")

    # ---- vocabulary ------------------------------------------------------
    if args.vocab_size > 0:
        vocab_size = args.vocab_size
        print(f"[benchmark] using synthetic vocab_size={vocab_size}", flush=True)
    else:
        # No synthetic size requested: the tokenizer is loaded only for its
        # vocabulary size.
        tokenizer = Tokenizer.from_directory()
        vocab_size = tokenizer.get_vocab_size()

    # ---- model -----------------------------------------------------------
    config = PostSemClawConfig(
        sequence_len=args.seq_len,
        vocab_size=vocab_size,
        n_layer=N_LAYER,
        d_model=D_MODEL,
        d_state=D_STATE,
        headdim=HEADDIM,
        n_heads=N_HEADS,
        expand=EXPAND,
        engram_n_columns=args.engram_columns,
        engram_key_dim=args.engram_key_dim,
        engram_layer_idx=args.engram_layer_idx,
    )
    model = PostSemClawModel(config).to(device)
    model.train()

    tokens_per_fwdbwd = DEVICE_BATCH_SIZE * config.sequence_len
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if tokens_per_fwdbwd <= 0 or TOTAL_BATCH_SIZE % tokens_per_fwdbwd != 0:
        raise ValueError(
            f"TOTAL_BATCH_SIZE ({TOTAL_BATCH_SIZE}) must be a multiple of "
            f"DEVICE_BATCH_SIZE * sequence_len ({DEVICE_BATCH_SIZE} * "
            f"{config.sequence_len} = {tokens_per_fwdbwd})"
        )
    grad_accum_steps = TOTAL_BATCH_SIZE // tokens_per_fwdbwd

    # Optimizer setup is best effort: if it fails the benchmark still times
    # forward/backward only.
    try:
        optimizer = model.setup_optimizer(
            unembedding_lr=0.005,
            embedding_lr=1.0,
            scalar_lr=0.5,
            adam_betas=ADAM_BETAS,
            matrix_lr=0.12,
            weight_decay=0.01,
        )
        use_optimizer = True
    except Exception as e:
        print(f"[benchmark] optimizer setup skipped: {e}")
        optimizer = None
        use_optimizer = False

    # ---- synthetic batch -------------------------------------------------
    torch.manual_seed(SEED + 1)
    x = torch.randint(0, vocab_size, (DEVICE_BATCH_SIZE, config.sequence_len), device=device)
    y = torch.randint(0, vocab_size, (DEVICE_BATCH_SIZE, config.sequence_len), device=device)
    # bf16 autocast is active on CUDA only; on CPU the context is a no-op.
    autocast_ctx = torch.amp.autocast(
        device_type=device_str, dtype=torch.bfloat16, enabled=(device_str == "cuda")
    )

    _warmup(model, x, y, autocast_ctx, n=args.warmup)
    if device_str == "cuda":
        torch.cuda.synchronize()
        # Peak-memory accounting starts after warmup.
        torch.cuda.reset_peak_memory_stats()

    # ---- timed loop ------------------------------------------------------
    # perf_counter() is monotonic and high resolution; time.time() can jump
    # when the wall clock is adjusted.
    step_durations: list[float] = []
    last_loss_tensor = None
    t0 = time.perf_counter()
    for _step in range(args.steps):
        s0 = time.perf_counter()
        for _ in range(grad_accum_steps):
            with autocast_ctx:
                loss = model(x, y)
            # Keep only a detached handle here. Converting to a Python float
            # inside the loop would force a device-to-host sync on every
            # micro-step and distort the throughput being measured.
            last_loss_tensor = loss.detach()
            loss.backward()
        if use_optimizer:
            optimizer.step()
        model.zero_grad(set_to_none=True)
        if device_str == "cuda":
            # Make the per-step duration cover the queued GPU work.
            torch.cuda.synchronize()
        step_durations.append(time.perf_counter() - s0)
    dt = time.perf_counter() - t0
    last_loss = None if last_loss_tensor is None else float(last_loss_tensor.cpu())

    # ---- metrics ---------------------------------------------------------
    tok_per_sec = int(args.steps * TOTAL_BATCH_SIZE / dt)
    ms_per_step = dt * 1000 / args.steps
    vram_mib = 0.0
    if device_str == "cuda":
        vram_mib = torch.cuda.max_memory_allocated() / 1024 / 1024
    # Per-step throughput, ascending; the aggregate figure is the fallback when
    # no step produced a positive duration.
    sorted_step_tps = sorted(TOTAL_BATCH_SIZE / d for d in step_durations if d > 0)
    median_tps = sorted_step_tps[len(sorted_step_tps) // 2] if sorted_step_tps else tok_per_sec
    p90_tps = sorted_step_tps[int((len(sorted_step_tps) - 1) * 0.9)] if sorted_step_tps else tok_per_sec
    max_tps = max(sorted_step_tps) if sorted_step_tps else tok_per_sec
    # 6 * parameter_count * tokens/s FLOPs estimate, relative to the configured
    # peak (GPU_BF16_PEAK_FLOPS is scaled by 1e12 here).
    achieved_flops = 6.0 * sum(p.numel() for p in model.parameters()) * tok_per_sec
    mfu = achieved_flops / (GPU_BF16_PEAK_FLOPS * 1e12) if GPU_BF16_PEAK_FLOPS else 0.0
    print(
        f"steps={args.steps} tok/s={tok_per_sec} ms/step={ms_per_step:.1f} "
        f"total_batch={TOTAL_BATCH_SIZE} device_batch={DEVICE_BATCH_SIZE} "
        f"accum={grad_accum_steps} seq_len={config.sequence_len} "
        f"n_layer={N_LAYER} d_model={D_MODEL} device={device_str} "
        f"vram_mib={vram_mib:.0f} mfu={mfu:.4f} loss={last_loss}",
        flush=True,
    )

    # ---- manifest --------------------------------------------------------
    duplicate_check = {"performed": False, "reason": "local_synthetic_benchmark"}
    if args.active_duplicate_jobs is not None:
        duplicate_check = {"performed": True, "active_matching_jobs": args.active_duplicate_jobs}
    manifest = {
        "task_id": args.task_id,
        "run_id": args.run_id,
        "git_sha": _git_sha(),
        "metric_role": args.metric_role,
        "hardware": {"flavor": device_str, "cuda_arch": torch.cuda.get_device_name(0) if device_str == "cuda" else "cpu"},
        "runtime_profile": args.runtime_profile,
        "no_paid_launch_without_gate": True,
        "duplicate_active_job_check": duplicate_check,
        "env": _env_echo(),
        "model": {
            "sequence_len": config.sequence_len,
            "n_layer": N_LAYER,
            "d_model": D_MODEL,
            "engram_n_columns": args.engram_columns,
            "engram_key_dim": args.engram_key_dim,
        },
        "receipts": {
            "profile_forward": _truthy_env("HYDRA_PROFILE_FORWARD") or args.metric_role == "profile",
            "htm_gpu_verified": not _truthy_env("HYDRA_FORCE_HTM_CPU") and device_str == "cuda",
            "training_tps_window": {"median": median_tps, "p90": p90_tps, "max": max_tps},
            "flavor_verified": device_str,
        },
        "metrics": {
            "steps": args.steps,
            "tok_per_sec": tok_per_sec,
            "ms_per_step": ms_per_step,
            "vram_mib": vram_mib,
            "mfu_estimate": mfu,
            "last_loss": last_loss,
        },
    }
    _write_manifest(args.manifest_out, manifest)
# Script entry point: run the benchmark only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()