| """Push image-level person classifier to 99% precision + 95% recall. |
| Bigger model, focal loss, longer training, 5-fold CV.""" |
| import json, os, torch, torch.nn as nn, torch.nn.functional as F, time |
| from pycocotools.coco import COCO |
|
|
# Dataset locations come from the environment; a missing variable raises
# KeyError right here rather than surfacing later as a bad path.
COCO_ROOT = os.environ["ARENA_COCO_ROOT"]
VAL_CACHE = os.environ["ARENA_VAL_CACHE"]
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# Channel subset used for the image vectors: the "genome" of the first record
# whose K equals 100, de-duplicated and sorted in ascending order.
_EVOLVED_PATH = os.path.join(SCRIPT_DIR, "..", "circuit", "evolved_extreme.json")
with open(_EVOLVED_PATH, encoding="utf-8") as f:
    evolved = json.load(f)
_k100_records = [r for r in evolved if r["K"] == 100]
if not _k100_records:
    # Same exception type the bare `[...][0]` lookup would raise, but with
    # enough context to diagnose a wrong or stale JSON file.
    raise IndexError(f"no record with K == 100 in {_EVOLVED_PATH}")
dims = sorted(set(_k100_records[0]["genome"]))
N = len(dims)

# NOTE(security): weights_only=False lets torch.load unpickle arbitrary Python
# objects, which can execute code. Only point ARENA_VAL_CACHE at a trusted file.
val = torch.load(VAL_CACHE, map_location="cpu", weights_only=False)
coco = COCO(os.path.join(COCO_ROOT, "annotations", "instances_val2017.json"))
|
|
def cofiber_decompose(f, n_scales):
    """Split a feature map into a list of multi-scale difference levels.

    For each of the first ``n_scales - 1`` levels the current map is 2x2
    average-pooled, the pooled map is bilinearly resized back to the current
    spatial size, and the difference (current - resized) is stored. The pooled
    map then becomes the current map. The last list entry is whatever map
    remains after the final pooling step.

    Args:
        f: 4-D tensor whose last two dims are pooled and resized
            (presumably (batch, channels, H, W) -- confirm with callers).
        n_scales: requested number of levels. For ``n_scales <= 1`` no
            pooling happens and the result is ``[f]``.

    Returns:
        List of tensors ordered from the input resolution down to the
        coarsest remaining map.
    """
    levels = []
    current = f
    remaining = n_scales - 1
    while remaining > 0:
        pooled = F.avg_pool2d(current, 2)
        upsampled = F.interpolate(
            pooled,
            size=current.shape[2:],
            mode="bilinear",
            align_corners=False,
        )
        levels.append(current - upsampled)
        current = pooled
        remaining -= 1
    levels.append(current)
    return levels
|
|
# Build one fixed-length vector and one binary label per cached image.
# NOTE(review): the banner hard-codes "92 evolved dims, max-pool"; the actual
# channel count is len(dims) and the vector concatenates max- AND mean-pooled
# features -- confirm whether the message should be updated.
print("Pre-computing image vectors (92 evolved dims, max-pool)...", flush=True)
all_vecs = []
all_labels = []
# Pure feature extraction: no_grad stops autograd from recording a graph in
# case the cached tensors carry requires_grad, so the stored vectors are plain
# data.
with torch.no_grad():
    # Index-based access is kept because only len() and integer indexing are
    # known to be supported by the cached object.
    for idx in range(len(val)):
        item = val[idx]
        spatial = item["spatial"].unsqueeze(0).float()
        feats = []
        for cof in cofiber_decompose(spatial, 3):
            n_channels = cof.shape[1]
            # Move channels last and flatten so each row is one spatial
            # location, then layer-normalise every row over its channels.
            rows = cof.permute(0, 2, 3, 1).reshape(-1, n_channels)
            feats.append(F.layer_norm(rows, [n_channels]))

        # Pool the selected channels over the locations of all three scales.
        all_f = torch.cat(feats)[:, dims]
        vec = torch.cat([all_f.max(dim=0).values, all_f.mean(dim=0)])
        all_vecs.append(vec)

        # Positive label iff the image has at least one non-crowd annotation
        # of category id 1 (reported as "person" in the summary below).
        ann_ids = coco.getAnnIds(imgIds=int(item["img_id"]), catIds=[1], iscrowd=False)
        all_labels.append(1.0 if ann_ids else 0.0)
        if (idx + 1) % 1000 == 0:
            print(f" {idx+1}/{len(val)}", flush=True)


X = torch.stack(all_vecs).cuda()
Y = torch.tensor(all_labels).cuda()
feat_dim = X.shape[1]
print(f" {len(Y)} images, {int(Y.sum())} person, {feat_dim} feature dims\n")
|
|
def focal_bce(logits, targets, alpha=0.25, gamma=2.0):
    """Return the mean focal binary cross-entropy computed from raw logits.

    Every element's BCE term is multiplied by ``(1 - p_t) ** gamma``, where
    ``p_t`` is the sigmoid probability assigned to the element's target, and
    by a class weight that is ``alpha`` for a target of 1 and ``1 - alpha``
    for a target of 0.

    Args:
        logits: raw (pre-sigmoid) scores.
        targets: float tensor of the same shape as ``logits``.
        alpha: weight applied to target-1 elements.
        gamma: exponent of the ``(1 - p_t)`` modulating factor.

    Returns:
        Scalar tensor: the mean of the weighted per-element losses.
    """
    prob = logits.sigmoid()
    negatives = 1 - targets
    prob_true = prob * targets + (1 - prob) * negatives
    class_weight = alpha * targets + (1 - alpha) * negatives
    modulator = (1 - prob_true) ** gamma
    per_element = F.binary_cross_entropy_with_logits(
        logits, targets, reduction="none"
    )
    return (class_weight * modulator * per_element).mean()
|
|
| |
def _make_mlp(in_dim, hidden, p_drop, n_dropout):
    """Build Linear+GELU hidden layers of the given widths plus a 1-unit head.

    Dropout(p_drop) is inserted only after the first ``n_dropout`` hidden
    layers; later hidden layers go straight into the next Linear.
    """
    layers = []
    prev = in_dim
    for position, width in enumerate(hidden):
        layers.append(nn.Linear(prev, width))
        layers.append(nn.GELU())
        if position < n_dropout:
            layers.append(nn.Dropout(p_drop))
        prev = width
    layers.append(nn.Linear(prev, 1))
    return nn.Sequential(*layers)


def _mlp_128_64(d):
    """Two hidden layers (128, 64), dropout 0.1 after both."""
    return _make_mlp(d, (128, 64), 0.1, 2)


def _mlp_256_128_64(d):
    """Three hidden layers (256, 128, 64), dropout 0.1 after the first two."""
    return _make_mlp(d, (256, 128, 64), 0.1, 2)


def _mlp_512_256_128(d):
    """Three hidden layers (512, 256, 128), dropout 0.15 after the first two."""
    return _make_mlp(d, (512, 256, 128), 0.15, 2)


# (label, factory) pairs. Each factory takes the input feature width; the
# labels are fixed strings and do not change with the width actually passed.
architectures = [
    ("184->128->64->1", _mlp_128_64),
    ("184->256->128->64->1", _mlp_256_128_64),
    ("184->512->256->128->1", _mlp_512_256_128),
]
|
|
# Cross-validation and optimisation settings. N_EPOCHS also sets the cosine
# schedule length, so the two can no longer drift apart.
N_FOLDS = 5
N_EPOCHS = 500
BATCH_SIZE = 128
THRESHOLDS = [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95, 0.99]

# Run wherever the feature matrix already lives instead of repeating "cuda".
device = X.device
n_samples = X.shape[0]
# Ceiling division: the contiguous folds cover every sample even when the
# count is not a multiple of N_FOLDS (5000 samples -> 1000 per fold).
fold_size = (n_samples + N_FOLDS - 1) // N_FOLDS

for name, make_model in architectures:
    all_scores = []
    all_gt = []

    for fold in range(N_FOLDS):
        # Fold k holds out the k-th contiguous slice and trains on the rest.
        test_mask = torch.zeros(n_samples, dtype=torch.bool, device=device)
        test_mask[fold * fold_size:(fold + 1) * fold_size] = True
        train_mask = ~test_mask

        # The training subset is fixed for the whole fold; select it once
        # instead of re-running the boolean mask every epoch.
        fold_x = X[train_mask]
        fold_y = Y[train_mask]
        n_train = fold_x.shape[0]

        # A fresh model, optimiser and schedule for every fold.
        model = make_model(feat_dim).to(device)
        n_params = sum(p.numel() for p in model.parameters())
        opt = torch.optim.AdamW(model.parameters(), lr=5e-4, weight_decay=1e-3)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=N_EPOCHS)

        model.train()
        for _epoch in range(N_EPOCHS):
            # Reshuffle the training rows each epoch.
            perm = torch.randperm(n_train, device=device)
            train_x = fold_x[perm]
            train_y = fold_y[perm]
            for start in range(0, n_train, BATCH_SIZE):
                batch_x = train_x[start:start + BATCH_SIZE]
                batch_y = train_y[start:start + BATCH_SIZE]
                # squeeze(-1) drops only the output-unit dim; a bare squeeze()
                # would also collapse a size-1 batch to a 0-d tensor that no
                # longer matches batch_y.
                logits = model(batch_x).squeeze(-1)
                loss = focal_bce(logits, batch_y)
                opt.zero_grad()
                loss.backward()
                opt.step()
            scheduler.step()

        # Score the held-out slice with dropout disabled.
        model.eval()
        with torch.no_grad():
            scores = model(X[test_mask]).squeeze(-1).sigmoid()
        all_scores.append(scores.cpu())
        all_gt.append(Y[test_mask].cpu())

    # Pool the out-of-fold predictions and sweep decision thresholds.
    all_scores = torch.cat(all_scores)
    all_gt = torch.cat(all_gt).bool()

    print(f"\n{name} ({n_params} params):")
    print(f" {'Thresh':>6s} {'TP':>5s} {'FP':>5s} {'FN':>5s} {'TN':>5s} {'Prec':>6s} {'Rec':>6s} {'F1':>6s}")
    for t in THRESHOLDS:
        pred = all_scores > t
        tp = (pred & all_gt).sum().item()
        fp = (pred & ~all_gt).sum().item()
        fn = (~pred & all_gt).sum().item()
        tn = (~pred & ~all_gt).sum().item()
        # max(..., 1) / max(..., 1e-9) guard the ratios against a zero
        # denominator when a threshold yields no predictions or no positives.
        prec = tp / max(tp + fp, 1)
        rec = tp / max(tp + fn, 1)
        f1 = 2 * prec * rec / max(prec + rec, 1e-9)
        # "***": precision >= 0.99 with recall >= 0.90; "<<": precision only.
        # NOTE(review): the module docstring states a 95% recall goal while
        # this marker uses 0.90 -- confirm which is intended.
        marker = " ***" if prec >= 0.99 and rec >= 0.90 else (" <<" if prec >= 0.99 else "")
        print(f" {t:6.2f} {tp:5d} {fp:5d} {fn:5d} {tn:5d} {prec:6.3f} {rec:6.3f} {f1:6.3f}{marker}")
|
|