Spaces:
Sleeping
Sleeping
| # app.py — Minimal, pro UI with big transparent sphere and clean hover | |
| import math, json, random, time, threading | |
| from dataclasses import dataclass, asdict | |
| from typing import List, Tuple, Dict, Any, Optional | |
| from functools import lru_cache | |
| import numpy as np | |
| import plotly.graph_objs as go | |
| import plotly.io as pio | |
| import gradio as gr | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from data_utils import load_piqa, load_hellaswag, hash_vectorize | |
| # ---------- Minimal style ---------- | |
# Stylesheet passed to gr.Blocks(css=...) when the UI is built. It declares the
# shared CSS variables (--radius, --fg, --muted, --line) and styles the header,
# the control groups, the bordered ".panel" containers and the "#stats" block.
CUSTOM_CSS = """
:root { --radius: 14px; --fg:#0f172a; --muted:#64748b; --line:#e5e7eb; }
* { font-family: Inter, ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica Neue, Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji"; }
.gradio-container { max-width: 1180px !important; }
#header { border-radius: var(--radius); padding: 8px 6px; }
h1, h2, h3, .gr-markdown { color: var(--fg); }
.gr-button { border-radius: 10px; }
.controls .gr-group, .panel { border: 1px solid var(--line); border-radius: var(--radius); }
.panel { padding: 10px; }
#stats { font-weight: 300; color: var(--fg); }
#stats strong { font-weight: 500; }
.small { font-size: 13px; color: var(--muted); }
"""
| # ---------- Genome ---------- | |
@dataclass
class Genome:
    """One candidate architecture configuration together with its evaluation results.

    The class must be a dataclass: the rest of the file builds instances with
    keyword arguments and copies them via ``dataclasses.asdict``.
    """

    # Architecture genes.
    d_model: int
    n_layers: int
    n_heads: int
    ffn_mult: float
    memory_tokens: int
    dropout: float
    # Integer group tag; used to pick the marker colour in the sphere plot.
    species: int = 0
    # Evaluation score, lower is better; +inf means "not evaluated yet".
    fitness: float = float("inf")
    # Accuracy reported by the evaluator, or None when none was computed.
    acc: Optional[float] = None

    def vector(self) -> np.ndarray:
        """Return the six architecture genes as a float32 vector.

        Each gene is divided by a fixed constant (1024, 24, 32, 8, 64, 0.5) so
        the components are on comparable scales.
        """
        return np.array([
            self.d_model / 1024.0,
            self.n_layers / 24.0,
            self.n_heads / 32.0,
            self.ffn_mult / 8.0,
            self.memory_tokens / 64.0,
            self.dropout / 0.5,
        ], dtype=np.float32)
def random_genome(rng: random.Random) -> Genome:
    """Sample a fresh genome by drawing every gene from its fixed list of options.

    The draws are made in a fixed order (d_model, n_layers, n_heads, ffn_mult,
    memory_tokens, dropout, species) so a seeded ``rng`` always yields the same
    genome.
    """
    width = rng.choice([256, 384, 512, 640])
    depth = rng.choice([4, 6, 8, 10, 12])
    heads = rng.choice([4, 6, 8, 10, 12])
    ffn = rng.choice([2.0, 3.0, 4.0, 6.0])
    memory = rng.choice([0, 4, 8, 16])
    drop = rng.choice([0.0, 0.05, 0.1, 0.15])
    group = rng.randrange(5)
    return Genome(
        d_model=width,
        n_layers=depth,
        n_heads=heads,
        ffn_mult=ffn,
        memory_tokens=memory,
        dropout=drop,
        species=group,
    )
def mutate(g: Genome, rng: random.Random, rate: float) -> Genome:
    """Return a mutated copy of ``g``; the input genome is left untouched.

    Each architecture gene is independently resampled from its option list
    with probability ``rate``; the species tag is resampled with probability
    ``rate * 0.5``. The copy's fitness and accuracy are reset because they no
    longer describe the (possibly) changed configuration.
    """
    child = Genome(**asdict(g))
    # Processed in this order so the sequence of rng calls stays fixed.
    gene_options = (
        ("d_model", [256, 384, 512, 640]),
        ("n_layers", [4, 6, 8, 10, 12]),
        ("n_heads", [4, 6, 8, 10, 12]),
        ("ffn_mult", [2.0, 3.0, 4.0, 6.0]),
        ("memory_tokens", [0, 4, 8, 16]),
        ("dropout", [0.0, 0.05, 0.1, 0.15]),
    )
    for gene, options in gene_options:
        if rng.random() < rate:
            setattr(child, gene, rng.choice(options))
    if rng.random() < rate * 0.5:
        child.species = rng.randrange(5)
    child.fitness = float("inf")
    child.acc = None
    return child
def crossover(a: Genome, b: Genome, rng: random.Random) -> Genome:
    """Build an unevaluated child by uniform crossover of ``a`` and ``b``.

    For every gene (including the species tag) one rng draw decides whether
    the value comes from ``a`` (draw < 0.5) or from ``b``.
    """
    inherited = (
        "d_model",
        "n_layers",
        "n_heads",
        "ffn_mult",
        "memory_tokens",
        "dropout",
        "species",
    )
    # The comprehension visits genes in tuple order: one rng.random() per gene.
    genes = {
        name: (getattr(a, name) if rng.random() < 0.5 else getattr(b, name))
        for name in inherited
    }
    return Genome(fitness=float("inf"), acc=None, **genes)
| # ---------- Proxy fitness ---------- | |
def rastrigin(x: np.ndarray) -> float:
    """Evaluate the Rastrigin test function (A = 10) on a 1-D point ``x``.

    The value is 0 at the origin and grows with the squared coordinates,
    modulated by a cosine ripple.
    """
    amplitude = 10.0
    dims = x.shape[0]
    ripple = amplitude * np.cos(2 * math.pi * x)
    return amplitude * dims + np.sum(x**2 - ripple)
class TinyMLP(nn.Module):
    """Two-hidden-layer ReLU MLP with one output logit, sized from a genome.

    The first hidden width is ``max(64, 0.25 * d_model)`` and the second is
    ``max(32, 32 * ffn_mult)`` (both truncated to int); no other gene is read.
    """

    def __init__(self, in_dim: int, genome: Genome):
        super().__init__()
        first_width = max(64, int(0.25 * genome.d_model))
        second_width = max(32, int(genome.ffn_mult * 32))
        stack = [
            nn.Linear(in_dim, first_width),
            nn.ReLU(),
            nn.Linear(first_width, second_width),
            nn.ReLU(),
            nn.Linear(second_width, 1),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Return the logits with the trailing size-1 dimension squeezed away."""
        logits = self.net(x)
        return logits.squeeze(-1)
| def _cached_dataset(name: str): | |
| if name.startswith("PIQA"): return load_piqa(subset=800, seed=42) | |
| if name.startswith("HellaSwag"): return load_hellaswag(subset=800, seed=42) | |
| return None | |
def _train_eval_proxy(genome: Genome, dataset_name: str, explore: float, device: str = "cpu"):
    """Train a small genome-sized MLP on hashed text features and score it.

    Returns ``(fitness, accuracy)``. Fitness is ``(1 - accuracy)`` plus a size
    penalty plus Gaussian noise scaled by ``explore`` (clamped to [0, 1]); the
    final value is clamped to [0, 1.5]. When ``dataset_name`` is not
    recognised, the Rastrigin value of the rescaled genome vector is returned
    with ``None`` as accuracy.
    """
    splits = _cached_dataset(dataset_name)
    if splits is None:
        return float(rastrigin(genome.vector() * 2 - 1)), None

    train_txt, train_y, val_txt, val_y = splits
    n_features = 4096
    # NOTE(review): train and validation are vectorized with different seeds;
    # if the seed selects the hash function the two splits would not share a
    # feature space — confirm against hash_vectorize.
    train_feats = hash_vectorize(train_txt, n_features=n_features, seed=1234)
    val_feats = hash_vectorize(val_txt, n_features=n_features, seed=5678)
    train_x = torch.from_numpy(train_feats)
    train_targets = torch.from_numpy(train_y.astype(np.float32))
    val_x = torch.from_numpy(val_feats)
    yva_t = torch.from_numpy(val_y.astype(np.float32))  # built but not used below

    model = TinyMLP(n_features, genome).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=2e-3)
    criterion = nn.BCEWithLogitsLoss()

    # Fixed training budget: 120 steps on random batches of 256 rows
    # (indices are drawn with replacement).
    model.train()
    num_steps, batch_size = 120, 256
    n_train = train_x.size(0)
    for _step in range(num_steps):
        batch = torch.randint(0, n_train, (batch_size,))
        inputs = train_x[batch].to(device)
        targets = train_targets[batch].to(device)
        loss = criterion(model(inputs), targets)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

    model.eval()
    with torch.no_grad():
        val_probs = torch.sigmoid(model(val_x.to(device))).cpu().numpy()

    if dataset_name.startswith("PIQA"):
        # Rows are regrouped in pairs: predict "first option" when it scores higher.
        pair_probs = val_probs.reshape(-1, 2)
        pair_labels = val_y.reshape(-1, 2)
        pred = (pair_probs[:, 0] > pair_probs[:, 1]).astype(np.int64)
        truth = (pair_labels[:, 0] == 1).astype(np.int64)
    else:
        # Rows are regrouped in fours: pick the highest-scoring option.
        pred = val_probs.reshape(-1, 4).argmax(axis=1)
        truth = val_y.reshape(-1, 4).argmax(axis=1)
    acc = float((pred == truth).mean())

    size_penalty = 0.00000002 * (genome.d_model**2 * genome.n_layers) + 0.0001 * genome.memory_tokens
    # NOTE(review): noise comes from NumPy's global RNG, which this function does not seed.
    jitter = np.random.normal(scale=0.01 * max(0.0, min(1.0, explore)))
    fitness = (1.0 - acc) + size_penalty + jitter
    return float(max(0.0, min(1.5, fitness))), float(acc)
def evaluate_genome(genome: Genome, dataset: str, explore: float):
    """Score ``genome`` on ``dataset`` and return ``(fitness, accuracy_or_None)``.

    "PIQA…" / "HellaSwag…" datasets go through the trained proxy. The
    "Demo (Surrogate)" dataset uses Rastrigin plus a size penalty plus noise
    scaled by ``explore``. Any other name falls back to plain Rastrigin.
    """
    for family in ("PIQA", "HellaSwag"):
        if dataset.startswith(family):
            return _train_eval_proxy(genome, family, explore)

    # Map the genome vector from its native scale onto [-1, 1]-style coordinates.
    centered = genome.vector() * 2 - 1
    base = rastrigin(centered)
    if dataset != "Demo (Surrogate)":
        return float(base), None

    size_penalty = 0.001 * (
        genome.d_model + 50*genome.n_layers + 20*genome.n_heads + 100*genome.memory_tokens
    )
    jitter = np.random.normal(scale=0.05 * max(0.0, min(1.0, explore)))
    return float(base + size_penalty + jitter), None
| # ---------- Viz helpers (bigger, transparent sphere) ---------- | |
# Marker colours for the sphere plot; indexed by species modulo the palette length.
PALETTE = ["#111827", "#334155", "#475569", "#64748b", "#94a3b8"]  # muted grayscale/blue
# Background colour shared by every figure (paper, plot area and 3-D scene).
BG = "white"
def sphere_project(points: np.ndarray) -> np.ndarray:
    """Project row vectors to 3-D and push each one onto a sphere of radius 1.2.

    The projection matrix is drawn from a RandomState seeded with 42, so the
    same input always lands on the same positions.
    """
    fixed_rng = np.random.RandomState(42)
    projection = fixed_rng.normal(size=(points.shape[1], 3)).astype(np.float32)
    projected = np.matmul(points, projection)
    # The small epsilon keeps an all-zero row from dividing by zero.
    lengths = np.linalg.norm(projected, axis=1, keepdims=True) + 1e-8
    return (projected / lengths) * 1.2
def _species_colors(species: np.ndarray) -> list:
    """Map each species id to a palette colour, wrapping around the palette."""
    palette_size = len(PALETTE)
    return [PALETTE[int(tag) % palette_size] for tag in species]
def make_sphere_figure(points3d: np.ndarray, genomes: List[Genome], gen_idx: int) -> go.Figure:
    """Build the 3-D figure: genome markers over a faint sphere of radius 1.2.

    ``points3d`` supplies one (x, y, z) row per genome. Markers are coloured by
    species and carry the genome's fields as hover data; a missing accuracy is
    encoded as -1.0 in that hover data.
    """
    marker_colors = _species_colors(np.array([g.species for g in genomes]))

    hover_rows = []
    for g in genomes:
        shown_acc = g.acc if g.acc is not None else -1.0
        hover_rows.append([
            g.d_model, g.n_layers, g.n_heads, g.ffn_mult, g.memory_tokens, g.dropout,
            g.species, g.fitness, shown_acc,
        ])
    custom = np.array(hover_rows, dtype=np.float32)

    hover_lines = [
        "<b>Genome</b>",
        "d_model=%{customdata[0]:.0f} · layers=%{customdata[1]:.0f} · heads=%{customdata[2]:.0f}",
        "ffn_mult=%{customdata[3]:.1f} · mem=%{customdata[4]:.0f} · drop=%{customdata[5]:.2f}",
        "species=%{customdata[6]:.0f}",
        "fitness=%{customdata[7]:.4f}",
        "accuracy=%{customdata[8]:.3f}<extra></extra>",
    ]
    scatter = go.Scatter3d(
        x=points3d[:, 0],
        y=points3d[:, 1],
        z=points3d[:, 2],
        mode='markers',
        marker=dict(size=6.5, color=marker_colors, opacity=0.92),
        customdata=custom,
        hovertemplate="<br>".join(hover_lines),
    )

    # Subtle, large sphere: a parametric surface in a single flat colour.
    radius = 1.2
    azimuth = np.linspace(0, 2*np.pi, 72)
    polar = np.linspace(0, np.pi, 36)
    sin_polar = np.sin(polar)
    sphere = go.Surface(
        x=radius*np.outer(np.cos(azimuth), sin_polar),
        y=radius*np.outer(np.sin(azimuth), sin_polar),
        z=radius*np.outer(np.ones_like(azimuth), np.cos(polar)),
        opacity=0.08,
        showscale=False,
        colorscale=[[0, "#cbd5e1"], [1, "#cbd5e1"]],
        hoverinfo="skip",
    )

    hidden_axis = dict(visible=False)
    layout = go.Layout(
        paper_bgcolor=BG,
        plot_bgcolor=BG,
        title=f"Evo Architecture Sphere — Gen {gen_idx}",
        scene=dict(xaxis=hidden_axis, yaxis=dict(visible=False), zaxis=dict(visible=False), bgcolor=BG),
        margin=dict(l=0, r=0, t=36, b=0),
        showlegend=False,
        height=720,
        font=dict(family="Inter, Arial, sans-serif", size=14),
    )
    return go.Figure(data=[sphere, scatter], layout=layout)
def make_history_figure(history: List[Tuple[int,float,float]], metric: str) -> go.Figure:
    """Plot the per-generation best value as a line chart.

    ``history`` holds ``(generation, best_fitness, best_accuracy)`` entries.
    With ``metric == "Accuracy"`` the accuracy column is plotted and NaN
    entries become gaps; any other metric plots the fitness column.
    """
    generations = [entry[0] for entry in history]
    if metric == "Accuracy":
        title, ylab = "Best Accuracy per Generation", "Accuracy"
        # NaN is the only value not equal to itself; it marks "no accuracy".
        values = [None if not (entry[2] == entry[2]) else entry[2] for entry in history]
    else:
        title, ylab = "Best Fitness per Generation", "Fitness (↓ better)"
        values = [entry[1] for entry in history]

    trace = go.Scatter(x=generations, y=values, mode="lines+markers", line=dict(width=2))
    fig = go.Figure(data=[trace])
    fig.update_layout(
        paper_bgcolor=BG,
        plot_bgcolor=BG,
        title=title,
        xaxis_title="Generation",
        yaxis_title=ylab,
        margin=dict(l=30, r=10, t=36, b=30),
        height=340,
        font=dict(family="Inter, Arial, sans-serif", size=14),
    )
    return fig
def fig_to_html(fig: go.Figure) -> str:
    """Serialise ``fig`` to an HTML fragment that loads plotly.js from the CDN."""
    # NOTE(review): the fragment relies on <script> tags; confirm the component
    # that displays it actually executes them.
    plot_config = dict(displaylogo=False)
    return pio.to_html(
        fig,
        include_plotlyjs="cdn",
        full_html=False,
        config=plot_config,
    )
def approx_params(g: Genome) -> int:
    """Return a rough parameter count for the architecture described by ``g``.

    Each layer is costed at ``(4 + 2 * ffn_mult) * d_model**2``; every memory
    token adds a flat 1000. The result is truncated to an int.
    """
    width_squared = g.d_model ** 2
    layer_cost = (4.0 + 2.0 * float(g.ffn_mult)) * width_squared
    return int(layer_cost * g.n_layers + 1000 * g.memory_tokens)
| # ---------- Orchestrator ---------- | |
class EvoRunner:
    """Runs the evolutionary search in a background thread and publishes UI state.

    After every generation ``state`` is replaced wholesale (under ``lock``)
    with a dict holding the rendered figures, the leaderboard and run
    metadata. ``running`` tells whether a run is in progress and ``stop_flag``
    asks the current run to end before its next generation.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.running = False
        self.stop_flag = False
        self.state: Dict[str, Any] = {}

    def run(self, dataset, pop_size, generations, mutation_rate, explore, exploit, seed, pace_ms, metric_choice):
        """Evolve a population for up to ``generations`` generations (blocking).

        Each generation: tournament selection, pairwise crossover + mutation,
        evaluation of the children, then the best tenth of the previous
        population overwrites the worst children. A fresh state snapshot is
        published after every generation, followed by a ``pace_ms`` pause.

        Raises:
            ValueError: if ``pop_size`` is smaller than 1.
        """
        self.stop_flag = False
        self.running = True
        try:
            if pop_size < 1:
                raise ValueError("pop_size must be at least 1")
            rng = random.Random(int(seed))
            pop: List[Genome] = [random_genome(rng) for _ in range(pop_size)]
            for g in pop:
                g.fitness, g.acc = evaluate_genome(g, dataset, explore)
            history: List[Tuple[int, float, float]] = []
            # Tournament size grows with `exploit`; it is capped at the
            # population size because rng.sample cannot draw more than that.
            k = min(len(pop), max(2, int(2 + exploit * 5)))
            elite_n = max(1, pop_size // 10)
            for gen in range(1, generations + 1):
                if self.stop_flag:
                    break
                parents = [
                    min(rng.sample(pop, k=k), key=lambda x: x.fitness)
                    for _ in range(pop_size)
                ]
                children = []
                for i in range(0, pop_size, 2):
                    a = parents[i]
                    b = parents[(i + 1) % pop_size]
                    child1 = mutate(crossover(a, b, rng), rng, mutation_rate)
                    child2 = mutate(crossover(b, a, rng), rng, mutation_rate)
                    children.extend([child1, child2])
                # An odd pop_size produces one child too many.
                children = children[:pop_size]
                for c in children:
                    c.fitness, c.acc = evaluate_genome(c, dataset, explore)
                # Elitism: the previous generation's best replace the worst children.
                elites = sorted(pop, key=lambda x: x.fitness)[:elite_n]
                pop = sorted(children, key=lambda x: x.fitness)
                pop[-elite_n:] = elites
                best = min(pop, key=lambda x: x.fitness)
                history.append((gen, best.fitness, (best.acc if best.acc is not None else float("nan"))))
                # Figures are rendered before taking the lock so pollers are
                # only blocked for the assignment itself.
                snapshot = self._snapshot(pop, history, gen, dataset, metric_choice)
                with self.lock:
                    self.state = snapshot
                time.sleep(max(0.0, pace_ms / 1000.0))
        finally:
            # Always clear the flag, even on error, so a new run can be started.
            self.running = False

    @staticmethod
    def _snapshot(pop, history, gen, dataset, metric_choice):
        """Render the figures and the top-12 table for one generation into a state dict."""
        coords = sphere_project(np.stack([g.vector() for g in pop], axis=0))
        sphere_fig = make_sphere_figure(coords, pop, gen)
        hist_fig = make_history_figure(history, metric_choice)
        top = sorted(pop, key=lambda x: x.fitness)[: min(12, len(pop))]
        top_table = [
            {
                "gen": gen,
                "fitness": round(t.fitness, 4),
                "accuracy": (None if t.acc is None else round(float(t.acc), 4)),
                "d_model": t.d_model,
                "layers": t.n_layers,
                "heads": t.n_heads,
                "ffn_mult": t.ffn_mult,
                "mem": t.memory_tokens,
                "dropout": t.dropout,
                "species": t.species,
                "params_approx": approx_params(t),
            }
            for t in top
        ]
        return {
            "sphere_html": fig_to_html(sphere_fig),
            "history_html": fig_to_html(hist_fig),
            "top": top_table,
            "best": top_table[0] if top_table else {},
            "gen": gen,
            "dataset": dataset,
            "metric": metric_choice,
        }

    def start(self, *args, **kwargs):
        """Launch ``run`` in a daemon thread; does nothing if a run is already active."""
        # Check-and-set under the lock so two quick calls cannot both start a run.
        with self.lock:
            if self.running:
                return
            self.running = True
        t = threading.Thread(target=self.run, args=args, kwargs=kwargs, daemon=True)
        try:
            t.start()
        except Exception:
            self.running = False
            raise

    def stop(self):
        """Ask the active run to end before its next generation."""
        self.stop_flag = True
# Single module-level runner shared by every UI callback below.
runner = EvoRunner()
| # ---------- UI callbacks ---------- | |
def start_evo(dataset, pop, gens, mut, explore, exploit, seed, pace_ms, metric_choice):
    """Start a run with the UI values coerced to numbers, then flip the buttons.

    Returns updates for (start button, stop button): start becomes
    non-interactive and stop becomes interactive.
    """
    launch_args = (
        dataset,
        int(pop),
        int(gens),
        float(mut),
        float(explore),
        float(exploit),
        int(seed),
        int(pace_ms),
        metric_choice,
    )
    runner.start(*launch_args)
    start_update = gr.update(interactive=False)
    stop_update = gr.update(interactive=True)
    return (start_update, stop_update)
def stop_evo():
    """Request the active run to stop and flip the buttons back.

    Returns updates for (start button, stop button): start becomes
    interactive and stop becomes non-interactive.
    """
    runner.stop()
    start_update = gr.update(interactive=True)
    stop_update = gr.update(interactive=False)
    return (start_update, stop_update)
def poll_state():
    """Read the runner's latest snapshot and format it for the UI.

    Returns ``(sphere_html, history_html, stats_markdown, top_dataframe)``.
    Before the first generation has been published the HTML strings are
    empty, the table is empty and the stats text is a waiting message.
    """
    with runner.lock:
        snapshot = runner.state.copy()

    best = snapshot.get("best", {})
    if not best:
        stats_md = "Waiting… click **Start Evolution**."
    else:
        gen = snapshot.get("gen", 0)
        dataset = snapshot.get("dataset", "Demo (Surrogate)")
        if best.get("accuracy") is None:
            acc_txt = "—"
        else:
            acc_txt = f"{best.get('accuracy'):.3f}"
        pieces = [
            f"**Dataset:** {dataset} \n",
            f"**Generation:** {gen} \n",
            f"**Best fitness:** {best.get('fitness','–')} \n",
            f"**Best accuracy:** {acc_txt} \n",
            f"**Config:** d_model={best.get('d_model')} · layers={best.get('layers')} · ",
            f"heads={best.get('heads')} · ffn_mult={best.get('ffn_mult')} · mem={best.get('mem')} · ",
            f"dropout={best.get('dropout')} \n",
            f"**~Params (rough):** {best.get('params_approx'):,}",
        ]
        stats_md = "".join(pieces)

    table = pd.DataFrame(snapshot.get("top", []))
    return snapshot.get("sphere_html", ""), snapshot.get("history_html", ""), stats_md, table
def _snapshot_default(o):
    """JSON fallback: unwrap objects exposing ``tolist()`` (e.g. NumPy values), else stringify."""
    if hasattr(o, "tolist"):
        return o.tolist()
    return str(o)


def export_snapshot():
    """Write the runner's current state to ``evo_snapshot.json`` and return the path.

    The state is copied under the lock and serialised afterwards, so the
    evolution thread is not held up by JSON encoding or file I/O. The runner
    replaces its state dict wholesale, which is why a shallow copy suffices.
    """
    with runner.lock:
        snapshot = dict(runner.state)
    # A `default` hook must return something encodable; handing the same
    # object back makes the encoder fail instead of falling back.
    payload = json.dumps(snapshot, default=_snapshot_default, indent=2)
    path = "evo_snapshot.json"
    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)
    return path
| # ---------- Build UI (minimal layout) ---------- | |
# Layout: a narrow controls column (settings, stats, export) on the left and a
# wide column (sphere plot, history plot, leaderboard) on the right.
with gr.Blocks(css=CUSTOM_CSS, theme=gr.themes.Soft()) as demo:
    with gr.Column(elem_id="header"):
        gr.Markdown("## Evo Playground — Minimal Live Evolution (PIQA / HellaSwag accuracy)")
    with gr.Row():
        with gr.Column(scale=1, elem_classes=["controls"]):
            with gr.Group():
                dataset = gr.Dropdown(
                    label="Dataset",
                    choices=["Demo (Surrogate)", "PIQA (Phase 2)", "HellaSwag (Phase 2)"],
                    value="Demo (Surrogate)",
                    info="PIQA/HellaSwag compute real proxy accuracy; Demo is a fast surrogate."
                )
                pop = gr.Slider(8, 80, value=24, step=2, label="Population size")
                gens = gr.Slider(5, 200, value=60, step=1, label="Max generations")
                mut = gr.Slider(0.05, 0.9, value=0.25, step=0.01, label="Mutation rate")
                with gr.Row():
                    explore = gr.Slider(0.0, 1.0, value=0.35, step=0.05, label="Exploration")
                    exploit = gr.Slider(0.0, 1.0, value=0.65, step=0.05, label="Exploitation")
                seed = gr.Number(value=42, label="Seed", precision=0)
                pace = gr.Slider(0, 1000, value=120, step=10, label="Pace (ms between gens)")
                metric_choice = gr.Radio(choices=["Accuracy", "Fitness"], value="Accuracy", label="History Metric")
                with gr.Row():
                    start = gr.Button("▶ Start Evolution", variant="primary")
                    # Starts disabled, matching the state stop_evo() leaves the
                    # buttons in; start_evo() enables it once a run begins.
                    stop = gr.Button("⏹ Stop", variant="secondary", interactive=False)
            with gr.Group(elem_classes=["panel"]):
                stats_md = gr.Markdown("Waiting…", elem_id="stats")
            with gr.Group(elem_classes=["panel"]):
                export_btn = gr.Button("Export Snapshot (JSON)")
                export_file = gr.File(label="Download snapshot", visible=False)
        with gr.Column(scale=2):
            with gr.Group(elem_classes=["panel"]):
                sphere_html = gr.HTML()
            with gr.Group(elem_classes=["panel"]):
                hist_html = gr.HTML()
            with gr.Group(elem_classes=["panel"]):
                top_df = gr.Dataframe(label="Top Genomes (live)", wrap=True, interactive=False)
    # Wiring
    start.click(start_evo, [dataset, pop, gens, mut, explore, exploit, seed, pace, metric_choice], [start, stop])
    stop.click(stop_evo, [], [start, stop])
    # The download component is created hidden, so the update must also make
    # it visible; setting only the value would leave it hidden.
    export_btn.click(
        lambda: gr.update(value=export_snapshot(), visible=True),
        [],
        [export_file],
    )
    # Initial paint + polling
    demo.load(poll_state, None, [sphere_html, hist_html, stats_md, top_df])
    gr.Timer(0.7).tick(poll_state, None, [sphere_html, hist_html, stats_md, top_df])
if __name__ == "__main__":
    demo.launch()