#!/usr/bin/env python3
# =============================================================================
# PROJECT : ScaleUp-OmegaB | EVOLUTION v4 — TRUE FEDERATED EXPANSION
# AUTHOR : imsuprtwo2
# ENGINE : Federated Dynamic Synaptic Expansion (F-DSE)
# BACKEND : NumPy / AdaGrad · no hard param ceiling
# BROWSER : TensorFlow.js WebWorker — real gradient computation on user PC
# STORAGE : HuggingFace Hub (imsuprtwo2/ScaleUp-OmegaB)
# DATASETS : Random-cycling public HuggingFace datasets, any format
# SPACES : RAM-aware expansion gate — safe on free HF Spaces (16 GB)
# =============================================================================
import os, sys, json, time, uuid, math, random, logging, threading, traceback
import asyncio, struct, gc, platform
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional, Tuple
from collections import deque
import numpy as np
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ── Optional heavy deps ──────────────────────────────────────────────────────
# psutil: accurate CPU/RAM introspection; without it we fall back to os.cpu_count
# and a fixed RAM-usage estimate (see ram_usage_fraction).
try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False
# torch: only used for CUDA detection / GPU scoring, never for training here.
try:
    import torch
    HAS_TORCH = True
    TORCH_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
except ImportError:
    HAS_TORCH = False
    TORCH_DEVICE = "cpu"
# huggingface_hub: checkpoint push/pull to HF_REPO_ID.
try:
    from huggingface_hub import HfApi, hf_hub_download
    HAS_HF_HUB = True
except ImportError:
    HAS_HF_HUB = False
# datasets: streaming text sources for continuous training.
try:
    from datasets import load_dataset, get_dataset_config_names
    HAS_DATASETS = True
except ImportError:
    HAS_DATASETS = False
# =============================================================================
# [ S0 GLOBAL CONFIG ]
# =============================================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("OMEGA")
# Detect HuggingFace Spaces environment (either env var is set on Spaces).
IS_HF_SPACES = bool(os.environ.get("SPACE_ID") or os.environ.get("SPACES_ZERO_GPU"))
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_REPO_ID = os.environ.get("HF_REPO_ID", "imsuprtwo2/ScaleUp-OmegaB")
CHECKPOINT_EVERY = int(os.environ.get("CHECKPOINT_EVERY", "300"))      # seconds between Hub pushes
EXPANSION_COOLDOWN = float(os.environ.get("EXPANSION_COOLDOWN", "5"))  # min seconds between expansions
LOSS_THRESHOLD = float(os.environ.get("LOSS_THRESHOLD", "0.065"))      # loss above this may trigger expansion
LEARNING_RATE = float(os.environ.get("LEARNING_RATE", "0.0003"))
PORT = int(os.environ.get("PORT", "7860"))
# Fixed network I/O widths (character-window size used by _text_to_batch).
INPUT_DIM = 128
OUTPUT_DIM = 128
# RAM safety thresholds — critical for HF Spaces free tier (16 GB)
RAM_EXPAND_CEILING = 0.78  # pause expansion above this fraction
RAM_HARD_CEILING = 0.90    # force gc.collect above this
_START_TIME = time.time()
# Seed datasets that reliably load as fallback: (dataset_id, config_or_None).
SEED_DATASETS = [
    ("wikipedia", "20220301.simple"),
    ("bookcorpus", None),
    ("squad", None),
    ("glue", "sst2"),
    ("ag_news", None),
    ("multi_news", None),
    ("cnn_dailymail", "3.0.0"),
    ("scientific_papers", "arxiv"),
    ("cc_news", None),
    ("openwebtext", None),
]
# Record keys probed (in order) when extracting trainable text from a row.
TEXT_FIELDS = [
    "text", "content", "body", "article", "document", "passage",
    "sentence", "review", "description", "summary", "context",
    "question", "answer", "title", "abstract", "story", "input", "output",
]
# =============================================================================
# [ S1 HARDWARE SCANNER ]
# =============================================================================
@dataclass
class HardwareProfile:
    """Snapshot of one machine's compute resources plus its work allocation."""
    client_id: str = ""
    cpu_cores_phys: int = 1       # physical cores
    cpu_cores_logic: int = 1      # logical cores (hyperthreads)
    cpu_freq_mhz: float = 0.0     # 0.0 when undetectable
    ram_total_gb: float = 0.0
    ram_avail_gb: float = 0.0
    gpu_name: str = "None"
    gpu_vram_gb: float = 0.0
    cuda_available: bool = False
    platform_str: str = ""
    compute_score: float = 0.0    # scalar ranking used to tier allocations
    allocated_threads: int = 1    # training worker threads granted
    allocated_batch: int = 8      # mini-batch size granted

    def to_dict(self) -> Dict:
        """Plain-dict form for JSON responses."""
        return asdict(self)
class HardwareScanner:
    """Probes the host machine and derives a tiered compute allocation."""

    # Tier table: (exclusive upper score bound, threads, batch size).
    _TIERS = ((20, 1, 8), (60, 2, 16), (150, 4, 32), (400, 6, 64))

    @staticmethod
    def scan(client_id: str = "server") -> HardwareProfile:
        """Build a HardwareProfile for this host and log a one-line summary."""
        profile = HardwareProfile(client_id=client_id, platform_str=platform.system())
        if HAS_PSUTIL:
            profile.cpu_cores_phys = psutil.cpu_count(logical=False) or 1
            profile.cpu_cores_logic = psutil.cpu_count(logical=True) or 1
            freq_info = psutil.cpu_freq()
            profile.cpu_freq_mhz = round(freq_info.current, 1) if freq_info else 0.0
            mem = psutil.virtual_memory()
            profile.ram_total_gb = round(mem.total / 1024 ** 3, 2)
            profile.ram_avail_gb = round(mem.available / 1024 ** 3, 2)
        else:
            # Without psutil only logical core count is knowable.
            profile.cpu_cores_logic = os.cpu_count() or 2
            profile.cpu_cores_phys = profile.cpu_cores_logic
        if HAS_TORCH:
            profile.cuda_available = torch.cuda.is_available()
            if profile.cuda_available:
                profile.gpu_name = torch.cuda.get_device_name(0)
                dev_props = torch.cuda.get_device_properties(0)
                profile.gpu_vram_gb = round(dev_props.total_memory / 1024 ** 3, 2)
        # Score: CPU (cores x GHz, floor 1 GHz) + RAM (x4) + VRAM (x80).
        cpu_points = profile.cpu_cores_logic * max(profile.cpu_freq_mhz, 1000) / 1000
        ram_points = profile.ram_avail_gb * 4
        vram_points = profile.gpu_vram_gb * 80
        profile.compute_score = round(cpu_points + ram_points + vram_points, 2)
        for bound, threads, batch in HardwareScanner._TIERS:
            if profile.compute_score < bound:
                profile.allocated_threads, profile.allocated_batch = threads, batch
                break
        else:
            # Top tier: scale threads with physical cores, capped at 12.
            profile.allocated_threads = min(profile.cpu_cores_phys, 12)
            profile.allocated_batch = 128
        log.info(
            f"[HW] {client_id[:12]} cores={profile.cpu_cores_logic} "
            f"ram={profile.ram_avail_gb}GB gpu={profile.gpu_name} "
            f"score={profile.compute_score} T={profile.allocated_threads} B={profile.allocated_batch}"
        )
        return profile
def ram_usage_fraction() -> float:
    """Current system RAM usage as a fraction in [0.0, 1.0].

    Falls back to a conservative 0.4 estimate when psutil is unavailable.
    """
    if not HAS_PSUTIL:
        return 0.4
    return psutil.virtual_memory().percent / 100.0
# =============================================================================
# [ S2 RESPONSE GENERATOR ]
# =============================================================================
class ResponseGenerator:
    """
    Produces varied, metrics-grounded chat replies.  Every call draws on real
    training state; an internal 6-way rotation picks a different template each
    time so repeated interactions never read identically.
    """

    # (upper loss bound, human-readable label) pairs, checked in order.
    _LOSS_LABELS = [
        (0.010, "near-zero - pattern fully saturated"),
        (0.030, "very low - high predictive confidence"),
        (0.065, "moderate - healthy gradient flow"),
        (0.120, "elevated - expansion pressure active"),
        (0.250, "high - novel input distribution"),
        (999.0, "very high - unexplored representation space"),
    ]

    def __init__(self):
        # Previous call's loss/params, used for delta reporting.
        self._prev_loss = 1.0
        self._prev_params = 0
        self._cycle = 0

    def _loss_label(self, loss: float) -> str:
        """Map a loss value to the first matching qualitative label."""
        hits = (label for bound, label in self._LOSS_LABELS if loss <= bound)
        return next(hits, "indeterminate")

    def generate(self, *, loss: float, params: int, layers: int,
                 expansions: int, expanded: bool, new_width: int,
                 dataset: str, datasets_tried: int, clients: int,
                 total_score: float, fed_updates: int, train_steps: int,
                 batch: int, iq: float, sps: float, workers: int,
                 ram_pct: float, expansion_paused: bool) -> str:
        """Render one status reply from the supplied training metrics."""
        tag = dataset.split("/")[-1][:22] if dataset != "none" else "loading..."
        last_loss, last_params = self._prev_loss, self._prev_params
        self._prev_loss, self._prev_params = loss, params

        # A neurogenesis event always takes priority over the rotation.
        if expanded:
            self._cycle += 1
            return (
                f"Neurogenesis #{expansions}: injected a {new_width}-neuron dense layer. "
                f"Parameter count {last_params:,} -> {params:,} (+{params - last_params:,}). "
                f"Triggered by loss={loss:.5f} on dataset [{tag}]. "
                f"Architecture is now {layers} layers deep."
            )

        # Memory-pressure notice while the expansion gate is closed.
        if expansion_paused and ram_pct > RAM_EXPAND_CEILING * 100:
            return (
                f"Expansion paused: RAM at {ram_pct:.1f}% (HF Spaces safety gate at "
                f"{RAM_EXPAND_CEILING*100:.0f}%). Holding at {params:,} params / {layers} layers. "
                f"Training continues at full speed. Loss={loss:.5f}, IQ={iq:.4f}. "
                f"Expansion will resume automatically when memory clears."
            )

        # Otherwise rotate through six informative templates.
        self._cycle = (self._cycle + 1) % 6
        slot = self._cycle
        if slot == 0:
            delta_pct = abs(last_loss - loss) / (last_loss + 1e-9) * 100
            trend = "improved" if loss < last_loss else "increased"
            return (
                f"Loss {trend} {last_loss:.5f} -> {loss:.5f} ({delta_pct:.1f}%) "
                f"on dataset [{tag}]. "
                f"Architecture: {layers} layers, {params:,} parameters. "
                f"IQ index: {iq:.4f}. Server training: ~{sps:.1f} steps/sec."
            )
        if slot == 1:
            return (
                f"Trained a {batch}-sample mini-batch against [{tag}] "
                f"(dataset #{datasets_tried} in rotation). "
                f"Loss={loss:.5f} ({self._loss_label(loss)}). "
                f"{clients} distributed client(s) contributing {total_score:.0f} compute-units. "
                f"{fed_updates} federated gradient packets absorbed this session."
            )
        if slot == 2:
            return (
                f"Your input was embedded across all {layers} synaptic layers "
                f"({params:,} parameters, {expansions} neurogenesis events so far). "
                f"Prediction error: {loss:.5f}. AdaGrad accumulators updated. "
                f"Continuous training on [{tag}] running in background."
            )
        if slot == 3:
            return (
                f"Server: {workers} worker thread(s), {sps:.1f} steps/sec, "
                f"{train_steps:,} total gradient steps. "
                f"Browser workers: {clients - 1} client(s) running TF.js locally. "
                f"Fed updates: {fed_updates}. Current loss: {loss:.5f}."
            )
        if slot == 4:
            return (
                f"Dataset [{tag}] ({datasets_tried} rotated total). "
                f"Loss={loss:.5f} | Params={params:,} | Layers={layers} | IQ={iq:.4f}. "
                f"RAM usage: {ram_pct:.1f}%. "
                f"No expansion needed at this loss level."
            )
        return (
            f"Gradient descent step complete. Loss: {loss:.5f} "
            f"({self._loss_label(loss)}). "
            f"Model has {params:,} active parameters in {layers} layers. "
            f"Global IQ index: {iq:.4f}. "
            f"Weights synced to HuggingFace Hub: imsuprtwo2/ScaleUp-OmegaB."
        )
# =============================================================================
# [ S3 DATASET MANAGER ]
# =============================================================================
class DatasetManager:
    """
    Streams training text from randomly-cycled public HuggingFace datasets.

    A daemon thread (_prefetch_loop) keeps a bounded deque of text samples
    topped up; training workers consume it via next_text().  Datasets that
    fail to load or yield no usable text are blacklisted for the session.
    """

    def __init__(self):
        self._lock = threading.Lock()           # guards catalogue + blacklist
        self._blacklist: set = set()            # dataset ids that failed
        self._catalogue: List[str] = []         # dataset ids listed from the Hub
        self._last_refresh = 0.0
        self._refresh_interval = 3600.0         # re-list the Hub at most hourly
        self._current_iter = None               # live streaming iterator, or None
        self._current_name = "none"
        self._rows_consumed = 0
        self.datasets_tried = 0                 # total load attempts (for display)
        self._switch_after = 2000               # rotate dataset after this many rows
        self._log: deque = deque(maxlen=40)     # human-readable recent events
        self._queue: deque = deque(maxlen=512)  # prefetched text samples
        t = threading.Thread(target=self._prefetch_loop, daemon=True)
        t.start()

    def _refresh_catalogue(self):
        """Re-list Hub dataset ids, at most once per refresh interval."""
        now = time.time()
        if now - self._last_refresh < self._refresh_interval:
            return
        self._last_refresh = now
        if not HAS_HF_HUB:
            return
        try:
            api = HfApi()
            info = api.list_datasets(limit=5000, full=False)
            ids = [d.id for d in info if d.id]
            random.shuffle(ids)  # randomise rotation order
            with self._lock:
                self._catalogue = ids
            log.info(f"[DATASET] Catalogue: {len(ids)} datasets.")
        except Exception as e:
            log.warning(f"[DATASET] Catalogue refresh failed: {e}")

    def _pick(self) -> Tuple[str, Optional[str]]:
        """Choose the next (dataset_id, config_or_None) to try.

        Prefers a random non-blacklisted catalogue entry, then a seed dataset;
        if everything is blacklisted, the blacklist is cleared and a seed is
        retried.
        """
        self._refresh_catalogue()
        with self._lock:
            cands = [d for d in self._catalogue if d not in self._blacklist]
        if cands:
            return random.choice(cands), None
        live = [s for s in SEED_DATASETS if s[0] not in self._blacklist]
        if live:
            return random.choice(live)  # seed entries are already 2-tuples
        with self._lock:
            self._blacklist.clear()
        return random.choice(SEED_DATASETS)

    def _extract(self, record: Dict) -> Optional[str]:
        """Pull a usable text string (>15 chars) out of one dataset row.

        Probes TEXT_FIELDS in order; falls back to joining every string value
        longer than 8 chars.  Returns None when nothing usable is found.
        """
        for f in TEXT_FIELDS:
            if f in record:
                v = record[f]
                if isinstance(v, str) and len(v.strip()) > 15:
                    return v.strip()
                if isinstance(v, list):
                    j = " ".join(str(x) for x in v if isinstance(x, str))
                    if len(j.strip()) > 15:
                        return j.strip()
        parts = [v for v in record.values() if isinstance(v, str) and len(v) > 8]
        result = " ".join(parts)
        return result if len(result) > 15 else None

    def _load(self):
        """Try up to 10 candidate datasets until one streams usable text.

        On success sets _current_iter/_current_name; failures are blacklisted.
        """
        if not HAS_DATASETS:
            return
        for _ in range(10):
            ds_id, config = self._pick()
            self.datasets_tried += 1
            label = ds_id + (f"/{config}" if config else "")
            log.info(f"[DATASET] Loading [{self.datasets_tried}] {label}")
            self._log.append(f"[{time.strftime('%H:%M:%S')}] Loading {label}")
            try:
                kw: Dict[str, Any] = dict(
                    path=ds_id,
                    split="train",
                    streaming=True,  # never download full datasets to disk
                    trust_remote_code=True,
                )
                if config:
                    kw["name"] = config
                else:
                    # No config given: fall back to the dataset's first config.
                    try:
                        cfgs = get_dataset_config_names(ds_id, trust_remote_code=True)
                        if cfgs:
                            kw["name"] = cfgs[0]
                    except Exception:
                        pass
                ds = load_dataset(**kw)
                # Probe one record to verify text is extractable before committing.
                sample = next(iter(ds))
                text = self._extract(sample)
                if text is None:
                    raise ValueError(f"No text field found in {ds_id}")
                self._current_iter = iter(ds)
                self._current_name = label
                self._rows_consumed = 0
                log.info(f"[DATASET] Active: {label}")
                self._log.append(f"[{time.strftime('%H:%M:%S')}] Active: {label}")
                return
            except Exception as e:
                log.warning(f"[DATASET] Failed {ds_id}: {e}")
                with self._lock:
                    self._blacklist.add(ds_id)

    def _prefetch_loop(self):
        """Daemon loop: keep the sample queue filled, rotating datasets as needed."""
        while True:
            try:
                if len(self._queue) < 128:
                    # (Re)load when no iterator is active or rotation is due.
                    if self._current_iter is None or self._rows_consumed >= self._switch_after:
                        self._load()
                    if self._current_iter is not None:
                        for _ in range(64):  # top up in small bursts
                            try:
                                rec = next(self._current_iter)
                                text = self._extract(rec)
                                if text:
                                    self._queue.append(text[:2048])  # cap sample size
                                self._rows_consumed += 1
                            except StopIteration:
                                # Stream exhausted; next pass picks a new dataset.
                                self._current_iter = None
                                break
                else:
                    time.sleep(0.05)  # queue full; idle briefly
            except Exception as e:
                log.warning(f"[DATASET/prefetch] {e}")
                time.sleep(2)

    def next_text(self) -> str:
        """Pop the next prefetched sample, waiting up to ~6s for the queue.

        Returns a synthetic filler sentence if nothing arrives in time, so
        training workers never block indefinitely.
        """
        for _ in range(200):
            if self._queue:
                return self._queue.popleft()
            time.sleep(0.03)
        words = ["the", "model", "trains", "on", "distributed", "datasets", "continuously"]
        return " ".join(random.choices(words, k=random.randint(20, 60)))

    @property
    def status(self) -> Dict:
        """JSON-serialisable snapshot of the dataset pipeline state."""
        return {
            "current": self._current_name,
            "rows_consumed": self._rows_consumed,
            "datasets_tried": self.datasets_tried,
            "blacklisted": len(self._blacklist),
            "queue_depth": len(self._queue),
            "recent_log": list(self._log)[-10:],
        }
# =============================================================================
# [ S4 DYNAMIC BRAIN ]
# =============================================================================
class DynamicBrain:
    """
    Self-expanding MLP with no hard parameter ceiling.
    - AdaGrad per-parameter adaptive learning rates
    - Xavier initialisation on all new layers
    - RAM-aware expansion gate (critical for HF Spaces)
    - Thread-safe via RLock
    - Binary serialisation for HF Hub checkpoints
    """

    def __init__(self):
        self.input_dim = INPUT_DIM
        self.output_dim = OUTPUT_DIM
        self.lr = LEARNING_RATE
        # RLock: _backprop and maybe_expand can both run on the same worker thread.
        self.lock = threading.RLock()
        self.layers: List[Dict] = []  # each: {"W": (in,out) f32, "b": (1,out) f32}
        self._build_seed()
        self.param_count = self._count()
        self.global_iq = 0.0          # monotone display metric, see train_on_text
        self.total_tokens = 0
        self.expansion_events = 0
        self.train_steps = 0
        self.last_expansion_time = 0.0
        self.last_checkpoint_time = time.time()
        self.expansion_paused = False
        self.last_expansion_width = 0
        self.loss_history: deque = deque(maxlen=500)
        self.grad_acc: List[Dict] = self._make_adagrad()
        self.hf = HFModelManager()
        self._try_load_hf()  # resume from a Hub checkpoint when one exists
        log.info(f"[BRAIN] Ready: {self.param_count:,} params / {len(self.layers)} layers")

    # ── Architecture ─────────────────────────────────────────────────────────
    def _xavier(self, fan_in: int, fan_out: int) -> Dict:
        """New dense layer: Xavier/Glorot-uniform weights, zero bias."""
        lim = math.sqrt(6.0 / (fan_in + fan_out))
        return {
            "W": np.random.uniform(-lim, lim, (fan_in, fan_out)).astype(np.float32),
            "b": np.zeros((1, fan_out), dtype=np.float32),
        }

    def _build_seed(self):
        """Initial 4-layer architecture before any neurogenesis."""
        self.layers = [
            self._xavier(self.input_dim, 1024),
            self._xavier(1024, 1024),
            self._xavier(1024, 512),
            self._xavier(512, self.output_dim),
        ]

    def _count(self) -> int:
        """Total trainable parameter count (weights + biases)."""
        return sum(l["W"].size + l["b"].size for l in self.layers)

    def _make_adagrad(self) -> List[Dict]:
        """Fresh AdaGrad accumulators, one per layer, seeded near zero."""
        return [
            {"W": np.ones_like(l["W"]) * 1e-8,
             "b": np.ones_like(l["b"]) * 1e-8}
            for l in self.layers
        ]

    # ── Activations ──────────────────────────────────────────────────────────
    @staticmethod
    def _lrelu(z, a=0.01): return np.where(z > 0, z, a * z)   # leaky ReLU

    @staticmethod
    def _lrelu_d(z, a=0.01): return np.where(z > 0, 1.0, a)   # its derivative

    # ── Neurogenesis ──────────────────────────────────────────────────────────
    def maybe_expand(self, loss: float) -> bool:
        """
        Conditionally insert a new hidden layer.
        Returns True if expansion occurred.
        Gated by: loss above LOSS_THRESHOLD, EXPANSION_COOLDOWN elapsed, and
        RAM below RAM_EXPAND_CEILING.
        """
        if loss <= LOSS_THRESHOLD:
            return False
        now = time.time()
        if now - self.last_expansion_time < EXPANSION_COOLDOWN:
            return False
        ram = ram_usage_fraction()
        if ram >= RAM_EXPAND_CEILING:
            # Memory gate: keep training, refuse to grow; force GC when critical.
            self.expansion_paused = True
            if ram >= RAM_HARD_CEILING:
                gc.collect()
            return False
        self.expansion_paused = False
        self.last_expansion_time = now
        with self.lock:
            self.expansion_events += 1
            # Each successive expansion is 128 neurons wider than the last.
            new_width = 256 + self.expansion_events * 128
            self.last_expansion_width = new_width
            # Insert the new layer just before the output layer, then rebuild
            # the output layer so it accepts the new width.
            prev_width = self.layers[-2]["W"].shape[1]
            new_layer = self._xavier(prev_width, new_width)
            self.layers[-1] = self._xavier(new_width, self.output_dim)
            self.layers.insert(-1, new_layer)
            self.param_count = self._count()
            # NOTE: accumulators are reset wholesale — existing layers lose
            # their AdaGrad history on every expansion.
            self.grad_acc = self._make_adagrad()
        log.info(
            f"[NEUROGENESIS #{self.expansion_events}] "
            f"width={new_width} params={self.param_count:,} ram={ram*100:.1f}%"
        )
        return True

    # ── Forward / Backprop ────────────────────────────────────────────────────
    def _forward(self, x: np.ndarray):
        """Forward pass; returns (pre-activations, post-activations incl. input)."""
        pre, post = [], [x]
        for i, l in enumerate(self.layers):
            z = post[-1] @ l["W"] + l["b"]
            pre.append(z)
            # Linear output layer; leaky-ReLU on all hidden layers.
            post.append(self._lrelu(z) if i < len(self.layers) - 1 else z)
        return pre, post

    def _backprop(self, pre, post, target: np.ndarray):
        """MSE backprop with per-parameter AdaGrad updates; holds the lock."""
        with self.lock:
            delta = 2.0 * (post[-1] - target) / target.shape[0]  # dMSE/d(output)
            for i in range(len(self.layers) - 1, -1, -1):
                gW = post[i].T @ delta
                gb = delta.sum(axis=0, keepdims=True)
                self.grad_acc[i]["W"] += gW ** 2
                self.grad_acc[i]["b"] += gb ** 2
                self.layers[i]["W"] -= self.lr * gW / (np.sqrt(self.grad_acc[i]["W"]) + 1e-8)
                self.layers[i]["b"] -= self.lr * gb / (np.sqrt(self.grad_acc[i]["b"]) + 1e-8)
                if i > 0:
                    delta = (delta @ self.layers[i]["W"].T) * self._lrelu_d(pre[i - 1])

    # ── Text vectorisation ────────────────────────────────────────────────────
    def _text_to_batch(self, text: str, batch_size: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Character-code encoding: X is a random input_dim-char window, Y the
        same window shifted by one char (next-char prediction as regression).
        Short texts are zero-padded to fit one window.
        NOTE(review): ord(c)/255 exceeds 1.0 for code points above 255, so
        inputs are only normalised for 8-bit characters — confirm intended.
        """
        enc = np.array([ord(c) / 255.0 for c in text], dtype=np.float32)
        L, W = len(enc), self.input_dim
        if L < W + 1:
            enc = np.pad(enc, (0, W + 1 - L))
            L = len(enc)
        n = min(batch_size, L - W - 1)
        idx = np.random.randint(0, max(1, L - W - 1), size=max(1, n))
        X = np.stack([enc[i: i + W] for i in idx])
        Y = np.stack([enc[i + 1: i + 1 + W] for i in idx])
        return X, Y

    # ── Public train step ─────────────────────────────────────────────────────
    def train_on_text(self, text: str, batch_size: int = 32) -> Tuple[float, bool]:
        """Run one gradient step on `text`. Returns (loss, expanded)."""
        X, Y = self._text_to_batch(text, batch_size)
        pre, post = self._forward(X)
        loss = float(np.mean((post[-1] - Y) ** 2))
        self._backprop(pre, post, Y)
        self.train_steps += 1
        self.total_tokens += len(text.split())
        self.loss_history.append(loss)
        # IQ heuristic: grows faster as the 50-step rolling average loss drops.
        avg = float(np.mean(list(self.loss_history)[-50:])) if len(self.loss_history) >= 50 else loss
        self.global_iq = max(0.0, self.global_iq + 0.004 / (avg + 1e-4))
        expanded = self.maybe_expand(loss)
        if time.time() - self.last_checkpoint_time > CHECKPOINT_EVERY:
            # Checkpoint off-thread so training never blocks on the Hub.
            threading.Thread(target=self._checkpoint, daemon=True).start()
            self.last_checkpoint_time = time.time()
        return loss, expanded

    # ── Federated gradient application ───────────────────────────────────────
    def apply_federated_update(self, grad_pack: List[Dict]):
        """
        Apply weight deltas arriving from a browser client's TF.js worker.
        Each element: {"W": flat_array, "b": flat_array,
                       "W_shape": [rows, cols], "b_shape": [1, cols]}
        Layers that do not match current architecture are skipped gracefully.
        """
        with self.lock:
            n = min(len(grad_pack), len(self.layers))
            for i in range(n):
                try:
                    gW = np.array(grad_pack[i]["W"], dtype=np.float32)
                    gb = np.array(grad_pack[i]["b"], dtype=np.float32)
                    Ws = grad_pack[i].get("W_shape")
                    bs = grad_pack[i].get("b_shape")
                    if Ws:
                        gW = gW.reshape(Ws)
                    if bs:
                        gb = gb.reshape(bs)
                    # Architecture may have expanded since the client synced:
                    # skip mismatched weight shapes, coerce bias shapes.
                    if gW.shape != self.layers[i]["W"].shape:
                        continue
                    if gb.shape != self.layers[i]["b"].shape:
                        gb = gb.reshape(self.layers[i]["b"].shape)
                    self.grad_acc[i]["W"] += gW ** 2
                    self.grad_acc[i]["b"] += gb ** 2
                    self.layers[i]["W"] -= self.lr * gW / (np.sqrt(self.grad_acc[i]["W"]) + 1e-8)
                    self.layers[i]["b"] -= self.lr * gb / (np.sqrt(self.grad_acc[i]["b"]) + 1e-8)
                except Exception as e:
                    log.debug(f"[FED] Layer {i} skip: {e}")

    # ── Serialisation ─────────────────────────────────────────────────────────
    def snapshot(self) -> bytes:
        """Serialise the model: >I header length, JSON header, raw f32 buffers."""
        header = json.dumps({
            "n": len(self.layers),
            "shapes": [[list(l["W"].shape), list(l["b"].shape)] for l in self.layers],
            "params": self.param_count,
            "exp": self.expansion_events,
            "steps": self.train_steps,
            "iq": self.global_iq,
            "tokens": self.total_tokens,
        }).encode()
        parts = [struct.pack(">I", len(header)), header]
        with self.lock:
            for l in self.layers:
                parts += [l["W"].tobytes(), l["b"].tobytes()]
        return b"".join(parts)

    def restore(self, data: bytes):
        """Rebuild layers and counters from a snapshot() payload."""
        hlen = struct.unpack(">I", data[:4])[0]
        header = json.loads(data[4: 4 + hlen])
        offset = 4 + hlen
        with self.lock:
            new_layers = []
            for (Ws, bs) in header["shapes"]:
                Wn = int(np.prod(Ws))
                bn = int(np.prod(bs))
                # .copy() so arrays are writable (frombuffer views are read-only).
                W = np.frombuffer(data[offset: offset + Wn * 4], dtype=np.float32).reshape(Ws).copy()
                offset += Wn * 4
                b = np.frombuffer(data[offset: offset + bn * 4], dtype=np.float32).reshape(bs).copy()
                offset += bn * 4
                new_layers.append({"W": W, "b": b})
            self.layers = new_layers
            self.param_count = self._count()
            self.expansion_events = header.get("exp", 0)
            self.train_steps = header.get("steps", 0)
            self.global_iq = header.get("iq", 0.0)
            self.total_tokens = header.get("tokens", 0)
            self.grad_acc = self._make_adagrad()  # AdaGrad state is not persisted
        log.info(f"[BRAIN] Restored: {self.param_count:,} params / {self.expansion_events} expansions")

    def _checkpoint(self):
        """Best-effort push of the current snapshot to the HF Hub."""
        try:
            self.hf.push(self.snapshot(), self.status)
        except Exception as e:
            log.warning(f"[BRAIN/ckpt] {e}")

    def _try_load_hf(self):
        """Best-effort restore from the HF Hub at startup; failures are logged."""
        try:
            data = self.hf.pull()
            if data:
                self.restore(data)
        except Exception as e:
            log.warning(f"[BRAIN/load] {e}")

    @property
    def avg_loss(self) -> float:
        """Rolling 50-step mean loss; 1.0 until enough history accumulates."""
        return float(np.mean(list(self.loss_history)[-50:])) if len(self.loss_history) >= 50 else 1.0

    @property
    def status(self) -> Dict:
        """JSON-serialisable snapshot of model metrics and architecture."""
        return {
            "param_count": self.param_count,
            "expansion_events": self.expansion_events,
            "train_steps": self.train_steps,
            "total_tokens": self.total_tokens,
            "global_iq": round(self.global_iq, 6),
            "avg_loss": round(self.avg_loss, 6),
            "layer_count": len(self.layers),
            "layer_shapes": [list(l["W"].shape) for l in self.layers],
            "expansion_paused": self.expansion_paused,
            "ram_pct": round(ram_usage_fraction() * 100, 1),
        }
# =============================================================================
# [ S5 HF MODEL MANAGER ]
# =============================================================================
class HFModelManager:
    """Pushes/pulls binary checkpoints to/from the HuggingFace Hub.

    All operations are best-effort: without HF_TOKEN (or huggingface_hub)
    the manager becomes a silent no-op.
    """
    BIN = "omega_checkpoint.bin"   # checkpoint filename in the repo
    META = "omega_meta.json"       # human-readable metadata sidecar

    def __init__(self):
        # api is None when the Hub is unavailable — every method checks this.
        self.api = HfApi() if HAS_HF_HUB and HF_TOKEN else None
        self.repo_id = HF_REPO_ID
        self.token = HF_TOKEN
        self._pushes = 0

    def _ensure_repo(self):
        """Create the model repo if missing; ignore all failures."""
        if not self.api:
            return
        try:
            self.api.create_repo(
                self.repo_id, repo_type="model",
                exist_ok=True, token=self.token
            )
        except Exception:
            pass

    def push(self, data: bytes, meta: Optional[Dict] = None):
        """Upload the checkpoint bytes (and optional meta JSON) to the repo."""
        if not self.api:
            log.debug("[HF] Push skipped - set HF_TOKEN and HF_REPO_ID.")
            return
        self._ensure_repo()
        import io
        try:
            self.api.upload_file(
                path_or_fileobj=io.BytesIO(data),
                path_in_repo=self.BIN,
                repo_id=self.repo_id,
                repo_type="model",
                token=self.token,
            )
            if meta:
                self.api.upload_file(
                    path_or_fileobj=io.BytesIO(json.dumps(meta, indent=2).encode()),
                    path_in_repo=self.META,
                    repo_id=self.repo_id,
                    repo_type="model",
                    token=self.token,
                )
            self._pushes += 1
            log.info(f"[HF] Pushed checkpoint #{self._pushes} to {self.repo_id}")
        except Exception as e:
            log.warning(f"[HF] Push failed: {e}")

    def pull(self) -> Optional[bytes]:
        """Download the checkpoint bytes, or None when absent/unreachable."""
        if not self.api:
            return None
        try:
            path = hf_hub_download(
                repo_id=self.repo_id,
                filename=self.BIN,
                repo_type="model",
                token=self.token,
            )
            with open(path, "rb") as fh:
                data = fh.read()
            log.info(f"[HF] Pulled {len(data):,} bytes from {self.repo_id}")
            return data
        except Exception as e:
            # A missing checkpoint is normal on first boot — log at info level.
            log.info(f"[HF] No checkpoint found ({e})")
            return None
# =============================================================================
# [ S6 CLIENT REGISTRY ]
# =============================================================================
@dataclass
class ClientSession:
    """One connected federation participant (the server registers itself too)."""
    session_id: str
    hw: HardwareProfile
    connected_at: float = field(default_factory=time.time)
    messages: int = 0      # chat messages received from this client
    fed_updates: int = 0   # gradient packets absorbed from this client
    ws: Any = None         # live WebSocket handle, if any
class ClientRegistry:
    """Thread-safe book-keeping of connected federation clients."""

    def __init__(self):
        self._clients: Dict[str, ClientSession] = {}
        self._lock = threading.Lock()

    def register(self, sid: str, hw: HardwareProfile, ws=None) -> ClientSession:
        """Create (or overwrite) the session entry for `sid` and return it."""
        session = ClientSession(session_id=sid, hw=hw, ws=ws)
        with self._lock:
            self._clients[sid] = session
        log.info(f"[REG] +{sid[:8]} score={hw.compute_score:.1f}")
        return session

    def unregister(self, sid: str):
        """Remove a session if present; silently ignore unknown ids."""
        with self._lock:
            self._clients.pop(sid, None)

    def get(self, sid: str) -> Optional[ClientSession]:
        """Look up a session without locking (dict reads are atomic in CPython)."""
        return self._clients.get(sid)

    @property
    def count(self) -> int:
        """Number of currently registered clients."""
        return len(self._clients)

    @property
    def total_compute(self) -> float:
        """Sum of compute scores across all registered clients."""
        return sum(session.hw.compute_score for session in self._clients.values())

    @property
    def total_fed_updates(self) -> int:
        """Total federated gradient packets absorbed across all clients."""
        return sum(session.fed_updates for session in self._clients.values())
# =============================================================================
# [ S7 TRAINING COORDINATOR ]
# =============================================================================
class TrainingCoordinator:
    """Runs the server-side continuous-training worker threads.

    Workers pull text from the DatasetManager and feed it to the brain;
    thread count can only grow (scale_up), never shrink.
    """

    def __init__(self, brain: DynamicBrain, ds: DatasetManager, hw: HardwareProfile):
        self.brain = brain
        self.ds = ds
        self.hw = hw
        self._stop = threading.Event()
        self._workers: List[threading.Thread] = []
        self._steps = 0              # total steps across all workers
        self._lock = threading.Lock()  # guards _steps
        self._start(hw.allocated_threads)

    def _start(self, n: int):
        """Spawn `n` additional daemon worker threads."""
        for i in range(n):
            t = threading.Thread(target=self._loop, args=(i,), daemon=True)
            t.start()
            self._workers.append(t)
        log.info(f"[COORD] {n} workers (batch={self.hw.allocated_batch})")

    def _loop(self, wid: int):
        """Worker body: train on streamed text until stop() is called."""
        time.sleep(wid * 0.08)  # stagger startup to avoid a thundering herd
        batch = self.hw.allocated_batch
        while not self._stop.is_set():
            try:
                text = self.ds.next_text()
                self.brain.train_on_text(text, batch_size=batch)
                with self._lock:
                    self._steps += 1
                time.sleep(0.001)  # tiny yield so other threads get the GIL
            except Exception as e:
                log.warning(f"[WORKER/{wid}] {e}")
                time.sleep(1)

    def scale_up(self, hw: HardwareProfile):
        """Add workers if the new profile allows more than currently running."""
        current = len(self._workers)
        target = hw.allocated_threads
        if target > current:
            self._start(target - current)

    def stop(self):
        """Signal all workers to exit their loops."""
        self._stop.set()

    @property
    def steps_per_sec(self) -> float:
        # NOTE: averaged over process uptime (_START_TIME), not coordinator age.
        return self._steps / max(1, time.time() - _START_TIME)
# =============================================================================
# [ S8 PYDANTIC MODELS ]
# =============================================================================
class ChatRequest(BaseModel):
    """POST /api/chat body: one user message, optionally tied to a session."""
    message: str
    session_id: Optional[str] = None
class HardwareReport(BaseModel):
    """POST /api/hardware_report body: client-measured hardware profile."""
    session_id: str
    cpu_cores: int = 1
    ram_gb: float = 4.0
    gpu_name: str = "None"
    gpu_vram_gb: float = 0.0
    compute_score: float = 10.0
class GradientPacket(BaseModel):
    """POST /api/gradients body: per-layer weight deltas from a browser worker."""
    session_id: str
    weight: float = 1.0
    gradients: List[Dict]  # [{W, b, W_shape, b_shape}, ...]
# =============================================================================
# [ S9 GLOBAL SINGLETONS ]
# =============================================================================
# Module-level singletons wired together at import time.  Order matters:
# BRAIN and DS_MGR must exist before COORD spawns its training workers.
SERVER_HW = HardwareScanner.scan("server")
BRAIN = DynamicBrain()
DS_MGR = DatasetManager()
COORD = TrainingCoordinator(BRAIN, DS_MGR, SERVER_HW)
REGISTRY = ClientRegistry()
RESPGEN = ResponseGenerator()
# The server registers itself as the first federation participant.
REGISTRY.register("server", SERVER_HW)
# =============================================================================
# [ S10 FASTAPI ROUTES ]
# =============================================================================
# FastAPI app with fully permissive CORS so browser workers on any origin
# can reach the API and WebSocket endpoints.
app = FastAPI(title="ScaleUp-OmegaB", version="4.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)
@app.post("/api/chat")
async def api_chat(req: ChatRequest, bg: BackgroundTasks):
    """Handle one chat message: train on it, maybe checkpoint, reply with metrics.

    The user's message is used directly as a training sample; the reply is a
    metrics-grounded status line from RESPGEN (which tracks the previous
    loss/param values internally), not generated text.
    """
    sid = req.session_id or str(uuid.uuid4())
    session = REGISTRY.get(sid)
    if session:
        session.messages += 1
    # Fix: dropped the unused `old_params` local — RESPGEN already remembers
    # the previous parameter count for its delta reporting.
    loss, expanded = BRAIN.train_on_text(req.message, batch_size=16)
    if expanded:
        # Push a checkpoint in the background right after an expansion event.
        bg.add_task(BRAIN.hf.push, BRAIN.snapshot(), BRAIN.status)
    reply = RESPGEN.generate(
        loss=loss,
        params=BRAIN.param_count,
        layers=len(BRAIN.layers),
        expansions=BRAIN.expansion_events,
        expanded=expanded,
        new_width=BRAIN.last_expansion_width,
        dataset=DS_MGR._current_name,
        datasets_tried=DS_MGR.datasets_tried,
        clients=REGISTRY.count,
        total_score=REGISTRY.total_compute,
        fed_updates=REGISTRY.total_fed_updates,
        train_steps=BRAIN.train_steps,
        batch=16,
        iq=BRAIN.global_iq,
        sps=COORD.steps_per_sec,
        workers=COORD.hw.allocated_threads,
        ram_pct=ram_usage_fraction() * 100,
        expansion_paused=BRAIN.expansion_paused,
    )
    return JSONResponse({
        "reply": reply,
        "session_id": sid,
        "expanded": expanded,
        "loss": round(loss, 6),
        "iq": round(BRAIN.global_iq, 6),
        "total_params": BRAIN.param_count,
        "expansions": BRAIN.expansion_events,
        "layer_count": len(BRAIN.layers),
        "active_clients": REGISTRY.count,
        "ram_pct": round(ram_usage_fraction() * 100, 1),
    })
@app.get("/api/status")
async def api_status():
    """Aggregate status endpoint: brain, dataset, hardware, workers, federation."""
    return JSONResponse({
        "brain": BRAIN.status,
        "dataset": DS_MGR.status,
        "hardware": SERVER_HW.to_dict(),
        "coordinator": {
            "workers": COORD.hw.allocated_threads,
            "total_steps": COORD._steps,
            "steps_sec": round(COORD.steps_per_sec, 2),
        },
        "federation": {
            "clients": REGISTRY.count,
            "compute": round(REGISTRY.total_compute, 2),
            "fed_updates": REGISTRY.total_fed_updates,
        },
        "uptime": round(time.time() - _START_TIME, 1),
        "is_hf_spaces": IS_HF_SPACES,
    })
@app.post("/api/hardware_report")
async def api_hw(report: HardwareReport):
    """Register/update a browser client's hardware profile and allocation.

    Browser-reported scores use a smaller tier ladder than the server's
    HardwareScanner (threads capped at 4, batch at 64).
    """
    hw = HardwareProfile(
        client_id=report.session_id,
        cpu_cores_logic=report.cpu_cores,
        ram_avail_gb=report.ram_gb,
        gpu_name=report.gpu_name,
        gpu_vram_gb=report.gpu_vram_gb,
        compute_score=report.compute_score,
    )
    s = report.compute_score
    hw.allocated_threads = 1 if s < 20 else 2 if s < 60 else 3 if s < 150 else 4
    hw.allocated_batch = 8 if s < 20 else 16 if s < 60 else 32 if s < 150 else 64
    existing = REGISTRY.get(report.session_id)
    if existing:
        existing.hw = hw
    else:
        REGISTRY.register(report.session_id, hw)
    # A stronger client may justify more server-side workers too.
    COORD.scale_up(hw)
    return JSONResponse({
        "ok": True,
        "allocated_batch": hw.allocated_batch,
        "allocated_threads": hw.allocated_threads,
    })
@app.post("/api/gradients")
async def api_gradients(pkt: GradientPacket):
    """Absorb a federated gradient packet from a browser TF.js worker.

    Unknown sessions are auto-registered with a minimal default profile.
    """
    session = REGISTRY.get(pkt.session_id)
    if not session:
        hw = HardwareProfile(client_id=pkt.session_id, compute_score=10.0)
        session = REGISTRY.register(pkt.session_id, hw)
    BRAIN.apply_federated_update(pkt.gradients)
    session.fed_updates += 1
    return JSONResponse({"ok": True, "fed_updates": session.fed_updates})
@app.get("/api/architecture")
async def api_arch():
    """Describe the current layer topology (shapes + per-layer param counts)."""
    with BRAIN.lock:  # hold the lock so an expansion can't mutate mid-read
        layers = [
            {"i": i, "fan_in": l["W"].shape[0], "fan_out": l["W"].shape[1],
             "params": l["W"].size + l["b"].size}
            for i, l in enumerate(BRAIN.layers)
        ]
    return JSONResponse({
        "layers": layers,
        "total": BRAIN.param_count,
        "expansions": BRAIN.expansion_events,
    })
@app.get("/api/weight_subset")
async def api_weight_subset(n_layers: int = 3):
    """
    Return the first n_layers as JSON so the browser worker can initialise
    its local TF.js model with server-synced starting weights.

    Fix: the subset is sized *inside* BRAIN.lock so a concurrent expansion
    cannot change the layer list between sizing and slicing, and negative
    n_layers values are clamped to 0 (empty response) instead of relying on
    range() silently ignoring them.
    """
    with BRAIN.lock:
        n = min(max(0, n_layers), len(BRAIN.layers))
        subset = [
            {
                "W": BRAIN.layers[i]["W"].tolist(),
                "b": BRAIN.layers[i]["b"].tolist(),
                "W_shape": list(BRAIN.layers[i]["W"].shape),
                "b_shape": list(BRAIN.layers[i]["b"].shape),
            }
            for i in range(n)
        ]
        total_layers = len(BRAIN.layers)
    return JSONResponse({
        "layers": subset,
        "total_server_layers": total_layers,
    })
# ── WebSocket ──────────────────────────────────────────────────────────────────
@app.websocket("/ws/stats")
async def ws_stats(ws: WebSocket):
    """Live stats WebSocket: pushes a metrics frame every second and accepts
    optional client hardware reports ({"type": "hw", ...}) in the other
    direction.  The connection is registered as a low-score federation client
    for the duration of the socket.
    """
    await ws.accept()
    sid = str(uuid.uuid4())
    hw = HardwareProfile(client_id=sid, compute_score=5.0)
    REGISTRY.register(sid, hw, ws=ws)

    async def push():
        # Outbound loop: one stats frame per second until send fails.
        while True:
            try:
                await ws.send_text(json.dumps({
                    "type": "stats",
                    "params": BRAIN.param_count,
                    "iq": round(BRAIN.global_iq, 6),
                    "loss": round(BRAIN.avg_loss, 6),
                    "expansions": BRAIN.expansion_events,
                    "train_steps": BRAIN.train_steps,
                    "layers": len(BRAIN.layers),
                    "tokens": BRAIN.total_tokens,
                    "dataset": DS_MGR._current_name,
                    "clients": REGISTRY.count,
                    "steps_sec": round(COORD.steps_per_sec, 2),
                    "workers": COORD.hw.allocated_threads,
                    "ram_pct": round(ram_usage_fraction() * 100, 1),
                    "exp_paused": BRAIN.expansion_paused,
                    "fed_updates": REGISTRY.total_fed_updates,
                    "is_hf": IS_HF_SPACES,
                }))
            except Exception:
                return  # socket gone; end the push loop quietly
            await asyncio.sleep(1.0)

    async def recv():
        # Inbound loop: update this session's hardware profile from reports.
        async for raw in ws.iter_text():
            try:
                msg = json.loads(raw)
                if msg.get("type") == "hw":
                    hw.compute_score = float(msg.get("score", 5))
                    hw.cpu_cores_logic = int(msg.get("cores", 1))
                    hw.ram_avail_gb = float(msg.get("ram", 0))
                    hw.gpu_name = msg.get("gpu", "None")
            except Exception:
                pass  # malformed frames are ignored

    try:
        await asyncio.gather(push(), recv())
    except (WebSocketDisconnect, Exception):
        pass
    finally:
        # Always drop the session so REGISTRY.count stays accurate.
        REGISTRY.unregister(sid)
# =============================================================================
# [ S11 FRONTEND HTML + EMBEDDED TF.JS WEBWORKER ]
# =============================================================================
#
# The TF.js worker is stored in a