# Spaces: Sleeping
# File size: 6,557 Bytes
# 954c701
# stage10.py
# Author: Liam Grinstead
# Purpose: RFT-GPT-30B (8× A100, DDP) Validation — Stage Ten of Twelve
import os, math, time, json, random, argparse
import torch, torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from contextlib import nullcontext
# ---------------- Determinism ----------------
def set_seed(s=1234):
    """Seed Python's ``random`` and PyTorch's CPU and CUDA generators.

    Args:
        s: Seed value handed unchanged to every generator.
    """
    # Same three seeders, same order as before; only the call shape differs.
    for seeder in (random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(s)
# ---------------- Telemetry ------------------
class Telemetry:
    """Write compact JSON records, one per line, to stdout and to a file.

    Attributes are ``t0`` (wall-clock time at construction) and ``f`` (the
    open output file, truncated on construction).
    """

    def __init__(self, path="stage10_gpt30b.jsonl"):
        """Start the clock and open ``path`` for writing."""
        self.t0 = time.time()
        self.f = open(path, "w")

    def emit(self, **k):
        """Print and persist one record built from the keyword arguments.

        A ``"t"`` field holding the seconds elapsed since construction
        (rounded to 3 decimals) is added, replacing any caller-supplied
        ``"t"``. The file is flushed after every record.
        """
        elapsed = round(time.time() - self.t0, 3)
        record = {**k, "t": elapsed}
        encoded = json.dumps(record, separators=(",", ":"))
        print(encoded)
        self.f.write(f"{encoded}\n")
        self.f.flush()

    def close(self):
        """Close the underlying file."""
        self.f.close()
# ---------------- Orbital Coupler ------------
class Orbital:
    """Two coupled phase angles ``a`` and ``b`` that are nudged each step.

    Attributes:
        a, b: Phase angles in radians, kept in ``[0, 2*pi)`` after each step.
        g: Coupling gain applied to the sine of the phase difference.
        floor: Minimum magnitude used for the phase difference.
    """

    def __init__(self, g=0.006, floor=0.2):
        self.a = 0.0
        self.b = math.pi / 3
        self.g = g
        self.floor = floor

    @staticmethod
    def _wrap(angle):
        """Map ``angle`` into the interval ``[-pi, pi)``."""
        return (angle + math.pi) % (2 * math.pi) - math.pi

    def step(self):
        """Advance both phases once.

        Returns:
            ``(drift, flux)`` where ``drift`` is the absolute wrapped
            difference ``a - b`` after the update and ``flux`` is
            ``abs(sin(d))`` for the (floored) difference used in the update.
        """
        d = self._wrap(self.b - self.a)
        # Keep the difference at least `floor` in magnitude, preserving sign
        # (zero counts as positive).
        if abs(d) < self.floor:
            d = self.floor * (1 if d >= 0 else -1)
        s = math.sin(d)
        two_pi = 2 * math.pi
        self.a = (self.a + self.g * s) % two_pi
        self.b = (self.b - self.g * s) % two_pi
        # Fixed: the original wrote `2*math*pi`, which raised NameError on `pi`.
        drift = abs(self._wrap(self.a - self.b))
        return drift, abs(s)
# ---------------- DCLR Optimiser -------------
class DCLR(torch.optim.Optimizer):
    """Optimizer with a per-element learning-rate damping term.

    Per parameter it keeps three state tensors: ``m`` (moving average of the
    gradient, decay ``beta``), ``v`` (moving average of the squared gradient,
    decay ``gamma``) and ``coh`` (moving average of ``|grad - m|`` with a
    fixed 0.9 / 0.1 mix). The step is
    ``lr / (1 + cg * coh) * m / (sqrt(v) + eps)``; no bias correction is
    applied.

    Args:
        params: Parameters or parameter groups to optimise.
        lr: Base learning rate.
        beta: Decay for ``m``.
        gamma: Decay for ``v``.
        eps: Added to ``sqrt(v)`` in the denominator.
        cg: Weight of ``coh`` in the learning-rate damping.
    """

    def __init__(self, params, lr=3e-4, beta=0.9, gamma=0.999, eps=1e-8, cg=0.05):
        defaults = dict(lr=lr, beta=beta, gamma=gamma, eps=eps, cg=cg)
        super().__init__(params, defaults)

    @torch.no_grad()
    def step(self, closure=None):
        """Apply one update to every parameter that has a gradient.

        Args:
            closure: Optional callable that re-evaluates the model and
                returns the loss. It is run with gradients enabled before
                the update (previously it was accepted but ignored).

        Returns:
            ``(loss, tot)``: ``loss`` is the closure's return value, or
            ``None`` when no closure is given; ``tot`` is the sum of squared
            update elements over all parameters, as a Python float.
        """
        loss = None
        if closure is not None:
            # step() runs under no_grad, so re-enable autograd for the closure.
            with torch.enable_grad():
                loss = closure()

        tot = 0.0
        for group in self.param_groups:
            lr, beta, gamma, eps, c = (
                group["lr"], group["beta"], group["gamma"], group["eps"], group["cg"]
            )
            for p in group["params"]:
                if p.grad is None:
                    continue
                st = self.state[p]
                if not st:
                    # Lazily create the three state buffers on first use.
                    st["m"] = torch.zeros_like(p)
                    st["v"] = torch.zeros_like(p)
                    st["coh"] = torch.zeros_like(p)
                m, v, h = st["m"], st["v"], st["coh"]
                g0 = p.grad
                m.mul_(beta).add_(g0, alpha=1 - beta)
                v.mul_(gamma).addcmul_(g0, g0, value=1 - gamma)
                # Deviation of the gradient from the already-updated average.
                d = g0 - m
                h.mul_(0.9).add_(d.abs(), alpha=0.1)
                lr_eff = lr / (1 + c * h)
                update = lr_eff * m / (v.sqrt() + eps)
                p.add_(-update)
                # .item() synchronises once per parameter; kept so `tot`
                # accumulates in Python floats exactly as before.
                tot += (update * update).sum().item()
        return loss, tot
# ---------------- GPT-30B Proxy --------------
class GPTBlock(nn.Module):
    """One transformer block: self-attention then an MLP, each with a
    LayerNorm before it and a residual connection around it.

    Args:
        d: Model width.
        heads: Number of attention heads.
        mlp_ratio: Hidden width of the MLP as a multiple of ``d``.
    """

    def __init__(self, d=2048, heads=16, mlp_ratio=4):
        super().__init__()
        hidden = int(d * mlp_ratio)
        self.n1 = nn.LayerNorm(d)
        self.attn = nn.MultiheadAttention(d, heads, batch_first=True)
        self.n2 = nn.LayerNorm(d)
        self.mlp = nn.Sequential(nn.Linear(d, hidden), nn.GELU(), nn.Linear(hidden, d))

    def forward(self, x):
        """Apply the block to ``x``.

        ``x`` is batch-first (``batch_first=True`` on the attention layer)
        with last dimension ``d``. No attention mask is passed.
        """
        # Attention sub-layer: residual around attn(LayerNorm(x)).
        h = x
        x = self.n1(x)
        x, _ = self.attn(x, x, x, need_weights=False)
        x = x + h
        # MLP sub-layer. Fixed: the original saved `h` here but then added the
        # MLP output to the *normalised* tensor, leaving `h` unused; the
        # residual now uses the pre-norm input like the attention sub-layer.
        h = x
        x = h + self.mlp(self.n2(x))
        return x
class GPT30BProxy(nn.Module):
    """Stack of ``GPTBlock`` layers between a token embedding and a
    vocabulary projection.

    Args:
        vocab: Vocabulary size (embedding rows and output logits).
        d: Model width.
        L: Number of ``GPTBlock`` layers.
        heads: Attention heads per block.
        max_len: Number of positions in the positional parameter.
    """

    def __init__(self, vocab=32768, d=2048, L=24, heads=16, max_len=2048):
        super().__init__()
        self.emb = nn.Embedding(vocab, d)
        # Positional parameter, initialised to zeros.
        self.pos = nn.Parameter(torch.zeros(1, max_len, d))
        layers = []
        for _ in range(L):
            layers.append(GPTBlock(d, heads))
        self.blocks = nn.ModuleList(layers)
        self.norm = nn.LayerNorm(d)
        self.head = nn.Linear(d, vocab)

    def forward(self, tok):
        """Return logits over the vocabulary for each position of ``tok``.

        Assumes ``tok`` is an integer tensor of shape (batch, seq) with
        ``seq <= max_len`` — TODO confirm; dimension 1 is used as the
        sequence length when slicing the positional parameter.
        """
        seq_len = tok.size(1)
        hidden = self.emb(tok) + self.pos[:, :seq_len]
        for layer in self.blocks:
            hidden = layer(hidden)
        hidden = self.norm(hidden)
        return self.head(hidden)
# ---------------- Data -----------------------
def make_batch(batch=16, seq=1024, vocab=32768, device="cuda"):
    """Build one batch of random token ids and shifted targets.

    Args:
        batch: Number of sequences.
        seq: Tokens per sequence.
        vocab: Exclusive upper bound for the sampled ids.
        device: Device the tensors are created on.

    Returns:
        ``(x, y, n)``: ``x`` holds uniform random ids in ``[0, vocab)`` with
        shape ``(batch, seq)``; ``y`` is ``x`` rolled left by one along the
        sequence axis (so the last target wraps to the first token); ``n`` is
        ``batch * seq``.
    """
    shape = (batch, seq)
    inputs = torch.randint(0, vocab, shape, device=device)
    targets = inputs.roll(shifts=-1, dims=1)
    token_count = batch * seq
    return inputs, targets, token_count
# ---------------- DDP Setup ------------------
def ddp_setup():
    """Initialise the NCCL process group and select this process's GPU.

    The GPU index is read from the ``LOCAL_RANK`` environment variable,
    defaulting to 0 when it is unset.

    Returns:
        ``(rank, world, local_rank)`` — the process-group rank, the group
        size, and the local GPU index.
    """
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)
    return dist.get_rank(), dist.get_world_size(), local_rank
def all_reduce_scalar(t: torch.Tensor, op=dist.ReduceOp.SUM):
    """All-reduce ``t`` in place across the process group, if one exists.

    Args:
        t: Tensor to reduce; modified in place.
        op: Reduction operation (sum by default).

    Returns:
        The same tensor object ``t``. When no process group has been
        initialised the tensor is returned untouched.
    """
    if not dist.is_initialized():
        return t
    dist.all_reduce(t, op=op)
    return t
# ---------------- Runner ---------------------
def run(mode="RFT", steps=1000, batch=16, seq=1024, vocab=32768, lr=3e-4,
        log="stage10_gpt30b.jsonl"):
    """Train the proxy model on random batches under DDP and log telemetry.

    Args:
        mode: ``"RFT"`` selects the ``DCLR`` optimizer; any other value
            selects ``torch.optim.Adam``.
        steps: Number of training steps.
        batch: Sequences per step on each rank.
        seq: Tokens per sequence; the model's ``max_len`` is
            ``max(2048, seq)``.
        vocab: Vocabulary size.
        lr: Learning rate for the chosen optimizer.
        log: Path of the JSONL telemetry file (written by rank 0 only).

    Returns:
        A completion message naming the telemetry file.

    The telemetry file and the process group are released in a ``finally``
    block, so they are cleaned up even when a step raises.
    """
    rank, world, local_rank = ddp_setup()
    tm = None
    try:
        # Offset the seed by rank so each process draws different batches.
        set_seed(1234 + rank)
        dev = f"cuda:{local_rank}"
        model = GPT30BProxy(vocab=vocab, max_len=max(2048, seq)).to(dev)
        model = DDP(model, device_ids=[local_rank], output_device=local_rank,
                    find_unused_parameters=False)
        if mode == "RFT":
            opt = DCLR(model.parameters(), lr=lr)
        else:
            opt = torch.optim.Adam(model.parameters(), lr=lr)
        loss_fn = nn.CrossEntropyLoss()
        use_bf16 = torch.cuda.is_available() and torch.cuda.is_bf16_supported()
        if use_bf16:
            autocast_ctx = torch.autocast(device_type="cuda", dtype=torch.bfloat16)
        else:
            autocast_ctx = nullcontext()
        orb = Orbital()
        if rank == 0:
            tm = Telemetry(log)

        for step in range(1, steps + 1):
            # drift/flux are only logged; they do not feed into the update.
            drift, flux = orb.step()
            x, y, n_tokens = make_batch(batch, seq, vocab, device=dev)
            opt.zero_grad(set_to_none=True)
            with autocast_ctx:
                out = model(x)
                loss = loss_fn(out.view(-1, out.size(-1)), y.view(-1))
            loss.backward()
            if isinstance(opt, DCLR):
                _, J = opt.step()
            else:
                opt.step()
                J = 0.0
            acc = (out.argmax(-1) == y).float().mean()

            # Pack the three per-rank scalars into one tensor so a single
            # all-reduce replaces the three separate ones.
            j_token = float(J * 1e-6) / max(1, n_tokens)
            metrics = torch.tensor(
                [float(loss.item()), float(acc.item()), j_token], device=dev
            )
            all_reduce_scalar(metrics)
            if rank == 0:
                # Sums across ranks; dividing by world size gives the mean.
                loss_sum, acc_sum, j_sum = metrics.tolist()
                # E_ret and coh are emitted as fixed constants, not measured.
                tm.emit(mode=mode, step=step, drift=round(drift, 3), flux=round(flux, 3),
                        E_ret=0.996, coh=0.999,
                        loss=round(loss_sum / world, 4),
                        acc=round(acc_sum / world, 3),
                        J_token=round(j_sum / world, 6))
    finally:
        if tm is not None:
            tm.close()
        dist.destroy_process_group()
    return f"Stage 10 complete. Telemetry saved to {log}"
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to run().
    cli = argparse.ArgumentParser()
    cli.add_argument("--mode", choices=["RFT", "BASE"], default="RFT")
    # Remaining options share the same shape: flag, value type, default.
    for flag, kind, default in (
        ("--steps", int, 1000),
        ("--batch", int, 16),
        ("--seq", int, 1024),
        ("--vocab", int, 32768),
        ("--lr", float, 3e-4),
        ("--log", str, "stage10_gpt30b.jsonl"),
    ):
        cli.add_argument(flag, type=kind, default=default)
    opts = cli.parse_args()
    run(mode=opts.mode, steps=opts.steps, batch=opts.batch, seq=opts.seq,
        vocab=opts.vocab, lr=opts.lr, log=opts.log)
|