#!/usr/bin/env python3
"""Standalone GPU HTM micro-canary for HYDRA/Feather.

This intentionally bypasses the full language-model forward path and exercises
only the HTMLayer CUDA path that failed in the H200 optimal-strict canary. It
prints JSON lines so HF job logs can be parsed mechanically.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
import traceback
from pathlib import Path
from typing import Any

import torch


def ensure_repo_on_path() -> None:
    """Make overlay package imports work from both /app/scripts and repo-root runs.

    The first candidate directory that contains ``subsystems/htm.py`` is
    inserted at the front of ``sys.path`` (unless it is already listed) and
    the search stops. If no candidate matches, ``sys.path`` is left untouched.
    """
    script_parents = Path(__file__).resolve().parents
    candidates = [
        Path('/workspace/feather'),
        # Directory one level above the one holding this script, when it exists.
        script_parents[1] if len(script_parents) > 1 else None,
    ]
    for candidate in candidates:
        if candidate and (candidate / 'subsystems' / 'htm.py').exists():
            candidate_s = str(candidate)
            if candidate_s not in sys.path:
                sys.path.insert(0, candidate_s)
            return


def build_htm_env(mode: str) -> dict[str, str]:
    """Return env overrides for the requested HTM diagnostic mode.

    Args:
        mode: One of ``"batched-fused"``, ``"fused"`` or ``"cuda"``.

    Returns:
        A mapping of environment variable names to ``"0"``/``"1"`` strings.

    Raises:
        ValueError: If ``mode`` is not one of the supported modes.
    """
    if mode not in {"batched-fused", "fused", "cuda"}:
        raise ValueError(f"unknown mode: {mode}")
    return {
        "HYDRA_FORCE_HTM_CPU": "0",
        "HYDRA_HTM_FUSED": "1" if mode in {"batched-fused", "fused"} else "0",
        "HYDRA_HTM_BATCHED_FUSED": "1" if mode == "batched-fused" else "0",
        # Strict only for batched-fused: the goal is to catch missing batched
        # entrypoints loudly. The other modes are deliberate diagnostic bisection
        # modes and should be allowed to exercise narrower paths.
        "HYDRA_STRICT_OPTIMAL_COMPONENTS": "1" if mode == "batched-fused" else "0",
    }


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options; numeric defaults come from HYDRA_* env vars.

    Args:
        argv: Argument list to parse, or ``None`` to let argparse read
            ``sys.argv``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--mode",
        choices=["batched-fused", "fused", "cuda"],
        default="batched-fused",
    )
    parser.add_argument(
        "--batch",
        type=int,
        default=int(os.environ.get("HYDRA_BATCH_SIZE", "4")),
    )
    parser.add_argument(
        "--seq",
        type=int,
        # HYDRA_HTM_MICRO_SEQ wins over HYDRA_MAX_SEQ_LEN, which wins over 512.
        default=int(
            os.environ.get(
                "HYDRA_HTM_MICRO_SEQ",
                os.environ.get("HYDRA_MAX_SEQ_LEN", "512"),
            )
        ),
    )
    parser.add_argument(
        "--input-bits",
        type=int,
        default=int(os.environ.get("HYDRA_HTM_INPUT_BITS", "16384")),
    )
    parser.add_argument(
        "--n-columns",
        type=int,
        default=int(os.environ.get("HYDRA_HTM_COLUMNS", "2048")),
    )
    parser.add_argument(
        "--cells-per-column",
        type=int,
        default=int(os.environ.get("HYDRA_HTM_CELLS_PER_COLUMN", "32")),
    )
    parser.add_argument(
        "--active-bits",
        type=int,
        default=int(os.environ.get("HYDRA_HTM_ACTIVE_BITS", "256")),
    )
    parser.add_argument("--seed", type=int, default=1234)
    parser.add_argument("--learn", action="store_true")
    parser.add_argument(
        "--sync-each",
        action="store_true",
        help="use HTMLayer.forward instead of forward_async/forward_await",
    )
    parser.add_argument("--dry-run", action="store_true")
    return parser.parse_args(argv)


def emit(event: str, **payload: Any) -> None:
    """Print one JSON object (sorted keys) on its own line and flush stdout.

    Args:
        event: Value stored under the ``"event"`` key.
        **payload: Extra JSON-serialisable fields merged into the object.
    """
    print(json.dumps({"event": event, **payload}, sort_keys=True), flush=True)


def make_sparse_sdr(*, batch: int, seq: int, input_bits: int, active_bits: int, device: str, seed: int):
    """Build a seeded random binary tensor with a fixed number of ones per row.

    The tensor is created on the CPU as ``uint8`` with shape
    ``(batch, seq, input_bits)``; every ``[b, t]`` row gets exactly
    ``active_bits`` ones at positions drawn from a seeded CPU generator. The
    result is then copied to ``device``.

    Raises:
        ValueError: If ``active_bits`` is not in ``[1, input_bits]``.
    """
    import torch

    if active_bits <= 0 or active_bits > input_bits:
        raise ValueError("active_bits must be in [1, input_bits]")
    gen = torch.Generator(device="cpu")
    gen.manual_seed(seed)
    sdr = torch.zeros((batch, seq, input_bits), dtype=torch.uint8, device="cpu")
    # One permutation per (b, t) row, drawn in row-major order so that a given
    # seed always yields the same tensor.
    for b in range(batch):
        for t in range(seq):
            idx = torch.randperm(input_bits, generator=gen)[:active_bits]
            sdr[b, t, idx] = 1
    return sdr.to(device, non_blocking=False)


def _plan_payload(args: argparse.Namespace, env: dict[str, str]) -> dict[str, Any]:
    """Assemble the fields of the ``plan`` event from parsed args and env overrides."""
    return {
        "mode": args.mode,
        "shape": {"batch": args.batch, "seq": args.seq, "input_bits": args.input_bits},
        "htm": {
            "n_columns": args.n_columns,
            "cells_per_column": args.cells_per_column,
            "active_bits": args.active_bits,
        },
        "learn": bool(args.learn),
        "sync_each": bool(args.sync_each),
        "env": env,
    }


def _run_canary(args: argparse.Namespace) -> int:
    """Run the HTMLayer forward pass on CUDA, emitting progress events.

    Returns:
        ``0`` after the ``success`` event has been emitted.

    Raises:
        RuntimeError: If CUDA is not available.
    """
    import torch

    ensure_repo_on_path()
    from subsystems.htm import HTMLayer

    cuda_ok = torch.cuda.is_available()
    emit(
        "cuda_state",
        torch_cuda_available=cuda_ok,
        device_count=torch.cuda.device_count() if cuda_ok else 0,
        device_name=torch.cuda.get_device_name(0) if cuda_ok else None,
    )
    if not cuda_ok:
        raise RuntimeError("CUDA is required for HTM GPU micro-canary")

    device = "cuda"
    sdr = make_sparse_sdr(
        batch=args.batch,
        seq=args.seq,
        input_bits=args.input_bits,
        active_bits=args.active_bits,
        device=device,
        seed=args.seed,
    )
    emit(
        "sdr_ready",
        dtype=str(sdr.dtype),
        shape=list(sdr.shape),
        active_total=int(sdr.sum().item()),
    )

    layer = HTMLayer(
        input_bits=args.input_bits,
        n_columns=args.n_columns,
        cells_per_column=args.cells_per_column,
        batch_size=args.batch,
        seed=args.seed,
        learn=args.learn,
        use_gpu=True,
        reset_each_forward=True,
    ).to(device)
    if args.learn:
        layer.train()
    else:
        layer.eval()
    emit(
        "layer_ready",
        use_gpu=bool(getattr(layer, "_use_gpu", False)),
        region_count=len(getattr(layer, "_regions", [])),
    )

    # Drain any GPU work still queued from setup so that elapsed_ms below
    # covers only the forward pass.
    torch.cuda.synchronize()
    start = time.perf_counter()
    if args.sync_each:
        out = layer(sdr)
    else:
        handle = layer.forward_async(sdr)
        emit("forward_submitted", handle_keys=sorted(handle.keys()))
        out = layer.forward_await(handle)
    # Wait for the forward kernels to finish before reading the clock.
    torch.cuda.synchronize()
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    emit(
        "success",
        elapsed_ms=round(elapsed_ms, 3),
        output_shape=list(out.shape),
        output_dtype=str(out.dtype),
    )
    return 0


def main(argv: list[str] | None = None) -> int:
    """Entry point: apply env overrides, emit the plan, then run the canary.

    With ``--dry-run`` only the ``plan`` event is emitted. Otherwise the GPU
    path runs; if it raises, a ``failure`` JSON event carrying the error type,
    message and traceback is emitted before the exception is re-raised, so
    log parsers always see a terminal event while the process still exits
    non-zero with the usual stderr traceback.

    Args:
        argv: Argument list to parse, or ``None`` to use ``sys.argv``.

    Returns:
        ``0`` on success or dry run.
    """
    args = parse_args(argv)
    env = build_htm_env(args.mode)
    os.environ.update(env)
    emit("plan", **_plan_payload(args, env))
    if args.dry_run:
        return 0

    try:
        return _run_canary(args)
    except Exception as exc:
        emit(
            "failure",
            error_type=type(exc).__name__,
            message=str(exc),
            traceback=traceback.format_exc(),
        )
        raise


if __name__ == "__main__":
    raise SystemExit(main())