| """ |
| Exotic analytical detection heads on GPU. |
| |
| Track 1: Random projection pursuit — test N random projections, keep the best. |
| Track 2: Nonlinear feature expansion — quadratic cross-terms + random Fourier features. |
| """ |
|
|
| import json |
| import os |
| import sys |
| import time |
|
|
| import torch |
| import torch.nn.functional as F |
|
|
# Directory holding this script; placed first on sys.path so modules that sit
# next to it can be imported regardless of the current working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)


# Dataset locations, each overridable through an environment variable.
COCO_ROOT = os.environ.get("ARENA_COCO_ROOT", "coco")
VAL_CACHE = os.environ.get("ARENA_VAL_CACHE", "val_cache/val.pt")
# Width of the one-hot class targets used by the analytical heads.
NUM_CLASSES = 80
# Prefer the GPU, but fall back to the CPU so the script still runs (slowly)
# on hosts without CUDA instead of failing on the first tensor transfer.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
|
|
|
|
def cofiber_decompose(f, n_scales):
    """Split a feature map into detail residuals plus a coarse remainder.

    For each of the first ``n_scales - 1`` levels the current map is 2x
    average-pooled, bilinearly upsampled back to its own spatial size and
    subtracted from it; the pooled map then becomes the input of the next
    level. The last pooled map is appended unchanged.

    Args:
        f: Feature map. The spatial size is taken from ``shape[2:]``, so a
            batched ``(N, C, H, W)`` layout is expected.
        n_scales: Number of levels. Values of 1 or less yield ``[f]``.

    Returns:
        List of tensors ordered fine to coarse: the residuals followed by
        the coarsest pooled map.
    """
    levels = []
    current = f
    for _level in range(1, n_scales):
        pooled = F.avg_pool2d(current, 2)
        upsampled = F.interpolate(
            pooled,
            size=current.shape[2:],
            mode="bilinear",
            align_corners=False,
        )
        # What the coarser level cannot represent stays at this level.
        levels.append(current - upsampled)
        current = pooled
    levels.append(current)
    return levels
|
|
|
|
def make_locations(sizes, strides):
    """Return the (x, y) centre of every cell for each pyramid level.

    Args:
        sizes: Iterable of ``(height, width)`` grid sizes, one per level.
        strides: Iterable of strides, paired with ``sizes`` by position.

    Returns:
        One float32 tensor of shape ``(height * width, 2)`` per level. Rows
        are in row-major grid order and hold ``(x, y)`` with each coordinate
        equal to ``(index + 0.5) * stride``.
    """

    def level_grid(height, width, stride):
        # Cell centres along each axis, in input-image units.
        row_centres = (torch.arange(height, dtype=torch.float32) + 0.5) * stride
        col_centres = (torch.arange(width, dtype=torch.float32) + 0.5) * stride
        grid_y, grid_x = torch.meshgrid(row_centres, col_centres, indexing="ij")
        return torch.stack([grid_x.flatten(), grid_y.flatten()], -1)

    return [level_grid(h, w, s) for (h, w), s in zip(sizes, strides)]
|
|
|
|
def assign_targets(loc, boxes, labels, stride, sr):
    """Assign a class label (or -1) to every location of one pyramid level.

    A location is positive for a box when all three conditions hold:

    * it lies strictly inside the box;
    * it lies within ``1.5 * stride`` of the box centre on both axes;
    * the largest of its four distances to the box edges falls inside the
      inclusive range ``sr``.

    When several boxes qualify, the one with the smallest area wins.

    Args:
        loc: ``(n, 2)`` tensor of ``(x, y)`` location centres.
        boxes: ``(m, 4)`` tensor of ``(x1, y1, x2, y2)`` boxes; may be empty.
        labels: ``(m,)`` integer class index for each box.
        stride: Stride of this level, used for the centre radius.
        sr: ``(low, high)`` regression range handled by this level.

    Returns:
        ``(n,)`` long tensor on ``loc``'s device holding the matched class
        index, or -1 where no box qualifies.
    """
    n = loc.shape[0]
    # Allocate on loc's device so the masked write below also works when the
    # inputs are not on the default device.
    ct = torch.full((n,), -1, dtype=torch.long, device=loc.device)
    if boxes.numel() == 0:
        return ct

    xs = loc[:, None, 0]
    ys = loc[:, None, 1]

    # Signed distances from each location to the four edges of each box.
    ltrb = torch.stack(
        [
            xs - boxes[None, :, 0],
            ys - boxes[None, :, 1],
            boxes[None, :, 2] - xs,
            boxes[None, :, 3] - ys,
        ],
        -1,
    )
    in_box = ltrb.min(-1).values > 0

    cx = (boxes[:, 0] + boxes[:, 2]) / 2
    cy = (boxes[:, 1] + boxes[:, 3]) / 2
    rad = stride * 1.5
    in_center = (
        (xs >= cx - rad) & (xs <= cx + rad)
        & (ys >= cy - rad) & (ys <= cy + rad)
    )

    max_d = ltrb.max(-1).values
    in_level = (max_d >= sr[0]) & (max_d <= sr[1])

    pos = in_box & in_center & in_level

    # Mask non-qualifying pairs with +inf so the row-wise minimum picks the
    # smallest qualifying box; a row whose minimum is still +inf has none.
    areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    masked_areas = areas[None, :].expand_as(pos).clone()
    masked_areas[~pos] = float("inf")
    min_area, matched = masked_areas.min(1)
    is_pos = min_area < float("inf")

    ct[is_pos] = labels[matched[is_pos]]
    return ct
|
|
|
|
def build_val_data(val_path, n_images=500):
    """Assemble per-location features and class targets from a cached val set.

    Loads the cache at ``val_path`` and the COCO val2017 instance annotations
    found under ``COCO_ROOT``. For each of the first ``n_images`` cached items
    the stored spatial map is decomposed into three scales, every location is
    layer-normalised over its channels, and each location receives a class
    index (or -1) from ``assign_targets``.

    Args:
        val_path: Path of the cache file. Items are read as ``val[idx]`` and
            must provide the keys ``"spatial"``, ``"img_id"`` and ``"scale"``.
        n_images: Upper bound on the number of cached items to use.

    Returns:
        ``(features, cls_targets)`` concatenated over all images and scales
        and moved to ``DEVICE``.
    """
    # NOTE(review): weights_only=False unpickles arbitrary objects, so only
    # point this at cache files from a trusted source.
    val = torch.load(val_path, map_location="cpu", weights_only=False)
    from pycocotools.coco import COCO

    coco = COCO(os.path.join(COCO_ROOT, "annotations", "instances_val2017.json"))
    # Map the COCO category ids onto contiguous indices in sorted-id order.
    cat_to_idx = {cat_id: i for i, cat_id in enumerate(sorted(coco.getCatIds()))}

    # Location grids for a 640-pixel input at strides 16/32/64.
    # assumes item["spatial"] is (C, 40, 40) so the grids line up with the
    # decomposed maps — TODO confirm against the cache writer.
    strides = [16, 32, 64]
    base = 640 // 16
    sizes = [(base, base), (base // 2, base // 2), (base // 4, base // 4)]
    sr = [(-1, 128), (128, 256), (256, float("inf"))]
    locs = make_locations(sizes, strides)

    feature_chunks = []
    target_chunks = []
    n_used = min(n_images, len(val))
    for idx in range(n_used):
        item = val[idx]
        spatial = item["spatial"].unsqueeze(0).float()
        img_id = item["img_id"]
        scale = item["scale"]

        anns = coco.loadAnns(coco.getAnnIds(imgIds=int(img_id), iscrowd=False))
        boxes = []
        labels = []
        for ann in anns:
            x, y, w, h = ann["bbox"]
            # Keep only boxes at least one pixel wide and tall; convert
            # xywh to scaled xyxy.
            if not (w < 1 or h < 1):
                boxes.append([x * scale, y * scale, (x + w) * scale, (y + h) * scale])
                labels.append(cat_to_idx[ann["category_id"]])

        if boxes:
            boxes_t = torch.tensor(boxes, dtype=torch.float32)
        else:
            boxes_t = torch.zeros(0, 4)
        if labels:
            labels_t = torch.tensor(labels, dtype=torch.long)
        else:
            labels_t = torch.zeros(0, dtype=torch.long)

        for level, cof in enumerate(cofiber_decompose(spatial, 3)):
            _, channels, _, _ = cof.shape
            # One row per spatial location, normalised across channels.
            rows = cof.permute(0, 2, 3, 1).reshape(-1, channels)
            feature_chunks.append(F.layer_norm(rows, [channels]))
            target_chunks.append(
                assign_targets(locs[level], boxes_t, labels_t, strides[level], sr[level])
            )

    return torch.cat(feature_chunks).to(DEVICE), torch.cat(target_chunks).to(DEVICE)
|
|
|
|
def solve_and_score(features_pos, y_cls, features_all, cls_targets, pos_mask, lam=0.1):
    """Fit a ridge-regression classifier and return its accuracy on positives.

    Solves ``(A^T A + lam * n * I) W = A^T Y`` where ``A`` is
    ``features_pos`` with a bias column of ones appended and ``n`` is its row
    count, scores ``features_all`` with the result, and compares the arg-max
    class with ``cls_targets`` on the rows selected by ``pos_mask``.

    Args:
        features_pos: ``(n, d)`` training features.
        y_cls: ``(n, num_classes)`` regression targets (one-hot in this file).
        features_all: ``(m, d)`` features to score.
        cls_targets: ``(m,)`` class index for each row of ``features_all``.
        pos_mask: ``(m,)`` boolean mask of the rows that count.
        lam: Ridge strength; it is multiplied by ``n`` before use.

    Returns:
        Fraction of masked rows classified correctly, as a float. Returns
        0.0 when there are no training rows or the solve fails.
    """
    n = features_pos.shape[0]
    fd = features_pos.shape[1]
    if n == 0:
        # Nothing to fit: the normal equations would be the zero matrix.
        return 0.0

    # Allocate helpers on the input's own device rather than a module-level
    # constant, so CPU and GPU inputs both work.
    device = features_pos.device
    fa = torch.cat([features_pos, torch.ones(n, 1, device=device)], 1)
    I = torch.eye(fd + 1, device=device)
    try:
        W = torch.linalg.solve(fa.T @ fa + lam * I * n, fa.T @ y_cls)
    except RuntimeError:
        # torch.linalg.LinAlgError (singular system) derives from
        # RuntimeError; a failed fit is scored as zero so sweeps continue.
        return 0.0

    # The last row of W is the bias learnt for the appended ones column.
    scores = features_all @ W[:fd] + W[fd]
    pred = scores.argmax(1)
    correct = (pred[pos_mask] == cls_targets[pos_mask]).sum().item()
    return correct / max(pos_mask.sum().item(), 1)
|
|
|
|
def _random_fourier_features(features, n_features, sigma, seed=42):
    """Return ``sqrt(2 / n_features) * cos(features @ W + b)``.

    ``W`` is standard normal divided by ``sigma`` and ``b`` is uniform in
    ``[0, 2*pi)``; both are drawn right after seeding torch with ``seed``.
    """
    import math

    torch.manual_seed(seed)
    device = features.device
    w = torch.randn(features.shape[1], n_features, device=device) / sigma
    b = torch.rand(n_features, device=device) * 2 * math.pi
    return (2.0 / n_features) ** 0.5 * torch.cos(features @ w + b)


def _quadratic_terms(f_sub):
    """Return every product ``f_sub[:, i] * f_sub[:, j]`` with ``i <= j``."""
    k = f_sub.shape[1]
    # triu_indices enumerates the upper triangle row by row, i.e. in the same
    # order as nested ``for i`` / ``for j >= i`` loops.
    rows, cols = torch.triu_indices(k, k, device=f_sub.device)
    return f_sub[:, rows] * f_sub[:, cols]


def main():
    """Run every analytical-head experiment and save the ranked results.

    Builds the validation features, then scores in turn: the plain ridge
    baseline, random projections (Track 1), quadratic expansion of selected
    dimensions (Track 2a), raw features plus random Fourier features
    (Track 2b) and random Fourier features alone (Track 2c). Progress is
    printed as it goes and all results are written to
    ``analytical_variants/exotic_gpu.json`` next to this script.
    """
    print("=" * 60)
    print("Exotic Analytical Detection Heads (GPU)")
    print("=" * 60, flush=True)

    print("Building val data...", flush=True)
    features, cls_targets = build_val_data(VAL_CACHE, 500)
    device = features.device
    # Read the feature width from the data instead of assuming a fixed size.
    feat_dim = features.shape[1]
    pos = cls_targets >= 0
    n_pos = pos.sum().item()
    f_pos = features[pos]
    # One-hot regression targets for the positive locations.
    y_cls = torch.zeros(n_pos, NUM_CLASSES, device=device)
    y_cls[torch.arange(n_pos, device=device), cls_targets[pos]] = 1.0
    print(f" {features.shape[0]} locations, {n_pos} positives", flush=True)

    results = []

    # ---- Baseline: ridge regression on the raw features ----
    t0 = time.time()
    acc = solve_and_score(f_pos, y_cls, features, cls_targets, pos)
    print(f"\nBaseline ({feat_dim} dims): acc={acc:.4f} [{time.time()-t0:.2f}s]", flush=True)
    results.append({"name": f"baseline_{feat_dim}", "acc": acc, "dims": feat_dim})

    # ---- Track 1: keep the best of many seeded random projections ----
    print("\n--- Track 1: Random Projection Pursuit ---", flush=True)
    n_proj = 500
    for K in [10, 20, 50, 100, 200]:
        best_acc = 0.0
        best_seed = -1
        t0 = time.time()
        for seed in range(n_proj):
            # Seeding per trial makes the winning projection reproducible
            # from its seed alone.
            torch.manual_seed(seed)
            proj = torch.randn(feat_dim, K, device=device) / (K ** 0.5)
            f_proj = features @ proj
            acc = solve_and_score(f_proj[pos], y_cls, f_proj, cls_targets, pos)
            if acc > best_acc:
                best_acc = acc
                best_seed = seed
        elapsed = time.time() - t0
        n_params = K * NUM_CLASSES + NUM_CLASSES
        print(f" K={K:3d}: best_acc={best_acc:.4f} (seed={best_seed}, "
              f"{n_params} params, {elapsed:.1f}s, {n_proj} projections)", flush=True)
        results.append({"name": f"random_proj_K{K}", "acc": best_acc,
                        "dims": K, "params": n_params, "seed": best_seed})

    # ---- Track 2a: selected dimensions plus their pairwise products ----
    print("\n--- Track 2a: Quadratic Feature Expansion ---", flush=True)
    quad_ks = [5, 10, 20, 30]
    greedy_path = os.path.join(SCRIPT_DIR, "analytical_variants", "greedy_forward_gpu.json")
    if os.path.isfile(greedy_path):
        with open(greedy_path, encoding="utf-8") as fh:
            greedy_dims = json.load(fh)["selected_dims"]
    else:
        # Without a saved selection, fall back to the leading dimensions,
        # with enough of them to serve the largest K requested below.
        greedy_dims = list(range(min(max(quad_ks), feat_dim)))

    for K in quad_ks:
        t0 = time.time()
        dims = greedy_dims[:K]
        if len(dims) < K:
            # Fewer selected dimensions than requested: skip rather than
            # index past the end or repeat a smaller configuration.
            print(f" top-{K} + quadratic: skipped "
                  f"(only {len(dims)} selected dims available)", flush=True)
            continue
        f_sub = features[:, dims]
        f_expanded = torch.cat([f_sub, _quadratic_terms(f_sub)], dim=1)
        n_expanded = f_expanded.shape[1]
        acc = solve_and_score(f_expanded[pos], y_cls, f_expanded, cls_targets, pos)
        n_params = n_expanded * NUM_CLASSES + NUM_CLASSES
        elapsed = time.time() - t0
        print(f" top-{K} + quadratic: {n_expanded} dims, acc={acc:.4f} "
              f"({n_params} params, {elapsed:.2f}s)", flush=True)
        results.append({"name": f"quadratic_top{K}", "acc": acc,
                        "dims": n_expanded, "params": n_params})

    # Kernel bandwidth shared by Tracks 2b and 2c: the median pairwise
    # distance among the first 500 locations. It does not depend on the
    # number of Fourier features, so it is computed once here.
    sample = features[:500]
    sigma = torch.cdist(sample, sample).median().item()
    if sigma < 1e-6:
        sigma = 1.0

    # ---- Track 2b: raw features concatenated with random Fourier features ----
    print("\n--- Track 2b: Random Fourier Features ---", flush=True)
    for K_rff in [50, 100, 200, 500]:
        t0 = time.time()
        rff = _random_fourier_features(features, K_rff, sigma)
        f_combined = torch.cat([features, rff], dim=1)
        acc = solve_and_score(f_combined[pos], y_cls, f_combined, cls_targets, pos)
        n_dims = f_combined.shape[1]
        n_params = n_dims * NUM_CLASSES + NUM_CLASSES
        elapsed = time.time() - t0
        print(f" {feat_dim} + {K_rff} RFF: {n_dims} dims, acc={acc:.4f} "
              f"({n_params} params, sigma={sigma:.2f}, {elapsed:.2f}s)", flush=True)
        results.append({"name": f"rff_{K_rff}", "acc": acc,
                        "dims": n_dims, "params": n_params})

    # ---- Track 2c: random Fourier features only ----
    print("\n--- Track 2c: Pure Random Fourier Features (no raw) ---", flush=True)
    for K_rff in [200, 500, 1000]:
        t0 = time.time()
        rff = _random_fourier_features(features, K_rff, sigma)
        acc = solve_and_score(rff[pos], y_cls, rff, cls_targets, pos)
        n_params = K_rff * NUM_CLASSES + NUM_CLASSES
        elapsed = time.time() - t0
        print(f" {K_rff} pure RFF: acc={acc:.4f} ({n_params} params, {elapsed:.2f}s)", flush=True)
        results.append({"name": f"pure_rff_{K_rff}", "acc": acc,
                        "dims": K_rff, "params": n_params})

    # ---- Summary ----
    print(f"\n{'='*60}")
    print("Ranked by accuracy:")
    for r in sorted(results, key=lambda x: -x["acc"]):
        print(f" {r['name']:25s}: acc={r['acc']:.4f} dims={r.get('dims', '?')} "
              f"params={r.get('params', '?')}")

    out = os.path.join(SCRIPT_DIR, "analytical_variants", "exotic_gpu.json")
    # The output directory may not exist on a fresh checkout.
    os.makedirs(os.path.dirname(out), exist_ok=True)
    with open(out, "w", encoding="utf-8") as fh:
        json.dump(results, fh, indent=2)
    print(f"\nSaved: {out}")
|
|
|
|
# Run the experiment sweep only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|
|