""" server.py — MicroJulia OpenAI-compatible inference server Serves POST /v1/chat/completions (streaming + non-streaming) and GET /v1/models. Loads the pure-Julia MicroGPT char-level model from best_model.json on HF Hub. Architecture: Pre-norm GPT with RMSNorm (no learnable params), ReLU MLP, separate Q/K/V attention. 5K params, 28-char vocab, val_loss=2.34. Follows the RandyGPT FastAPI/uvicorn pattern for proven HF Spaces compatibility. """ import json import math import time import uuid import os import torch import torch.nn as nn import torch.nn.functional as F from pathlib import Path from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, StreamingResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.exceptions import RequestValidationError from pydantic import BaseModel from typing import List, Optional from huggingface_hub import hf_hub_download # ── Model definition ────────────────────────────────────────────────────────── # Matches the Julia AutoGrad training code exactly: # Pre-norm blocks with RMSNorm (no learnable weights), ReLU MLP, # separate Q/K/V attention, no final norm before lm_head. 
def rms_norm(x, eps=1e-5):
    """RMSNorm over the last dimension, without learnable parameters.

    Matches the Julia ``rmsnorm_ag``: each vector along the last axis is
    divided by the root of its mean square (``eps`` added for stability).
    """
    return x * torch.rsqrt(x.pow(2).mean(dim=-1, keepdim=True) + eps)


class Attn(nn.Module):
    """Causal multi-head self-attention with separate, bias-free Q/K/V/O projections."""

    def __init__(self, n_embd, n_head):
        super().__init__()
        self.n_head = n_head
        self.head_dim = n_embd // n_head
        self.scale = 1.0 / math.sqrt(self.head_dim)
        self.wq = nn.Linear(n_embd, n_embd, bias=False)
        self.wk = nn.Linear(n_embd, n_embd, bias=False)
        self.wv = nn.Linear(n_embd, n_embd, bias=False)
        self.wo = nn.Linear(n_embd, n_embd, bias=False)

    def forward(self, x):
        """Attend over ``x`` of shape (B, T, C) and return a tensor of the same shape."""
        B, T, C = x.shape
        # Project, then split channels into heads: (B, n_head, T, head_dim).
        q = self.wq(x).view(B, T, self.n_head, self.head_dim).transpose(1, 2)
        k = self.wk(x).view(B, T, self.n_head, self.head_dim).transpose(1, 2)
        v = self.wv(x).view(B, T, self.n_head, self.head_dim).transpose(1, 2)
        scores = q @ k.transpose(-2, -1) * self.scale
        # Additive causal mask: -inf strictly above the diagonal, 0 elsewhere.
        mask = torch.full((T, T), float("-inf"), device=x.device).triu(1)
        attn = F.softmax(scores + mask, dim=-1)
        # Merge heads back into the channel dimension.
        out = (attn @ v).transpose(1, 2).contiguous().view(B, T, C)
        return self.wo(out)


class MLP(nn.Module):
    """Bias-free two-layer feed-forward network with a 4x hidden width and ReLU."""

    def __init__(self, n_embd):
        super().__init__()
        self.fc1 = nn.Linear(n_embd, 4 * n_embd, bias=False)
        self.fc2 = nn.Linear(4 * n_embd, n_embd, bias=False)

    def forward(self, x):
        return self.fc2(F.relu(self.fc1(x)))


class Block(nn.Module):
    """Pre-norm transformer block: residual attention followed by residual MLP."""

    def __init__(self, n_embd, n_head):
        super().__init__()
        self.attn = Attn(n_embd, n_head)
        self.mlp = MLP(n_embd)

    def forward(self, x):
        x = x + self.attn(rms_norm(x))
        x = x + self.mlp(rms_norm(x))
        return x


class MicroGPT(nn.Module):
    """Small GPT: token + position embeddings, a stack of blocks, and an LM head.

    No normalisation is applied between the last block and ``lm_head``.
    """

    def __init__(self, vocab_size, n_embd, n_head, n_layer, block_size):
        super().__init__()
        self.block_size = block_size
        self.wte = nn.Embedding(vocab_size, n_embd)
        self.wpe = nn.Embedding(block_size, n_embd)
        self.layers = nn.ModuleList([Block(n_embd, n_head) for _ in range(n_layer)])
        self.lm_head = nn.Linear(n_embd, vocab_size, bias=False)

    def forward(self, ids):
        """Return logits of shape (B, T, vocab_size) for token ids of shape (B, T)."""
        B, T = ids.shape
        positions = torch.arange(T, device=ids.device).unsqueeze(0)
        x = self.wte(ids) + self.wpe(positions)
        for block in self.layers:
            x = block(x)
        return self.lm_head(x)

    @torch.no_grad()
    def generate_stream(self, ids, max_new_tokens=200, temperature=0.1, top_k=8,
                        repetition_penalty=1.3, valid_vocab=None):
        """Sample tokens autoregressively, yielding ``(token_id, is_last)`` one at a time.

        Args:
            ids: Prompt token ids; row 0 is the sequence that is sampled from.
            max_new_tokens: Number of tokens to produce.
            temperature: Sampling temperature (floored at 0.01).
            top_k: Keep only the ``top_k`` highest logits; no filtering when it
                is not positive or not smaller than the vocabulary size.
            repetition_penalty: Values > 1.0 penalise tokens present in the
                current context window.
            valid_vocab: If given, token indices at or above it are never sampled.

        Yields:
            ``(token_id, is_last)`` where ``is_last`` is True only for the
            ``max_new_tokens``-th token.
        """
        self.eval()
        for step in range(max_new_tokens):
            # Only the most recent ``block_size`` tokens fit the position table.
            ctx = ids[:, -self.block_size:]
            logits = self(ctx)[0, -1, :]  # (vocab_size,)

            # Mask out any token indices beyond the actual charset
            if valid_vocab is not None and logits.shape[0] > valid_vocab:
                logits[valid_vocab:] = float("-inf")

            # Repetition penalty. Every generated token is appended to ``ids``,
            # so the tokens in ``ctx`` are exactly the ones to penalise.
            if repetition_penalty > 1.0:
                seen = torch.unique(ctx[0])
                vals = logits[seen]
                # Positive logits shrink, non-positive ones grow more negative.
                logits[seen] = torch.where(
                    vals > 0, vals / repetition_penalty, vals * repetition_penalty
                )

            # Temperature
            logits = logits / max(temperature, 0.01)

            # Top-k filtering: drop everything strictly below the k-th largest logit
            if 0 < top_k < logits.shape[0]:
                kth = torch.topk(logits, top_k).values[-1]
                logits = logits.masked_fill(logits < kth, float("-inf"))

            probs = F.softmax(logits, dim=-1)
            nxt = torch.multinomial(probs, 1)
            ids = torch.cat([ids, nxt.view(1, 1)], dim=1)
            yield nxt.item(), step == max_new_tokens - 1

    @torch.no_grad()
    def generate(self, ids, max_new_tokens=200, temperature=0.1, top_k=8,
                 repetition_penalty=1.3, valid_vocab=None):
        """Run ``generate_stream`` to completion.

        Returns:
            List of the newly generated token ids (the prompt is not included).
        """
        self.eval()
        return [
            token_id
            for token_id, _ in self.generate_stream(
                ids, max_new_tokens, temperature, top_k, repetition_penalty, valid_vocab
            )
        ]


# ── Char-level tokenizer ──────────────────────────────────────────────────────

class CharTokenizer:
    """Character-level tokenizer; a character's id is its index in ``uchars``."""

    def __init__(self, uchars):
        self.uchars = uchars
        self.stoi = {c: i for i, c in enumerate(uchars)}
        self.itos = {i: c for i, c in enumerate(uchars)}
        self.vocab_size = len(uchars)

    def encode(self, text):
        """Encode text to token IDs (char-level, lowercase, skip unknown)."""
        return [self.stoi[c] for c in text.lower() if c in self.stoi]

    def decode(self, ids):
        """Decode token IDs back to text; unknown ids become "?"."""
        return "".join(self.itos.get(i, "?") for i in ids)


# ── Load model at startup ────────────────────────────────────────────────────

REPO = os.environ.get("HF_REPO", "LisaMegaWatts/JuliaGPT")
MODEL_ID = "microjulia-philosophy"
DEVICE = "cpu"

print(f"Loading MicroJulia model from {REPO} ...")
ckpt_path = hf_hub_download(repo_id=REPO, filename="best_model.json")

# Explicit encoding so the charset loads identically regardless of locale.
with open(ckpt_path, encoding="utf-8") as f:
    ckpt = json.load(f)

hp = ckpt["hyperparams"]
n_embd = hp["n_embd"]
n_head = hp["n_head"]
n_layer = hp["n_layer"]
block_size = hp["block_size"]

sd = ckpt["state_dict"]

# Determine vocab_size from weight dimensions
wte_weights = torch.tensor(sd["wte"], dtype=torch.float32)
vocab_size = wte_weights.shape[0]

# Build char tokenizer from embedded uchars
tok = CharTokenizer(ckpt["uchars"])

print(f" n_embd={n_embd}, n_head={n_head}, n_layer={n_layer}, block_size={block_size}")
print(f" vocab_size={vocab_size} (weights), chars={tok.vocab_size} ({tok.uchars})")
if "training" in ckpt:
    t = ckpt["training"]
    # Only apply the float format when a number is present; formatting the
    # "?" placeholder with ":.4f" would raise ValueError at startup.
    _best = t.get("best_val_loss")
    _best_str = f"{_best:.4f}" if isinstance(_best, (int, float)) else "?"
    print(f" trained: {t.get('total_steps_completed', '?')} steps, "
          f"best_val_loss={_best_str}")

# Build model and load weights
model = MicroGPT(vocab_size, n_embd, n_head, n_layer, block_size)

state = {}
state["wte.weight"] = wte_weights
state["wpe.weight"] = torch.tensor(sd["wpe"], dtype=torch.float32)
state["lm_head.weight"] = torch.tensor(sd["lm_head"], dtype=torch.float32)
for i in range(n_layer):
    prefix = f"layer{i}"
    state[f"layers.{i}.attn.wq.weight"] = torch.tensor(sd[f"{prefix}.attn_wq"], dtype=torch.float32)
    state[f"layers.{i}.attn.wk.weight"] = torch.tensor(sd[f"{prefix}.attn_wk"], dtype=torch.float32)
    state[f"layers.{i}.attn.wv.weight"] = torch.tensor(sd[f"{prefix}.attn_wv"], dtype=torch.float32)
    state[f"layers.{i}.attn.wo.weight"] = torch.tensor(sd[f"{prefix}.attn_wo"], dtype=torch.float32)
    state[f"layers.{i}.mlp.fc1.weight"] = torch.tensor(sd[f"{prefix}.mlp_fc1"], dtype=torch.float32)
    state[f"layers.{i}.mlp.fc2.weight"] = torch.tensor(sd[f"{prefix}.mlp_fc2"], dtype=torch.float32)

model.load_state_dict(state)
model.eval()
print(f"Model ready — {sum(p.numel() for p in model.parameters())} params")


# ── FastAPI app ───────────────────────────────────────────────────────────────

app = FastAPI(title="MicroJulia", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


def _openai_error(status: int, message: str,
                  err_type: str = "invalid_request_error",
                  code: Optional[str] = None):
    """Build an OpenAI-style ``{"error": {...}}`` JSON response."""
    body = {"error": {"message": message, "type": err_type}}
    if code:
        body["error"]["code"] = code
    return JSONResponse(status_code=status, content=body)


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render HTTPException in the OpenAI error envelope."""
    return _openai_error(exc.status_code, str(exc.detail))


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Render request validation failures as one 422 OpenAI-style error."""
    # Fall back to "body" when an error carries no location, instead of
    # failing with IndexError inside the error handler itself.
    msg = "; ".join(
        f"{(e.get('loc') or ('body',))[-1]}: {e['msg']}" for e in exc.errors()
    )
    return _openai_error(422, msg, code="invalid_request_error")


@app.get("/")
def root():
    """Describe the service, the loaded model and the available endpoints."""
    return {
        "name": "MicroJulia",
        "version": "1.0.0",
        "description": "Pure Julia char-level GPT trained on classical philosophy",
        # Describes the model defined in this file: RMSNorm + ReLU, not GELU.
        "architecture": "MicroGPT (pre-norm RMSNorm, ReLU MLP, separate Q/K/V)",
        "model": {
            "vocab_size": tok.vocab_size,
            "n_embd": n_embd,
            "n_layer": n_layer,
            "n_head": n_head,
            "block_size": block_size,
            "params": sum(p.numel() for p in model.parameters()),
        },
        "endpoints": ["/v1/models", "/v1/chat/completions"],
        "features": ["streaming", "OpenAI-compatible"],
    }


@app.get("/v1/models")
def list_models():
    """Return the single served model in OpenAI list format."""
    return {
        "object": "list",
        "data": [{
            "id": MODEL_ID,
            "object": "model",
            "created": 1700000000,
            "owned_by": "microjulia",
        }]
    }


class Message(BaseModel):
    role: str
    content: str


class ChatRequest(BaseModel):
    model: Optional[str] = MODEL_ID
    messages: List[Message]
    max_tokens: Optional[int] = 200
    temperature: Optional[float] = 0.1
    top_k: Optional[int] = 8
    repetition_penalty: Optional[float] = 1.3
    n: Optional[int] = 1
    stream: Optional[bool] = False


def _sse(data: dict) -> str:
    """Serialise ``data`` as one server-sent-events ``data:`` frame."""
    return f"data: {json.dumps(data)}\n\n"


def _stream_completion(ids, max_tokens, temperature, top_k, rep_penalty,
                       completion_id, _model, _tok):
    """Generator that yields SSE chunks one token at a time.

    Emits an initial chunk carrying the assistant role, then one chunk per
    generated token, then the ``[DONE]`` sentinel.
    """
    token_count = 0

    # Initial chunk with role
    yield _sse({
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": MODEL_ID,
        "choices": [{
            "index": 0,
            "delta": {"role": "assistant", "content": ""},
            "finish_reason": None,
        }],
    })

    for token_id, is_last in _model.generate_stream(
        ids,
        max_new_tokens=max_tokens,
        temperature=temperature,
        top_k=top_k,
        repetition_penalty=rep_penalty,
        valid_vocab=_tok.vocab_size,
    ):
        token_text = _tok.decode([token_id])
        token_count += 1
        # Only the final token carries a finish_reason.
        if is_last:
            finish_reason = "length" if token_count >= max_tokens else "stop"
        else:
            finish_reason = None
        yield _sse({
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": MODEL_ID,
            "choices": [{
                "index": 0,
                "delta": {"content": token_text},
                "finish_reason": finish_reason,
            }],
        })

    yield "data: [DONE]\n\n"


@app.post("/v1/chat/completions")
def chat_completions(req: ChatRequest):
    """OpenAI-compatible chat completion endpoint.

    Only the content of the last message is used as the prompt. Sampling
    parameters are clamped to safe ranges. With ``stream`` set, a single
    completion is streamed as SSE; otherwise ``n`` (at most 4) completions
    are returned in one JSON body.

    Raises:
        HTTPException: 400 when the last message has no non-whitespace content.
    """
    _m, _t = model, tok
    prompt = req.messages[-1].content.strip() if req.messages else ""
    if not prompt:
        raise HTTPException(status_code=400, detail="No content in messages")

    ids = _t.encode(prompt)
    if not ids:
        # No prompt character is in the vocabulary: seed generation with token id 0
        ids = [0]

    max_tokens = max(1, min(req.max_tokens or 200, block_size))
    # Test against None rather than truthiness so an explicit temperature of 0
    # is clamped to the 0.01 floor instead of being replaced by the default.
    requested_temp = 0.1 if req.temperature is None else req.temperature
    temperature = max(0.01, min(requested_temp, 2.0))
    top_k = max(1, min(req.top_k or 8, tok.vocab_size))
    rep_penalty = max(1.0, min(req.repetition_penalty or 1.3, 3.0))
    n = max(1, min(req.n or 1, 4))
    completion_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"

    tensor = torch.tensor([ids], dtype=torch.long)

    # ── Streaming ─────────────────────────────────────────────────────────
    if req.stream:
        return StreamingResponse(
            _stream_completion(tensor, max_tokens, temperature, top_k, rep_penalty,
                               completion_id, _m, _t),
            media_type="text/event-stream",
            headers={"X-Accel-Buffering": "no"},
        )

    # ── Non-streaming ─────────────────────────────────────────────────────
    choices = []
    total_completion_tokens = 0
    for i in range(n):
        generated = _m.generate(
            tensor.clone(),
            max_new_tokens=max_tokens,
            temperature=temperature,
            top_k=top_k,
            repetition_penalty=rep_penalty,
            valid_vocab=_t.vocab_size,
        )
        text = _t.decode(generated)
        total_completion_tokens += len(generated)
        choices.append({
            "index": i,
            "message": {"role": "assistant", "content": text},
            "finish_reason": "length" if len(generated) >= max_tokens else "stop",
        })

    return {
        "id": completion_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": MODEL_ID,
        "system_fingerprint": "microjulia-v1",
        "choices": choices,
        "usage": {
            "prompt_tokens": len(ids),
            "completion_tokens": total_completion_tokens,
            "total_tokens": len(ids) + total_completion_tokens,
        },
    }