Spaces:
Sleeping
Sleeping
| """ | |
| server.py — MicroJulia OpenAI-compatible inference server | |
| Serves POST /v1/chat/completions (streaming + non-streaming) and GET /v1/models. | |
| Loads the pure-Julia MicroGPT char-level model from best_model.json on HF Hub. | |
| Architecture: Pre-norm GPT with RMSNorm (no learnable params), ReLU MLP, | |
| separate Q/K/V attention. 5K params, 28-char vocab, val_loss=2.34. | |
| Follows the RandyGPT FastAPI/uvicorn pattern for proven HF Spaces compatibility. | |
| """ | |
| import json | |
| import math | |
| import time | |
| import uuid | |
| import os | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from pathlib import Path | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.exceptions import RequestValidationError | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| from huggingface_hub import hf_hub_download | |
| # ── Model definition ────────────────────────────────────────────────────────── | |
| # Matches the Julia AutoGrad training code exactly: | |
| # Pre-norm blocks with RMSNorm (no learnable weights), ReLU MLP, | |
| # separate Q/K/V attention, no final norm before lm_head. | |
def rms_norm(x, eps=1e-5):
    """Normalize ``x`` by the root-mean-square of its last dimension.

    This is RMSNorm without any learnable gain (the original comment notes it
    mirrors the Julia ``rmsnorm_ag``).

    Args:
        x: Input tensor; the statistic is computed over ``dim=-1``.
        eps: Small constant added to the mean square before the reciprocal
            square root, so an all-zero vector does not divide by zero.

    Returns:
        A tensor with the same shape as ``x``.
    """
    mean_square = torch.mean(x.pow(2), dim=-1, keepdim=True)
    inv_rms = torch.rsqrt(mean_square + eps)
    return x * inv_rms
class Attn(nn.Module):
    """Multi-head causal self-attention with separate bias-free Q/K/V/O projections.

    Args:
        n_embd: Embedding width of the input and output.
        n_head: Number of attention heads; must divide ``n_embd`` evenly.

    Raises:
        ValueError: If ``n_head`` is not positive or does not divide ``n_embd``.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        # Fail early with a readable message instead of a cryptic ``.view``
        # shape error on the first forward pass.
        if n_head <= 0 or n_embd % n_head != 0:
            raise ValueError(
                f"n_embd ({n_embd}) must be divisible by a positive n_head ({n_head})"
            )
        self.n_head = n_head
        self.head_dim = n_embd // n_head
        self.scale = 1.0 / math.sqrt(self.head_dim)
        self.wq = nn.Linear(n_embd, n_embd, bias=False)
        self.wk = nn.Linear(n_embd, n_embd, bias=False)
        self.wv = nn.Linear(n_embd, n_embd, bias=False)
        self.wo = nn.Linear(n_embd, n_embd, bias=False)

    def _split_heads(self, t, B, T):
        """Reshape a ``(B, T, C)`` projection to ``(B, n_head, T, head_dim)``."""
        return t.view(B, T, self.n_head, self.head_dim).transpose(1, 2)

    def forward(self, x):
        """Apply causal self-attention to ``x`` of shape ``(B, T, C)``.

        Returns:
            A tensor of shape ``(B, T, C)``.
        """
        B, T, C = x.shape
        q = self._split_heads(self.wq(x), B, T)
        k = self._split_heads(self.wk(x), B, T)
        v = self._split_heads(self.wv(x), B, T)
        scores = q @ k.transpose(-2, -1) * self.scale
        # Strictly-upper triangle is -inf (future positions), the rest is 0,
        # so adding the mask removes attention to later tokens.
        mask = torch.full((T, T), float('-inf'), device=x.device).triu(1)
        attn = F.softmax(scores + mask, dim=-1)
        out = (attn @ v).transpose(1, 2).contiguous().view(B, T, C)
        return self.wo(out)
class MLP(nn.Module):
    """Two-layer bias-free feed-forward network with a 4x hidden expansion and ReLU."""

    def __init__(self, n_embd):
        super().__init__()
        hidden_dim = 4 * n_embd
        self.fc1 = nn.Linear(n_embd, hidden_dim, bias=False)
        self.fc2 = nn.Linear(hidden_dim, n_embd, bias=False)

    def forward(self, x):
        """Project up, apply ReLU, and project back to the input width."""
        hidden = self.fc1(x)
        activated = F.relu(hidden)
        return self.fc2(activated)
class Block(nn.Module):
    """Pre-norm transformer block: attention then MLP, each with a residual add.

    Both sub-layers receive ``rms_norm`` of the running activations, and their
    output is added back onto the un-normalized stream.
    """

    def __init__(self, n_embd, n_head):
        super().__init__()
        self.attn = Attn(n_embd, n_head)
        self.mlp = MLP(n_embd)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates."""
        after_attn = x + self.attn(rms_norm(x))
        return after_attn + self.mlp(rms_norm(after_attn))
class MicroGPT(nn.Module):
    """Small GPT: token + position embeddings, a stack of ``Block``s, and an lm_head.

    No normalization is applied between the last block and ``lm_head``.

    Args:
        vocab_size: Number of rows in the token embedding / lm_head output size.
        n_embd: Embedding width.
        n_head: Attention heads per block.
        n_layer: Number of blocks.
        block_size: Maximum sequence length (size of the position table).
    """

    def __init__(self, vocab_size, n_embd, n_head, n_layer, block_size):
        super().__init__()
        self.block_size = block_size
        self.wte = nn.Embedding(vocab_size, n_embd)
        self.wpe = nn.Embedding(block_size, n_embd)
        self.layers = nn.ModuleList([Block(n_embd, n_head) for _ in range(n_layer)])
        self.lm_head = nn.Linear(n_embd, vocab_size, bias=False)

    def forward(self, ids):
        """Return logits of shape ``(B, T, vocab_size)`` for token ids ``(B, T)``.

        Raises:
            ValueError: If ``T`` exceeds ``block_size`` (there is no position
                embedding beyond that length).
        """
        B, T = ids.shape
        if T > self.block_size:
            raise ValueError(
                f"sequence length {T} exceeds block_size {self.block_size}"
            )
        positions = torch.arange(T, device=ids.device).unsqueeze(0)
        x = self.wte(ids) + self.wpe(positions)
        for block in self.layers:
            x = block(x)
        return self.lm_head(x)

    def generate_stream(self, ids, max_new_tokens=200, temperature=0.1,
                        top_k=8, repetition_penalty=1.3, valid_vocab=None):
        """Sample tokens one at a time, yielding ``(token_id, is_last)``.

        Only a batch of one sequence is supported (``ids`` of shape ``(1, T)``).

        Args:
            ids: Prompt token ids, shape ``(1, T)`` with ``T >= 1``.
            max_new_tokens: Number of tokens to sample.
            temperature: Softmax temperature; values below 0.01 are clamped.
            top_k: Keep only the ``top_k`` highest logits when
                ``0 < top_k < vocab_size``.
            repetition_penalty: When > 1.0, logits of tokens present in the
                current context are divided (if positive) or multiplied
                (otherwise) by this factor.
            valid_vocab: If given, logits at indices ``>= valid_vocab`` are
                masked out so they can never be sampled.

        Yields:
            ``(token_id, is_last)`` where ``is_last`` is True only for the
            final requested token.
        """
        self.eval()
        neg_inf = float('-inf')
        for step in range(max_new_tokens):
            # Inference only: disable autograd so no graph is built per step.
            # The context manager is closed before ``yield`` so grad mode
            # never leaks into the consumer of this generator.
            with torch.no_grad():
                ctx = ids[:, -self.block_size:]
                logits = self(ctx)[0, -1, :]  # (vocab_size,)

                # Mask out any token indices beyond the actual charset.
                if valid_vocab is not None and logits.shape[0] > valid_vocab:
                    logits[valid_vocab:] = neg_inf

                # Repetition penalty over every token in the context window.
                # Previously generated tokens are already part of ``ctx``
                # because they are appended to ``ids`` below.
                if repetition_penalty > 1.0:
                    seen = torch.unique(ctx[0])
                    vals = logits[seen]
                    logits[seen] = torch.where(
                        vals > 0, vals / repetition_penalty, vals * repetition_penalty
                    )

                # Temperature
                logits = logits / max(temperature, 0.01)

                # Top-k filtering
                if 0 < top_k < logits.shape[0]:
                    kth_value = torch.topk(logits, top_k).values[-1]
                    logits[logits < kth_value] = neg_inf

                probs = F.softmax(logits, dim=-1)
                nxt = torch.multinomial(probs, 1)
                ids = torch.cat([ids, nxt.view(1, 1)], dim=1)

            yield nxt.item(), step == max_new_tokens - 1

    def generate(self, ids, max_new_tokens=200, temperature=0.1,
                 top_k=8, repetition_penalty=1.3, valid_vocab=None):
        """Run ``generate_stream`` to completion and return the new token ids as a list."""
        return [
            token_id
            for token_id, _ in self.generate_stream(
                ids, max_new_tokens, temperature, top_k, repetition_penalty, valid_vocab
            )
        ]
| # ── Char-level tokenizer ────────────────────────────────────────────────────── | |
class CharTokenizer:
    """Character-level tokenizer over a fixed, ordered list of characters.

    The id of a character is its index in ``uchars``.
    """

    def __init__(self, uchars):
        self.uchars = uchars
        self.stoi = dict(zip(uchars, range(len(uchars))))
        self.itos = dict(enumerate(uchars))
        self.vocab_size = len(uchars)

    def encode(self, text):
        """Lowercase ``text`` and map each known character to its id.

        Characters that are not in the vocabulary are silently dropped.
        """
        lookup = self.stoi
        token_ids = []
        for ch in text.lower():
            if ch in lookup:
                token_ids.append(lookup[ch])
        return token_ids

    def decode(self, ids):
        """Map ids back to characters; unknown ids are rendered as ``"?"``."""
        pieces = [self.itos.get(token_id, "?") for token_id in ids]
        return "".join(pieces)
| # ── Load model at startup ──────────────────────────────────────────────────── | |
# Hub repository holding best_model.json; overridable through the HF_REPO env var.
REPO = os.environ.get("HF_REPO", "LisaMegaWatts/JuliaGPT")
MODEL_ID = "microjulia-philosophy"
DEVICE = "cpu"

print(f"Loading MicroJulia model from {REPO} ...")
ckpt_path = hf_hub_download(repo_id=REPO, filename="best_model.json")
# Explicit UTF-8: the checkpoint embeds the character set, and the platform
# default encoding is not guaranteed to be UTF-8.
with open(ckpt_path, encoding="utf-8") as f:
    ckpt = json.load(f)

hp = ckpt["hyperparams"]
n_embd = hp["n_embd"]
n_head = hp["n_head"]
n_layer = hp["n_layer"]
block_size = hp["block_size"]
sd = ckpt["state_dict"]

# Determine vocab_size from weight dimensions
wte_weights = torch.tensor(sd["wte"], dtype=torch.float32)
vocab_size = wte_weights.shape[0]

# Build char tokenizer from embedded uchars
tok = CharTokenizer(ckpt["uchars"])

print(f" n_embd={n_embd}, n_head={n_head}, n_layer={n_layer}, block_size={block_size}")
print(f" vocab_size={vocab_size} (weights), chars={tok.vocab_size} ({tok.uchars})")
if "training" in ckpt:
    t = ckpt["training"]
    # Only apply the float format when a number is present; formatting the
    # "?" placeholder with ":.4f" would raise ValueError at import time.
    best_val_loss = t.get("best_val_loss")
    if isinstance(best_val_loss, (int, float)):
        best_val_loss_text = f"{best_val_loss:.4f}"
    else:
        best_val_loss_text = "?"
    print(f" trained: {t.get('total_steps_completed', '?')} steps, "
          f"best_val_loss={best_val_loss_text}")

# Build model and load weights
model = MicroGPT(vocab_size, n_embd, n_head, n_layer, block_size)
state = {}
state["wte.weight"] = wte_weights
state["wpe.weight"] = torch.tensor(sd["wpe"], dtype=torch.float32)
state["lm_head.weight"] = torch.tensor(sd["lm_head"], dtype=torch.float32)
# (module path inside a block, key suffix used by the checkpoint)
_LAYER_WEIGHTS = (
    ("attn.wq", "attn_wq"),
    ("attn.wk", "attn_wk"),
    ("attn.wv", "attn_wv"),
    ("attn.wo", "attn_wo"),
    ("mlp.fc1", "mlp_fc1"),
    ("mlp.fc2", "mlp_fc2"),
)
for i in range(n_layer):
    prefix = f"layer{i}"
    for module_path, ckpt_key in _LAYER_WEIGHTS:
        state[f"layers.{i}.{module_path}.weight"] = torch.tensor(
            sd[f"{prefix}.{ckpt_key}"], dtype=torch.float32
        )
# Strict load: a missing key or shape mismatch raises here instead of
# serving a partially initialized model.
model.load_state_dict(state)
model.eval()
print(f"Model ready β {sum(p.numel() for p in model.parameters())} params")
| # ── FastAPI app ─────────────────────────────────────────────────────────────── | |
# Application object; routes and exception handlers attach to this instance.
app = FastAPI(title="MicroJulia", version="1.0.0")

# CORS is left fully open: any origin, method and header is accepted.
_cors_options = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
def _openai_error(status: int, message: str, err_type: str = "invalid_request_error", code: str = None):
    """Build a JSON error response shaped as ``{"error": {...}}``.

    Args:
        status: HTTP status code of the response.
        message: Human-readable error message.
        err_type: Value of the ``type`` field.
        code: Optional ``code`` field; omitted from the body when falsy.

    Returns:
        A ``JSONResponse`` carrying the error body.
    """
    error = {"message": message, "type": err_type}
    if code:
        error["code"] = code
    return JSONResponse(status_code=status, content={"error": error})
# Registered on the app so that HTTPException raised by route handlers is
# rendered in the OpenAI-style error format instead of FastAPI's default body.
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Convert an ``HTTPException`` into an OpenAI-style error response.

    The exception's status code is preserved and its ``detail`` becomes the
    error message.
    """
    return _openai_error(exc.status_code, str(exc.detail))
# Registered on the app so request-body validation failures use the same
# OpenAI-style error envelope as every other error.
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Render request validation errors as a single 422 OpenAI-style error.

    Each validation error contributes ``"<field>: <message>"``, where
    ``<field>`` is the last element of the error's ``loc``; the parts are
    joined with ``"; "``.
    """
    parts = []
    for err in exc.errors():
        loc = err.get("loc") or ()
        # Guard against an empty location so building the message can never
        # raise inside the error handler itself.
        field = loc[-1] if loc else "request"
        parts.append(f"{field}: {err['msg']}")
    return _openai_error(422, "; ".join(parts), code="invalid_request_error")
@app.get("/")
def root():
    """Return static service metadata and the loaded model's hyperparameters."""
    return {
        "name": "MicroJulia",
        "version": "1.0.0",
        "description": "Pure Julia char-level GPT trained on classical philosophy",
        # The model defined in this file uses parameter-free RMSNorm and a
        # ReLU MLP; the previous text ("no LayerNorm, GELU") did not match it.
        "architecture": "MicroGPT (pre-norm RMSNorm, ReLU MLP, separate Q/K/V)",
        "model": {
            "vocab_size": tok.vocab_size,
            "n_embd": n_embd,
            "n_layer": n_layer,
            "n_head": n_head,
            "block_size": block_size,
            "params": sum(p.numel() for p in model.parameters()),
        },
        "endpoints": ["/v1/models", "/v1/chat/completions"],
        "features": ["streaming", "OpenAI-compatible"],
    }
@app.get("/v1/models")
def list_models():
    """Return the OpenAI-style model list containing the single served model."""
    return {
        "object": "list",
        "data": [{
            "id": MODEL_ID,
            "object": "model",
            # Fixed placeholder timestamp; not derived from the checkpoint.
            "created": 1700000000,
            "owned_by": "microjulia",
        }]
    }
class Message(BaseModel):
    """One chat message of the request body."""

    # Speaker role string; not validated or interpreted by this file.
    role: str
    # Plain-text message content.
    content: str
class ChatRequest(BaseModel):
    """Request body for POST /v1/chat/completions.

    Only ``messages`` is required. The sampling fields are optional and are
    clamped to supported ranges by the request handler.
    """

    # Requested model id; defaults to the single served model.
    model: Optional[str] = MODEL_ID
    # Conversation so far; the handler uses the content of the last entry.
    messages: List[Message]
    # Number of tokens to generate.
    max_tokens: Optional[int] = 200
    # Sampling temperature.
    temperature: Optional[float] = 0.1
    # Number of highest-probability tokens kept when sampling.
    top_k: Optional[int] = 8
    # Penalty applied to tokens already present in the context.
    repetition_penalty: Optional[float] = 1.3
    # Number of completions (used by the non-streaming path).
    n: Optional[int] = 1
    # When true the response is a server-sent-event stream.
    stream: Optional[bool] = False
| def _sse(data: dict) -> str: | |
| return f"data: {json.dumps(data)}\n\n" | |
def _stream_completion(ids, max_tokens, temperature, top_k, rep_penalty,
                       completion_id, _model, _tok):
    """Yield a chat completion as server-sent-event chunks, one token each.

    The stream consists of an initial chunk carrying the assistant role, one
    chunk per generated token, and a terminating ``data: [DONE]`` frame. The
    chunk for the final token carries the ``finish_reason``.

    Args:
        ids: Prompt token tensor passed to ``_model.generate_stream``.
        max_tokens: Number of tokens to generate.
        temperature: Sampling temperature.
        top_k: Top-k sampling cutoff.
        rep_penalty: Repetition penalty.
        completion_id: Id string placed in every chunk.
        _model: Object exposing ``generate_stream``.
        _tok: Tokenizer exposing ``decode`` and ``vocab_size``.

    Yields:
        SSE-formatted strings.
    """
    # One timestamp for the whole stream: every chunk of a completion reports
    # the same creation time rather than the time the chunk was emitted.
    created = int(time.time())

    def _chunk(delta, finish_reason=None):
        """Wrap ``delta`` in the chat.completion.chunk envelope."""
        return _sse({
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": MODEL_ID,
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason,
            }],
        })

    # Initial chunk with role
    yield _chunk({"role": "assistant", "content": ""})

    token_count = 0
    for token_id, is_last in _model.generate_stream(
        ids, max_new_tokens=max_tokens,
        temperature=temperature, top_k=top_k,
        repetition_penalty=rep_penalty, valid_vocab=_tok.vocab_size
    ):
        token_count += 1
        finish_reason = None
        if is_last:
            finish_reason = "length" if token_count >= max_tokens else "stop"
        yield _chunk({"content": _tok.decode([token_id])}, finish_reason)

    yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
def chat_completions(req: ChatRequest):
    """Handle an OpenAI-style chat completion request.

    Only the content of the last message is used as the prompt. Sampling
    parameters are clamped to supported ranges. With ``stream`` set, a single
    completion is streamed as server-sent events (``n`` is not applied);
    otherwise up to four completions are returned in one JSON body.

    Raises:
        HTTPException: 400 when there are no messages or the last message is
            empty after stripping whitespace.
    """
    _m, _t = model, tok

    prompt = req.messages[-1].content.strip() if req.messages else ""
    if not prompt:
        raise HTTPException(status_code=400, detail="No content in messages")

    ids = _t.encode(prompt)
    if not ids:
        # The prompt contained no in-vocabulary characters; seed generation
        # with token 0 so the model has at least one position to attend to.
        ids = [0]

    def _or_default(value, default):
        """Return ``default`` only when the client omitted the field (None)."""
        return default if value is None else value

    # ``is None`` checks (not ``or``) so an explicit 0 is clamped to the
    # nearest supported value instead of silently becoming the default.
    max_tokens = max(1, min(_or_default(req.max_tokens, 200), block_size))
    temperature = max(0.01, min(_or_default(req.temperature, 0.1), 2.0))
    rep_penalty = max(1.0, min(_or_default(req.repetition_penalty, 1.3), 3.0))
    # For top_k and n a zero keeps falling back to the default, as before.
    top_k = max(1, min(req.top_k or 8, _t.vocab_size))
    n = max(1, min(req.n or 1, 4))

    completion_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
    tensor = torch.tensor([ids], dtype=torch.long)

    # ── Streaming ──
    if req.stream:
        return StreamingResponse(
            _stream_completion(tensor, max_tokens, temperature, top_k,
                               rep_penalty, completion_id, _m, _t),
            media_type="text/event-stream",
            # Asks buffering reverse proxies to pass chunks through immediately.
            headers={"X-Accel-Buffering": "no"},
        )

    # ── Non-streaming ──
    choices = []
    total_completion_tokens = 0
    for i in range(n):
        generated = _m.generate(tensor.clone(), max_new_tokens=max_tokens,
                                temperature=temperature, top_k=top_k,
                                repetition_penalty=rep_penalty,
                                valid_vocab=_t.vocab_size)
        total_completion_tokens += len(generated)
        choices.append({
            "index": i,
            "message": {"role": "assistant", "content": _t.decode(generated)},
            "finish_reason": "length" if len(generated) >= max_tokens else "stop",
        })

    return {
        "id": completion_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": MODEL_ID,
        "system_fingerprint": "microjulia-v1",
        "choices": choices,
        "usage": {
            "prompt_tokens": len(ids),
            "completion_tokens": total_completion_tokens,
            "total_tokens": len(ids) + total_completion_tokens,
        },
    }