# Source: cofiber-detection/analytical/scripts/analytical_exotic_gpu.py
# Uploaded by phanerozoic; commit dbbceb8 ("update repository").
"""
Exotic analytical detection heads on GPU.
Track 1: Random projection pursuit — test N random projections, keep the best.
Track 2: Nonlinear feature expansion — quadratic cross-terms + random Fourier features.
"""
import json
import os
import sys
import time
import torch
import torch.nn.functional as F
# Number of object categories; used as the width of the one-hot class targets.
NUM_CLASSES = 80
# Device every experiment tensor is moved to / created on.
DEVICE = "cuda"

# Dataset locations, overridable through environment variables.
COCO_ROOT = os.getenv("ARENA_COCO_ROOT", "coco")
VAL_CACHE = os.getenv("ARENA_VAL_CACHE", "val_cache/val.pt")

# Absolute directory of this script; it is put first on sys.path
# (presumably so sibling scripts can be imported by bare name).
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, SCRIPT_DIR)
def cofiber_decompose(f, n_scales):
    """Split a feature map into a list of per-scale detail bands.

    At each of the first ``n_scales - 1`` levels the current map is
    downsampled by 2x2 average pooling, upsampled back to its own size with
    bilinear interpolation, and the difference (current minus upsampled) is
    stored. The pooled map becomes the input of the next level. The last
    list entry is the final pooled map itself.

    Args:
        f: Feature map accepted by ``F.avg_pool2d`` / ``F.interpolate``;
            spatial dims are read from ``shape[2:]``.
        n_scales: Number of bands to return. Values <= 1 yield ``[f]``.

    Returns:
        List of tensors, finest band first, coarsest residual last.
    """
    bands = []
    current = f
    levels_left = n_scales - 1
    while levels_left > 0:
        coarse = F.avg_pool2d(current, 2)
        restored = F.interpolate(
            coarse,
            size=current.shape[2:],
            mode="bilinear",
            align_corners=False,
        )
        # Detail lost by pooling at this level.
        bands.append(current - restored)
        current = coarse
        levels_left -= 1
    bands.append(current)
    return bands
def make_locations(sizes, strides):
    """Build the cell-centre coordinates for each grid level.

    Args:
        sizes: Iterable of ``(height, width)`` grid sizes.
        strides: Iterable of strides, paired with ``sizes`` via ``zip``.

    Returns:
        One float32 tensor of shape ``(height * width, 2)`` per level. Rows
        are ``(x, y)`` with ``x = (col + 0.5) * stride`` and
        ``y = (row + 0.5) * stride``, enumerated row by row.
    """

    def _level_grid(height, width, stride):
        row_centres = (torch.arange(height, dtype=torch.float32) + 0.5) * stride
        col_centres = (torch.arange(width, dtype=torch.float32) + 0.5) * stride
        grid_y, grid_x = torch.meshgrid(row_centres, col_centres, indexing="ij")
        # x first, then y, to match the (x0, y0, x1, y1) box convention.
        return torch.stack((grid_x.reshape(-1), grid_y.reshape(-1)), dim=-1)

    return [_level_grid(h, w, s) for (h, w), s in zip(sizes, strides)]
def assign_targets(loc, boxes, labels, stride, sr):
    """Assign a class index (or -1) to every location of one grid level.

    A location is matched to a box when all three hold:
      * it lies strictly inside the box,
      * it lies within ``1.5 * stride`` of the box centre on both axes,
      * its largest distance to any box edge is within ``[sr[0], sr[1]]``.
    When several boxes qualify, the one with the smallest area wins.

    Args:
        loc: ``(N, 2)`` tensor of ``(x, y)`` locations.
        boxes: ``(M, 4)`` tensor of ``(x0, y0, x1, y1)`` boxes.
        labels: ``(M,)`` tensor of class indices, one per box.
        stride: Stride of this level; sets the centre-sampling radius.
        sr: ``(low, high)`` size range handled by this level.

    Returns:
        ``(N,)`` long tensor holding the matched label, or -1 where no box
        qualifies (always all -1 when ``boxes`` is empty).
    """
    num_locs = loc.shape[0]
    targets = torch.full((num_locs,), -1, dtype=torch.long)
    if boxes.numel() == 0:
        return targets

    # Column vectors so that arithmetic against (M,) box columns gives (N, M).
    px = loc[:, 0].unsqueeze(1)
    py = loc[:, 1].unsqueeze(1)
    x0, y0, x1, y1 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]

    box_area = (x1 - x0) * (y1 - y0)

    # Signed distances to the left / top / right / bottom edges.
    edge_dist = torch.stack([px - x0, py - y0, x1 - px, y1 - py], dim=-1)
    inside_box = edge_dist.min(dim=-1).values > 0

    mid_x = (x0 + x1) / 2
    mid_y = (y0 + y1) / 2
    half_window = stride * 1.5
    near_centre = (
        (px >= mid_x - half_window)
        & (px <= mid_x + half_window)
        & (py >= mid_y - half_window)
        & (py <= mid_y + half_window)
    )

    farthest_edge = edge_dist.max(dim=-1).values
    low, high = sr[0], sr[1]
    right_level = (farthest_edge >= low) & (farthest_edge <= high)

    candidate = inside_box & near_centre & right_level

    # Non-candidates get infinite area so argmin never prefers them.
    cost = box_area.unsqueeze(0).expand_as(candidate).masked_fill(
        ~candidate, float("inf")
    )
    best_box = cost.argmin(dim=1)
    best_cost = cost[torch.arange(num_locs, device=cost.device), best_box]
    has_match = best_cost < float("inf")

    targets[has_match] = labels[best_box[has_match]]
    return targets
def build_val_data(val_path, n_images=500):
    """Build per-location features and class targets from a cached val set.

    For each of the first ``n_images`` cached items the ``"spatial"`` map is
    decomposed into three cofiber bands; every band is flattened to one row
    per location, layer-normalised over channels, and paired with the class
    target produced by ``assign_targets`` for that level.

    Args:
        val_path: Path of the cache file read with ``torch.load``. Items are
            indexed by integer and must provide ``"spatial"``, ``"img_id"``
            and ``"scale"``.
        n_images: Upper bound on the number of cached items used.

    Returns:
        ``(features, cls_targets)`` concatenated over all images and levels
        and moved to ``DEVICE``. ``cls_targets`` is -1 for background.
    """
    # NOTE: weights_only=False unpickles arbitrary objects; only load
    # caches from a trusted source.
    cache = torch.load(val_path, map_location="cpu", weights_only=False)
    from pycocotools.coco import COCO

    coco = COCO(os.path.join(COCO_ROOT, "annotations", "instances_val2017.json"))
    sorted_cat_ids = sorted(coco.getCatIds())
    # COCO category ids mapped to contiguous class indices.
    cat_to_idx = dict(zip(sorted_cat_ids, range(len(sorted_cat_ids))))

    strides = [16, 32, 64]
    base = 640 // 16
    # Grid sizes are fixed here; assumes the cached maps are base x base at
    # the finest level -- TODO confirm against the cache writer.
    sizes = [(base // d, base // d) for d in (1, 2, 4)]
    size_ranges = [(-1, 128), (128, 256), (256, float("inf"))]
    level_locs = make_locations(sizes, strides)

    feature_chunks = []
    target_chunks = []
    n_used = min(n_images, len(cache))
    for idx in range(n_used):
        item = cache[idx]
        spatial = item["spatial"].unsqueeze(0).float()
        img_id = item["img_id"]
        scale = item["scale"]

        ann_ids = coco.getAnnIds(imgIds=int(img_id), iscrowd=False)
        box_rows = []
        class_ids = []
        for ann in coco.loadAnns(ann_ids):
            x, y, w, h = ann["bbox"]
            if not (w < 1 or h < 1):
                # COCO xywh -> xyxy, in the rescaled image frame.
                box_rows.append(
                    [x * scale, y * scale, (x + w) * scale, (y + h) * scale]
                )
                class_ids.append(cat_to_idx[ann["category_id"]])

        if box_rows:
            boxes_t = torch.tensor(box_rows, dtype=torch.float32)
        else:
            boxes_t = torch.zeros(0, 4)
        if class_ids:
            labels_t = torch.tensor(class_ids, dtype=torch.long)
        else:
            labels_t = torch.zeros(0, dtype=torch.long)

        bands = cofiber_decompose(spatial, 3)
        for band, locs, stride, size_range in zip(
            bands, level_locs, strides, size_ranges
        ):
            _, channels, _, _ = band.shape
            # Channels last, one row per location.
            rows = band.permute(0, 2, 3, 1).reshape(-1, channels)
            feature_chunks.append(F.layer_norm(rows, [channels]))
            target_chunks.append(
                assign_targets(locs, boxes_t, labels_t, stride, size_range)
            )

    features = torch.cat(feature_chunks).to(DEVICE)
    cls_targets = torch.cat(target_chunks).to(DEVICE)
    return features, cls_targets
def solve_and_score(features_pos, y_cls, features_all, cls_targets, pos_mask, lam=0.1):
    """Fit a ridge-regression classifier and return its accuracy.

    A bias column is appended to ``features_pos`` and the regularised normal
    equations ``(X^T X + lam * n * I) W = X^T Y`` are solved in closed form.
    The resulting linear map is applied to ``features_all``; accuracy is the
    fraction of rows selected by ``pos_mask`` whose arg-max score equals
    ``cls_targets``.

    Args:
        features_pos: ``(n, d)`` training features.
        y_cls: ``(n, num_classes)`` regression targets (one-hot in this file).
        features_all: ``(N, d)`` features to score.
        cls_targets: ``(N,)`` class indices for ``features_all``.
        pos_mask: ``(N,)`` boolean mask of rows that count towards accuracy.
        lam: Ridge strength; scaled by ``n`` and also applied to the bias.

    Returns:
        Accuracy as a float in ``[0, 1]``. Returns 0.0 when there are no
        training rows or when the linear system cannot be solved.
    """
    n, fd = features_pos.shape
    if n == 0:
        # Nothing to fit: the system would be singular, so report zero.
        return 0.0

    # Create helper tensors on the input's device/dtype rather than a global
    # device, so the function works wherever its inputs live.
    bias_col = features_pos.new_ones(n, 1)
    fa = torch.cat([features_pos, bias_col], dim=1)
    eye = torch.eye(fd + 1, device=fa.device, dtype=fa.dtype)

    # Built outside the try block so shape/dtype mistakes surface as errors
    # instead of being reported as 0.0 accuracy.
    gram = fa.T @ fa + lam * eye * n
    rhs = fa.T @ y_cls
    try:
        W = torch.linalg.solve(gram, rhs)
    except RuntimeError:
        # torch.linalg.solve raises RuntimeError (or a subclass) when the
        # matrix is not invertible; treat that as a failed fit.
        return 0.0

    scores = features_all @ W[:fd] + W[fd]
    pred = scores.argmax(1)
    correct = (pred[pos_mask] == cls_targets[pos_mask]).sum().item()
    return correct / max(pos_mask.sum().item(), 1)
def _median_sigma(features, n_sample=500):
    """Return the median pairwise distance among the first ``n_sample`` rows."""
    sample = features[:n_sample]
    sigma = torch.cdist(sample, sample).median().item()
    # Guard against a degenerate (near-zero) bandwidth.
    return 1.0 if sigma < 1e-6 else sigma


def _random_fourier_features(features, k, sigma, seed=42):
    """Map ``features`` to ``k`` random Fourier features, sqrt(2/k)*cos(xW+b)."""
    import math

    torch.manual_seed(seed)
    dev = features.device
    w = torch.randn(features.shape[1], k, device=dev) / sigma
    # Phases are drawn uniformly over one full period [0, 2*pi).
    b = torch.rand(k, device=dev) * 2 * math.pi
    return (2.0 / k) ** 0.5 * torch.cos(features @ w + b)


def _quadratic_expand(f_sub):
    """Append all pairwise products x_i * x_j (i <= j) to the columns of ``f_sub``."""
    k = f_sub.shape[1]
    products = [f_sub[:, i] * f_sub[:, j] for i in range(k) for j in range(i, k)]
    return torch.cat([f_sub, torch.stack(products, dim=1)], dim=1)


def main():
    """Run every experiment, print a ranked summary and save it as JSON.

    Experiments, all scored with ``solve_and_score``:
      * baseline on the full feature vector,
      * Track 1: best of 500 seeded Gaussian random projections per width,
      * Track 2a: linear + quadratic terms on the first K greedy dims,
      * Track 2b: raw features concatenated with random Fourier features,
      * Track 2c: random Fourier features alone.

    Note that each classifier is fitted on the positive locations and scored
    on those same locations, and Track 1 picks its best seed on that same
    data, so the reported numbers are fit quality rather than held-out
    accuracy.

    Results are written to ``analytical_variants/exotic_gpu.json`` next to
    this script.
    """
    print("=" * 60)
    print("Exotic Analytical Detection Heads (GPU)")
    print("=" * 60, flush=True)

    print("Building val data...", flush=True)
    features, cls_targets = build_val_data(VAL_CACHE, 500)
    dev = features.device
    # Feature width is read from the data (768 for the cache this was
    # written against) instead of being hard-coded.
    feat_dim = features.shape[1]

    pos = cls_targets >= 0
    n_pos = pos.sum().item()
    f_pos = features[pos]
    y_cls = torch.zeros(n_pos, NUM_CLASSES, device=dev)
    y_cls[torch.arange(n_pos, device=dev), cls_targets[pos]] = 1.0
    print(f" {features.shape[0]} locations, {n_pos} positives", flush=True)

    results = []

    # =====================================================
    # Baseline: all feature dims
    # =====================================================
    t0 = time.time()
    acc = solve_and_score(f_pos, y_cls, features, cls_targets, pos)
    print(f"\nBaseline ({feat_dim} dims): acc={acc:.4f} [{time.time()-t0:.2f}s]", flush=True)
    results.append({"name": f"baseline_{feat_dim}", "acc": acc, "dims": feat_dim})

    # =====================================================
    # Track 1: Random Projection Pursuit
    # =====================================================
    print("\n--- Track 1: Random Projection Pursuit ---", flush=True)
    N_PROJ = 500
    for K in [10, 20, 50, 100, 200]:
        best_acc = 0.0
        best_seed = -1
        t0 = time.time()
        for seed in range(N_PROJ):
            # Seeding makes the winning projection reproducible from its seed.
            torch.manual_seed(seed)
            proj = torch.randn(feat_dim, K, device=dev) / (K ** 0.5)
            f_proj = features @ proj
            acc = solve_and_score(f_proj[pos], y_cls, f_proj, cls_targets, pos)
            if acc > best_acc:
                best_acc = acc
                best_seed = seed
        elapsed = time.time() - t0
        n_params = K * NUM_CLASSES + NUM_CLASSES
        print(f" K={K:3d}: best_acc={best_acc:.4f} (seed={best_seed}, "
              f"{n_params} params, {elapsed:.1f}s, {N_PROJ} projections)", flush=True)
        results.append({"name": f"random_proj_K{K}", "acc": best_acc,
                        "dims": K, "params": n_params, "seed": best_seed})

    # =====================================================
    # Track 2a: Quadratic expansion on top greedy dims
    # =====================================================
    print("\n--- Track 2a: Quadratic Feature Expansion ---", flush=True)
    greedy_path = os.path.join(SCRIPT_DIR, "analytical_variants", "greedy_forward_gpu.json")
    if os.path.isfile(greedy_path):
        with open(greedy_path, encoding="utf-8") as fh:
            greedy_dims = json.load(fh)["selected_dims"]
    else:
        # No greedy selection available: fall back to the first 20 dims.
        greedy_dims = list(range(20))

    for K in [5, 10, 20, 30]:
        if K > len(greedy_dims):
            # Fewer dims available than requested; indexing past them would
            # raise and lose every result computed so far.
            print(f" top-{K} + quadratic: skipped "
                  f"(only {len(greedy_dims)} dims available)", flush=True)
            continue
        t0 = time.time()
        dims = greedy_dims[:K]
        f_expanded = _quadratic_expand(features[:, dims])
        n_expanded = f_expanded.shape[1]
        acc = solve_and_score(f_expanded[pos], y_cls, f_expanded, cls_targets, pos)
        n_params = n_expanded * NUM_CLASSES + NUM_CLASSES
        elapsed = time.time() - t0
        print(f" top-{K} + quadratic: {n_expanded} dims, acc={acc:.4f} "
              f"({n_params} params, {elapsed:.2f}s)", flush=True)
        results.append({"name": f"quadratic_top{K}", "acc": acc,
                        "dims": n_expanded, "params": n_params})
        # Release the large expanded matrix before the next, bigger one.
        del f_expanded

    # Kernel width for both RFF tracks (median heuristic on a subsample);
    # computed once, since it does not depend on the number of features.
    sigma = _median_sigma(features)

    # =====================================================
    # Track 2b: Random Fourier Features (RBF kernel approx)
    # =====================================================
    print("\n--- Track 2b: Random Fourier Features ---", flush=True)
    for K_rff in [50, 100, 200, 500]:
        t0 = time.time()
        rff = _random_fourier_features(features, K_rff, sigma)
        f_combined = torch.cat([features, rff], dim=1)
        acc = solve_and_score(f_combined[pos], y_cls, f_combined, cls_targets, pos)
        n_dims = f_combined.shape[1]
        n_params = n_dims * NUM_CLASSES + NUM_CLASSES
        elapsed = time.time() - t0
        print(f" {feat_dim} + {K_rff} RFF: {n_dims} dims, acc={acc:.4f} "
              f"({n_params} params, sigma={sigma:.2f}, {elapsed:.2f}s)", flush=True)
        results.append({"name": f"rff_{K_rff}", "acc": acc,
                        "dims": n_dims, "params": n_params})

    # =====================================================
    # Track 2c: Pure RFF (no raw features)
    # =====================================================
    print("\n--- Track 2c: Pure Random Fourier Features (no raw) ---", flush=True)
    for K_rff in [200, 500, 1000]:
        t0 = time.time()
        rff = _random_fourier_features(features, K_rff, sigma)
        acc = solve_and_score(rff[pos], y_cls, rff, cls_targets, pos)
        n_params = K_rff * NUM_CLASSES + NUM_CLASSES
        elapsed = time.time() - t0
        print(f" {K_rff} pure RFF: acc={acc:.4f} ({n_params} params, {elapsed:.2f}s)", flush=True)
        results.append({"name": f"pure_rff_{K_rff}", "acc": acc,
                        "dims": K_rff, "params": n_params})

    # =====================================================
    # Summary
    # =====================================================
    print(f"\n{'='*60}")
    print("Ranked by accuracy:")
    for r in sorted(results, key=lambda x: -x["acc"]):
        print(f" {r['name']:25s}: acc={r['acc']:.4f} dims={r.get('dims', '?')} "
              f"params={r.get('params', '?')}")

    out = os.path.join(SCRIPT_DIR, "analytical_variants", "exotic_gpu.json")
    # Make sure the output directory exists so results are not lost at the end.
    os.makedirs(os.path.dirname(out), exist_ok=True)
    with open(out, "w", encoding="utf-8") as fh:
        json.dump(results, fh, indent=2)
    print(f"\nSaved: {out}")
# Run the experiments only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()