Spaces:
Sleeping
Sleeping
File size: 6,365 Bytes
bc42ee1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# stage4.py
# Author: Liam Grinstead
# Purpose: ViT-Tiny (ImageNet Subset) Validation (Stage Four of Twelve)
import os, math, time, json, random, argparse
import torch, torch.nn as nn, torch.nn.functional as F
import torchvision, torchvision.transforms as T
# ---------------- Determinism ----------------
def set_seed(s=1234):
    """Seed the Python and PyTorch RNGs and request deterministic cuDNN kernels.

    Args:
        s: Seed applied to ``random``, the PyTorch CPU generator and every
            CUDA generator.
    """
    random.seed(s)
    torch.manual_seed(s)
    torch.cuda.manual_seed_all(s)
    # Autotuning can select different kernels from run to run, so keep it off.
    torch.backends.cudnn.benchmark = False
    # This flag was previously False, which allowed cuDNN to pick
    # non-deterministic algorithms and contradicted the purpose of this helper.
    torch.backends.cudnn.deterministic = True
# ---------------- Telemetry ------------------
class Telemetry:
    """JSON-lines telemetry sink that echoes every record to stdout and a file.

    The instance can be used directly (call ``close()`` when done) or as a
    context manager, which closes the file even if the body raises.
    """

    def __init__(self, path="stage4_vit_tiny.jsonl"):
        """Open *path* for writing (truncating it) and start the elapsed-time clock."""
        self.t0 = time.time()
        # Explicit encoding so the log file does not depend on the platform default.
        self.f = open(path, "w", encoding="utf-8")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        # Returning False lets any exception from the body propagate.
        return False

    def emit(self, **k):
        """Write the keyword arguments as one compact JSON line.

        A ``"t"`` field holding the seconds elapsed since construction
        (rounded to 3 decimals) is added to the record. The line is printed
        to stdout, appended to the file, and the file is flushed.
        """
        k["t"] = round(time.time() - self.t0, 3)
        line = json.dumps(k, separators=(",", ":"))
        print(line)
        self.f.write(line + "\n")
        self.f.flush()

    def close(self):
        """Close the underlying file."""
        self.f.close()
# ---------------- Orbital Coupler ------------
class Orbital:
    """Two phase angles ``a`` and ``b`` that are nudged by the sine of their gap.

    Attributes:
        a, b: Phases, kept in ``[0, 2*pi)`` after every step.
        g: Gain multiplying the sine term applied to each phase.
        floor: Minimum magnitude used for the phase gap inside ``step``.
    """

    def __init__(self, g=0.006, floor=0.2):
        self.a = 0.0
        self.b = math.pi / 3
        self.g = g
        self.floor = floor

    @staticmethod
    def _wrap(angle):
        # Map an angle onto the interval [-pi, pi).
        return (angle + math.pi) % (2 * math.pi) - math.pi

    def step(self):
        """Advance both phases once.

        Returns:
            ``(drift, flux)`` where ``drift`` is the absolute wrapped
            difference ``a - b`` after the update and ``flux`` is the
            absolute sine of the (floored) gap used for the update.
        """
        full_turn = 2 * math.pi
        gap = self._wrap(self.b - self.a)
        if abs(gap) < self.floor:
            # Gaps smaller than the floor are replaced by +/-floor; a gap of
            # exactly zero counts as positive.
            gap = self.floor if gap >= 0 else -self.floor
        pull = math.sin(gap)
        self.a = (self.a + self.g * pull) % full_turn
        self.b = (self.b - self.g * pull) % full_turn
        drift = abs(self._wrap(self.a - self.b))
        return drift, abs(pull)
# ---------------- DCLR Optimiser -------------
class DCLR(torch.optim.Optimizer):
    """Adam-like optimiser whose per-element step size shrinks with gradient deviation.

    For every parameter it keeps three buffers: ``m`` (EMA of the gradient,
    rate ``beta``), ``v`` (EMA of the squared gradient, rate ``gamma``) and
    ``coh`` (EMA, rate 0.9, of ``|grad - m|``). The update is
    ``lr / (1 + cg * coh) * m / (sqrt(v) + eps)``. No bias correction is
    applied to ``m`` or ``v``.

    Args:
        params: Iterable of parameters or parameter-group dicts.
        lr: Base learning rate.
        beta: Decay rate for the gradient EMA.
        gamma: Decay rate for the squared-gradient EMA.
        eps: Constant added to ``sqrt(v)`` in the denominator.
        cg: Strength of the deviation-based learning-rate damping.
    """

    def __init__(self, params, lr=5e-4, beta=0.9, gamma=0.999, eps=1e-8, cg=0.05):
        defaults = dict(lr=lr, beta=beta, gamma=gamma, eps=eps, cg=cg)
        super().__init__(params, defaults)

    @torch.no_grad()
    def step(self, closure=None):
        """Apply one update to every parameter that has a gradient.

        Args:
            closure: Optional callable that re-evaluates the model and
                returns the loss. It was previously accepted but ignored;
                it is now called with gradients enabled before the update.

        Returns:
            ``(loss, tot)`` where ``loss`` is the closure's return value
            (``None`` when no closure is given) and ``tot`` is the sum of
            squared update elements over all parameters, as a Python float.
        """
        loss = None
        if closure is not None:
            # step() runs under no_grad, so autograd must be re-enabled for
            # the closure's forward/backward pass.
            with torch.enable_grad():
                loss = closure()

        tot = 0.0
        for group in self.param_groups:
            lr = group["lr"]
            beta = group["beta"]
            gamma = group["gamma"]
            eps = group["eps"]
            c = group["cg"]
            for p in group["params"]:
                if p.grad is None:
                    continue
                st = self.state[p]
                if not st:
                    # Lazily create the buffers the first time a parameter is seen.
                    st["m"] = torch.zeros_like(p)
                    st["v"] = torch.zeros_like(p)
                    st["coh"] = torch.zeros_like(p)
                m, v, h = st["m"], st["v"], st["coh"]
                g0 = p.grad
                m.mul_(beta).add_(g0, alpha=1 - beta)
                v.mul_(gamma).addcmul_(g0, g0, value=1 - gamma)
                # The deviation is measured against the already-updated mean.
                d = g0 - m
                h.mul_(0.9).add_(d.abs(), alpha=0.1)
                lr_eff = lr / (1 + c * h)
                step = lr_eff * m / (v.sqrt() + eps)
                p.add_(-step)
                tot += (step * step).sum().item()
        return loss, tot
# ---------------- ViT-Tiny -------------------
class PatchEmbed(nn.Module):
    """Turn an image batch into a sequence of patch embeddings.

    A strided convolution (kernel size == stride == ``patch``) projects each
    non-overlapping patch to ``dim`` channels.

    Attributes:
        proj: The patch-projection convolution.
        n: Number of patches for a square ``img`` x ``img`` input.
    """

    def __init__(self, img=224, patch=16, in_ch=3, dim=192):
        super().__init__()
        self.proj = nn.Conv2d(in_ch, dim, kernel_size=patch, stride=patch)
        per_side = img // patch
        self.n = per_side * per_side

    def forward(self, x):
        """Project ``x`` and return the result flattened from dim 2 on, with dims 1 and 2 swapped."""
        feature_map = self.proj(x)
        tokens = feature_map.flatten(2)
        return tokens.transpose(1, 2)
class Block(nn.Module):
    """Pre-norm transformer encoder block: self-attention then MLP, each with a residual.

    Args:
        dim: Embedding width.
        heads: Number of attention heads.
        mlp_ratio: Hidden width of the MLP as a multiple of ``dim``.
    """

    def __init__(self, dim=192, heads=3, mlp_ratio=4):
        super().__init__()
        hidden = int(dim * mlp_ratio)
        self.n1 = nn.LayerNorm(dim)
        self.attn = nn.MultiheadAttention(dim, heads, batch_first=True)
        self.n2 = nn.LayerNorm(dim)
        self.mlp = nn.Sequential(nn.Linear(dim, hidden), nn.GELU(), nn.Linear(hidden, dim))

    def forward(self, x):
        """Apply both sub-layers to ``x`` and return a tensor of the same shape."""
        # Attention sub-layer: x + Attn(LN(x)).
        h = x
        x = self.n1(x)
        x, _ = self.attn(x, x, x, need_weights=False)
        x = x + h
        # MLP sub-layer: x + MLP(LN(x)). The skip path previously added the
        # normalised tensor rather than the saved input `h`, which left `h`
        # unused and dropped the identity connection.
        h = x
        x = h + self.mlp(self.n2(x))
        return x
class ViTTiny(nn.Module):
    """Vision transformer classifier that reads its prediction from a class token.

    Args:
        num_classes: Size of the output logit vector.
        img: Side length used to size the positional embedding.
        patch: Patch side length.
        dim: Embedding width.
        depth: Number of encoder blocks.
        heads: Attention heads per block.
        mlp_ratio: MLP hidden width as a multiple of ``dim``.
    """

    def __init__(self, num_classes=1000, img=224, patch=16, dim=192, depth=12, heads=3, mlp_ratio=4):
        super().__init__()
        # The input channel count is fixed at 3.
        self.pe = PatchEmbed(img, patch, 3, dim)
        self.cls = nn.Parameter(torch.zeros(1, 1, dim))
        # One positional slot per patch plus one for the class token.
        self.pos = nn.Parameter(torch.zeros(1, 1 + self.pe.n, dim))
        encoder_layers = [Block(dim, heads, mlp_ratio) for _ in range(depth)]
        self.blocks = nn.ModuleList(encoder_layers)
        self.norm = nn.LayerNorm(dim)
        self.head = nn.Linear(dim, num_classes)
        for learned in (self.cls, self.pos):
            nn.init.trunc_normal_(learned, std=0.02)

    def forward(self, x):
        """Return logits computed from the class-token position of the final norm output."""
        batch = x.size(0)
        tokens = self.pe(x)
        cls_token = self.cls.expand(batch, -1, -1)
        seq_len = tokens.size(1) + 1
        tokens = torch.cat([cls_token, tokens], dim=1) + self.pos[:, :seq_len]
        for layer in self.blocks:
            tokens = layer(tokens)
        tokens = self.norm(tokens)
        return self.head(tokens[:, 0])
# ---------------- Data -----------------------
def get_loaders(data_dir=None, batch=256, img=224):
    """Build the training and validation data loaders.

    When ``data_dir`` contains a ``train`` sub-directory, ``train`` and
    ``val`` are read with ``ImageFolder``. Otherwise a synthetic dataset of
    random tensors and random labels is used (4096 training and 1024
    validation samples, 1000 classes).

    Args:
        data_dir: Root directory holding ``train`` and ``val`` folders, or
            ``None`` / a path without ``train`` to use synthetic data.
        batch: Batch size for both loaders.
        img: Side length images are resized to (or generated at).

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    if data_dir and os.path.isdir(os.path.join(data_dir, "train")):
        # Transforms are built only here so the synthetic fallback does not
        # touch torchvision at all.
        normalize = T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
        train_tf = T.Compose([T.Resize((img, img)), T.RandomHorizontalFlip(), T.ToTensor(), normalize])
        # Validation previously shared the training transform, so evaluation
        # images were randomly flipped; it now gets a flip-free pipeline.
        val_tf = T.Compose([T.Resize((img, img)), T.ToTensor(), normalize])
        train = torchvision.datasets.ImageFolder(os.path.join(data_dir, "train"), transform=train_tf)
        val = torchvision.datasets.ImageFolder(os.path.join(data_dir, "val"), transform=val_tf)
    else:
        # synthetic fallback
        C = 1000

        class Synth(torch.utils.data.Dataset):
            """Dataset of ``n`` freshly sampled random images and labels."""

            def __init__(self, n):
                self.n = n

            def __len__(self):
                return self.n

            def __getitem__(self, i):
                # The index is ignored: every access draws a new sample from
                # the global torch RNG.
                x = torch.randn(3, img, img)
                y = torch.randint(0, C, (1,)).item()
                return x, y

        train = Synth(4096)
        val = Synth(1024)
    tr = torch.utils.data.DataLoader(train, batch_size=batch, shuffle=True)
    va = torch.utils.data.DataLoader(val, batch_size=batch, shuffle=False)
    return tr, va
# ---------------- Runner ---------------------
def train(mode="RFT", data_dir=None, steps=1000, batch=256, lr=5e-4, log_path="stage4_vit_tiny.jsonl"):
    """Train ViT-Tiny for up to ``steps`` batches and log one telemetry record per step.

    Only a single pass over the training loader is made, so fewer than
    ``steps`` updates happen when the loader is shorter than ``steps``.

    Args:
        mode: ``"RFT"`` selects the DCLR optimiser; any other value selects Adam.
        data_dir: Passed to ``get_loaders``.
        steps: Maximum number of optimisation steps.
        batch: Batch size.
        lr: Learning rate for the chosen optimiser.
        log_path: Destination of the JSON-lines telemetry file.

    Returns:
        A completion message naming ``log_path``.
    """
    set_seed(1234)
    tm = Telemetry(log_path)
    # Everything after the log file is opened runs under try/finally so the
    # handle is released even when a step raises (bad data, out of memory, ...).
    try:
        orb = Orbital()
        dev = "cuda" if torch.cuda.is_available() else "cpu"
        # The validation loader is built but not used by this stage.
        train_loader, val_loader = get_loaders(data_dir, batch)
        model = ViTTiny(num_classes=1000).to(dev)
        if mode == "RFT":
            opt = DCLR(model.parameters(), lr=lr)
        else:
            opt = torch.optim.Adam(model.parameters(), lr=lr)
        ce = nn.CrossEntropyLoss()
        # range() comes first so zip stops before fetching a batch past `steps`.
        for it, (x, y) in zip(range(1, steps + 1), train_loader):
            drift, flux = orb.step()
            x, y = x.to(dev), y.to(dev)
            opt.zero_grad(set_to_none=True)
            out = model(x)
            loss = ce(out, y)
            loss.backward()
            if isinstance(opt, DCLR):
                # DCLR.step returns (loss, sum of squared update elements).
                _, J = opt.step()
            else:
                opt.step()
                J = 0.0
            acc = (out.argmax(1) == y).float().mean().item()
            # NOTE(review): E_ret and coh are hard-coded constants, not
            # measured quantities - confirm this is intended.
            tm.emit(mode=mode, step=it, drift=round(drift, 3), flux=round(flux, 3),
                    E_ret=0.994, coh=0.999, loss=round(float(loss.item()), 4),
                    acc=round(float(acc), 3), J_step=round(float(J * 1e-6), 6))
    finally:
        tm.close()
    return f"Stage 4 complete. Telemetry saved to {log_path}"
|