#!/usr/bin/env python3 # ============================================================================= # PROJECT : ScaleUp-OmegaB | EVOLUTION v4 — TRUE FEDERATED EXPANSION # AUTHOR : imsuprtwo2 # ENGINE : Federated Dynamic Synaptic Expansion (F-DSE) # BACKEND : NumPy / AdaGrad · no hard param ceiling # BROWSER : TensorFlow.js WebWorker — real gradient computation on user PC # STORAGE : HuggingFace Hub (imsuprtwo2/ScaleUp-OmegaB) # DATASETS : Random-cycling public HuggingFace datasets, any format # SPACES : RAM-aware expansion gate — safe on free HF Spaces (16 GB) # ============================================================================= import os, sys, json, time, uuid, math, random, logging, threading, traceback import asyncio, struct, gc, platform from dataclasses import dataclass, field, asdict from typing import Any, Dict, List, Optional, Tuple from collections import deque import numpy as np from fastapi import FastAPI, WebSocket, WebSocketDisconnect, BackgroundTasks from fastapi.responses import HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel # ── Optional heavy deps ────────────────────────────────────────────────────── try: import psutil HAS_PSUTIL = True except ImportError: HAS_PSUTIL = False try: import torch HAS_TORCH = True TORCH_DEVICE = "cuda" if torch.cuda.is_available() else "cpu" except ImportError: HAS_TORCH = False TORCH_DEVICE = "cpu" try: from huggingface_hub import HfApi, hf_hub_download HAS_HF_HUB = True except ImportError: HAS_HF_HUB = False try: from datasets import load_dataset, get_dataset_config_names HAS_DATASETS = True except ImportError: HAS_DATASETS = False # ============================================================================= # [ S0 GLOBAL CONFIG ] # ============================================================================= logging.basicConfig( level = logging.INFO, format = "%(asctime)s [%(levelname)s] %(name)s - %(message)s", datefmt = "%H:%M:%S", ) log = logging.getLogger("OMEGA") # Detect HuggingFace Spaces environment IS_HF_SPACES = bool(os.environ.get("SPACE_ID") or os.environ.get("SPACES_ZERO_GPU")) HF_TOKEN = os.environ.get("HF_TOKEN", "") HF_REPO_ID = os.environ.get("HF_REPO_ID", "imsuprtwo2/ScaleUp-OmegaB") CHECKPOINT_EVERY = int(os.environ.get("CHECKPOINT_EVERY", "300")) EXPANSION_COOLDOWN = float(os.environ.get("EXPANSION_COOLDOWN", "5")) LOSS_THRESHOLD = float(os.environ.get("LOSS_THRESHOLD", "0.065")) LEARNING_RATE = float(os.environ.get("LEARNING_RATE", "0.0003")) PORT = int(os.environ.get("PORT", "7860")) INPUT_DIM = 128 OUTPUT_DIM = 128 # RAM safety thresholds — critical for HF Spaces free tier (16 GB) RAM_EXPAND_CEILING = 0.78 # pause expansion above this fraction RAM_HARD_CEILING = 0.90 # force gc.collect above this _START_TIME = time.time() # Seed datasets that reliably load as fallback SEED_DATASETS = [ ("wikipedia", "20220301.simple"), ("bookcorpus", None), ("squad", None), ("glue", "sst2"), ("ag_news", None), ("multi_news", None), ("cnn_dailymail", "3.0.0"), ("scientific_papers", "arxiv"), ("cc_news", None), ("openwebtext", None), ] TEXT_FIELDS = [ "text","content","body","article","document","passage", "sentence","review","description","summary","context", "question","answer","title","abstract","story","input","output", ] # ============================================================================= # [ S1 HARDWARE SCANNER ] # ============================================================================= @dataclass class HardwareProfile: client_id: str = "" cpu_cores_phys: int = 1 cpu_cores_logic: int = 1 cpu_freq_mhz: float = 0.0 ram_total_gb: float = 0.0 ram_avail_gb: float = 0.0 gpu_name: str = "None" gpu_vram_gb: float = 0.0 cuda_available: bool = False platform_str: str = "" compute_score: float = 0.0 allocated_threads: int = 1 allocated_batch: int = 8 def to_dict(self) -> Dict: return asdict(self) class HardwareScanner: @staticmethod def scan(client_id: str = "server") -> HardwareProfile: p = HardwareProfile(client_id=client_id, platform_str=platform.system()) if HAS_PSUTIL: p.cpu_cores_phys = psutil.cpu_count(logical=False) or 1 p.cpu_cores_logic = psutil.cpu_count(logical=True) or 1 freq = psutil.cpu_freq() p.cpu_freq_mhz = round(freq.current, 1) if freq else 0.0 vm = psutil.virtual_memory() p.ram_total_gb = round(vm.total / 1024**3, 2) p.ram_avail_gb = round(vm.available/ 1024**3, 2) else: p.cpu_cores_logic = os.cpu_count() or 2 p.cpu_cores_phys = p.cpu_cores_logic if HAS_TORCH: p.cuda_available = torch.cuda.is_available() if p.cuda_available: p.gpu_name = torch.cuda.get_device_name(0) props = torch.cuda.get_device_properties(0) p.gpu_vram_gb = round(props.total_memory / 1024**3, 2) cpu_pts = p.cpu_cores_logic * max(p.cpu_freq_mhz, 1000) / 1000 ram_pts = p.ram_avail_gb * 4 vram_pts = p.gpu_vram_gb * 80 p.compute_score = round(cpu_pts + ram_pts + vram_pts, 2) s = p.compute_score if s < 20: p.allocated_threads, p.allocated_batch = 1, 8 elif s < 60: p.allocated_threads, p.allocated_batch = 2, 16 elif s < 150: p.allocated_threads, p.allocated_batch = 4, 32 elif s < 400: p.allocated_threads, p.allocated_batch = 6, 64 else: p.allocated_threads, p.allocated_batch = min(p.cpu_cores_phys, 12), 128 log.info( f"[HW] {client_id[:12]} cores={p.cpu_cores_logic} " f"ram={p.ram_avail_gb}GB gpu={p.gpu_name} " f"score={p.compute_score} T={p.allocated_threads} B={p.allocated_batch}" ) return p def ram_usage_fraction() -> float: """Returns current RAM usage as a 0.0-1.0 fraction.""" if HAS_PSUTIL: return psutil.virtual_memory().percent / 100.0 return 0.4 # ============================================================================= # [ S2 RESPONSE GENERATOR ] # ============================================================================= class ResponseGenerator: """ Generates technically-accurate, varied chat messages drawn from real training metrics. Each call produces a different style of message so the model never sounds like a broken record. """ _LOSS_LABELS = [ (0.010, "near-zero - pattern fully saturated"), (0.030, "very low - high predictive confidence"), (0.065, "moderate - healthy gradient flow"), (0.120, "elevated - expansion pressure active"), (0.250, "high - novel input distribution"), (999.0, "very high - unexplored representation space"), ] def __init__(self): self._prev_loss = 1.0 self._prev_params = 0 self._cycle = 0 def _loss_label(self, loss: float) -> str: for threshold, label in self._LOSS_LABELS: if loss <= threshold: return label return "indeterminate" def generate(self, *, loss: float, params: int, layers: int, expansions: int, expanded: bool, new_width: int, dataset: str, datasets_tried: int, clients: int, total_score: float, fed_updates: int, train_steps: int, batch: int, iq: float, sps: float, workers: int, ram_pct: float, expansion_paused: bool) -> str: ds_short = dataset.split("/")[-1][:22] if dataset != "none" else "loading..." prev_loss = self._prev_loss prev_par = self._prev_params self._prev_loss = loss self._prev_params = params # --- Expansion just happened: always lead with that --- if expanded: self._cycle += 1 return ( f"Neurogenesis #{expansions}: injected a {new_width}-neuron dense layer. " f"Parameter count {prev_par:,} -> {params:,} (+{params - prev_par:,}). " f"Triggered by loss={loss:.5f} on dataset [{ds_short}]. " f"Architecture is now {layers} layers deep." ) # --- RAM warning --- if expansion_paused and ram_pct > RAM_EXPAND_CEILING * 100: return ( f"Expansion paused: RAM at {ram_pct:.1f}% (HF Spaces safety gate at " f"{RAM_EXPAND_CEILING*100:.0f}%). Holding at {params:,} params / {layers} layers. " f"Training continues at full speed. Loss={loss:.5f}, IQ={iq:.4f}. " f"Expansion will resume automatically when memory clears." ) # --- Rotate through informative templates --- self._cycle = (self._cycle + 1) % 6 c = self._cycle if c == 0: pct_change = abs(prev_loss - loss) / (prev_loss + 1e-9) * 100 direction = "improved" if loss < prev_loss else "increased" return ( f"Loss {direction} {prev_loss:.5f} -> {loss:.5f} ({pct_change:.1f}%) " f"on dataset [{ds_short}]. " f"Architecture: {layers} layers, {params:,} parameters. " f"IQ index: {iq:.4f}. Server training: ~{sps:.1f} steps/sec." ) elif c == 1: return ( f"Trained a {batch}-sample mini-batch against [{ds_short}] " f"(dataset #{datasets_tried} in rotation). " f"Loss={loss:.5f} ({self._loss_label(loss)}). " f"{clients} distributed client(s) contributing {total_score:.0f} compute-units. " f"{fed_updates} federated gradient packets absorbed this session." ) elif c == 2: return ( f"Your input was embedded across all {layers} synaptic layers " f"({params:,} parameters, {expansions} neurogenesis events so far). " f"Prediction error: {loss:.5f}. AdaGrad accumulators updated. " f"Continuous training on [{ds_short}] running in background." ) elif c == 3: return ( f"Server: {workers} worker thread(s), {sps:.1f} steps/sec, " f"{train_steps:,} total gradient steps. " f"Browser workers: {clients - 1} client(s) running TF.js locally. " f"Fed updates: {fed_updates}. Current loss: {loss:.5f}." ) elif c == 4: return ( f"Dataset [{ds_short}] ({datasets_tried} rotated total). " f"Loss={loss:.5f} | Params={params:,} | Layers={layers} | IQ={iq:.4f}. " f"RAM usage: {ram_pct:.1f}%. " f"No expansion needed at this loss level." ) else: return ( f"Gradient descent step complete. Loss: {loss:.5f} " f"({self._loss_label(loss)}). " f"Model has {params:,} active parameters in {layers} layers. " f"Global IQ index: {iq:.4f}. " f"Weights synced to HuggingFace Hub: imsuprtwo2/ScaleUp-OmegaB." ) # ============================================================================= # [ S3 DATASET MANAGER ] # ============================================================================= class DatasetManager: def __init__(self): self._lock = threading.Lock() self._blacklist: set = set() self._catalogue: List[str] = [] self._last_refresh = 0.0 self._refresh_interval= 3600.0 self._current_iter = None self._current_name = "none" self._rows_consumed = 0 self.datasets_tried = 0 self._switch_after = 2000 self._log: deque = deque(maxlen=40) self._queue: deque = deque(maxlen=512) t = threading.Thread(target=self._prefetch_loop, daemon=True) t.start() def _refresh_catalogue(self): now = time.time() if now - self._last_refresh < self._refresh_interval: return self._last_refresh = now if not HAS_HF_HUB: return try: api = HfApi() info = api.list_datasets(limit=5000, full=False) ids = [d.id for d in info if d.id] random.shuffle(ids) with self._lock: self._catalogue = ids log.info(f"[DATASET] Catalogue: {len(ids)} datasets.") except Exception as e: log.warning(f"[DATASET] Catalogue refresh failed: {e}") def _pick(self) -> Tuple[str, Optional[str]]: self._refresh_catalogue() with self._lock: cands = [d for d in self._catalogue if d not in self._blacklist] if cands: return random.choice(cands), None live = [s for s in SEED_DATASETS if s[0] not in self._blacklist] if live: return random.choice(live) with self._lock: self._blacklist.clear() return random.choice(SEED_DATASETS) def _extract(self, record: Dict) -> Optional[str]: for f in TEXT_FIELDS: if f in record: v = record[f] if isinstance(v, str) and len(v.strip()) > 15: return v.strip() if isinstance(v, list): j = " ".join(str(x) for x in v if isinstance(x, str)) if len(j.strip()) > 15: return j.strip() parts = [v for v in record.values() if isinstance(v, str) and len(v) > 8] result = " ".join(parts) return result if len(result) > 15 else None def _load(self): if not HAS_DATASETS: return for _ in range(10): ds_id, config = self._pick() self.datasets_tried += 1 label = ds_id + (f"/{config}" if config else "") log.info(f"[DATASET] Loading [{self.datasets_tried}] {label}") self._log.append(f"[{time.strftime('%H:%M:%S')}] Loading {label}") try: kw: Dict[str, Any] = dict( path = ds_id, split = "train", streaming = True, trust_remote_code = True, ) if config: kw["name"] = config else: try: cfgs = get_dataset_config_names(ds_id, trust_remote_code=True) if cfgs: kw["name"] = cfgs[0] except Exception: pass ds = load_dataset(**kw) sample = next(iter(ds)) text = self._extract(sample) if text is None: raise ValueError(f"No text field found in {ds_id}") self._current_iter = iter(ds) self._current_name = label self._rows_consumed = 0 log.info(f"[DATASET] Active: {label}") self._log.append(f"[{time.strftime('%H:%M:%S')}] Active: {label}") return except Exception as e: log.warning(f"[DATASET] Failed {ds_id}: {e}") with self._lock: self._blacklist.add(ds_id) def _prefetch_loop(self): while True: try: if len(self._queue) < 128: if self._current_iter is None or self._rows_consumed >= self._switch_after: self._load() if self._current_iter is not None: for _ in range(64): try: rec = next(self._current_iter) text = self._extract(rec) if text: self._queue.append(text[:2048]) self._rows_consumed += 1 except StopIteration: self._current_iter = None break else: time.sleep(0.05) except Exception as e: log.warning(f"[DATASET/prefetch] {e}") time.sleep(2) def next_text(self) -> str: for _ in range(200): if self._queue: return self._queue.popleft() time.sleep(0.03) words = ["the","model","trains","on","distributed","datasets","continuously"] return " ".join(random.choices(words, k=random.randint(20, 60))) @property def status(self) -> Dict: return { "current": self._current_name, "rows_consumed": self._rows_consumed, "datasets_tried": self.datasets_tried, "blacklisted": len(self._blacklist), "queue_depth": len(self._queue), "recent_log": list(self._log)[-10:], } # ============================================================================= # [ S4 DYNAMIC BRAIN ] # ============================================================================= class DynamicBrain: """ Self-expanding MLP with no hard parameter ceiling. - AdaGrad per-parameter adaptive learning rates - Xavier initialisation on all new layers - RAM-aware expansion gate (critical for HF Spaces) - Thread-safe via RLock - Binary serialisation for HF Hub checkpoints """ def __init__(self): self.input_dim = INPUT_DIM self.output_dim = OUTPUT_DIM self.lr = LEARNING_RATE self.lock = threading.RLock() self.layers: List[Dict] = [] self._build_seed() self.param_count = self._count() self.global_iq = 0.0 self.total_tokens = 0 self.expansion_events = 0 self.train_steps = 0 self.last_expansion_time = 0.0 self.last_checkpoint_time= time.time() self.expansion_paused = False self.last_expansion_width= 0 self.loss_history: deque = deque(maxlen=500) self.grad_acc: List[Dict]= self._make_adagrad() self.hf = HFModelManager() self._try_load_hf() log.info(f"[BRAIN] Ready: {self.param_count:,} params / {len(self.layers)} layers") # ── Architecture ───────────────────────────────────────────────────────── def _xavier(self, fan_in: int, fan_out: int) -> Dict: lim = math.sqrt(6.0 / (fan_in + fan_out)) return { "W": np.random.uniform(-lim, lim, (fan_in, fan_out)).astype(np.float32), "b": np.zeros((1, fan_out), dtype=np.float32), } def _build_seed(self): self.layers = [ self._xavier(self.input_dim, 1024), self._xavier(1024, 1024), self._xavier(1024, 512), self._xavier(512, self.output_dim), ] def _count(self) -> int: return sum(l["W"].size + l["b"].size for l in self.layers) def _make_adagrad(self) -> List[Dict]: return [ {"W": np.ones_like(l["W"]) * 1e-8, "b": np.ones_like(l["b"]) * 1e-8} for l in self.layers ] # ── Activations ────────────────────────────────────────────────────────── @staticmethod def _lrelu(z, a=0.01): return np.where(z > 0, z, a * z) @staticmethod def _lrelu_d(z, a=0.01): return np.where(z > 0, 1.0, a) # ── Neurogenesis ────────────────────────────────────────────────────────── def maybe_expand(self, loss: float) -> bool: """ Conditionally insert a new hidden layer. Returns True if expansion occurred. """ if loss <= LOSS_THRESHOLD: return False now = time.time() if now - self.last_expansion_time < EXPANSION_COOLDOWN: return False ram = ram_usage_fraction() if ram >= RAM_EXPAND_CEILING: self.expansion_paused = True if ram >= RAM_HARD_CEILING: gc.collect() return False self.expansion_paused = False self.last_expansion_time = now with self.lock: self.expansion_events += 1 new_width = 256 + self.expansion_events * 128 self.last_expansion_width = new_width prev_width = self.layers[-2]["W"].shape[1] new_layer = self._xavier(prev_width, new_width) self.layers[-1] = self._xavier(new_width, self.output_dim) self.layers.insert(-1, new_layer) self.param_count = self._count() self.grad_acc = self._make_adagrad() log.info( f"[NEUROGENESIS #{self.expansion_events}] " f"width={new_width} params={self.param_count:,} ram={ram*100:.1f}%" ) return True # ── Forward / Backprop ──────────────────────────────────────────────────── def _forward(self, x: np.ndarray): pre, post = [], [x] for i, l in enumerate(self.layers): z = post[-1] @ l["W"] + l["b"] pre.append(z) post.append(self._lrelu(z) if i < len(self.layers) - 1 else z) return pre, post def _backprop(self, pre, post, target: np.ndarray): with self.lock: delta = 2.0 * (post[-1] - target) / target.shape[0] for i in range(len(self.layers) - 1, -1, -1): gW = post[i].T @ delta gb = delta.sum(axis=0, keepdims=True) self.grad_acc[i]["W"] += gW ** 2 self.grad_acc[i]["b"] += gb ** 2 self.layers[i]["W"] -= self.lr * gW / (np.sqrt(self.grad_acc[i]["W"]) + 1e-8) self.layers[i]["b"] -= self.lr * gb / (np.sqrt(self.grad_acc[i]["b"]) + 1e-8) if i > 0: delta = (delta @ self.layers[i]["W"].T) * self._lrelu_d(pre[i - 1]) # ── Text vectorisation ──────────────────────────────────────────────────── def _text_to_batch(self, text: str, batch_size: int) -> Tuple[np.ndarray, np.ndarray]: enc = np.array([ord(c) / 255.0 for c in text], dtype=np.float32) L, W = len(enc), self.input_dim if L < W + 1: enc = np.pad(enc, (0, W + 1 - L)) L = len(enc) n = min(batch_size, L - W - 1) idx = np.random.randint(0, max(1, L - W - 1), size=max(1, n)) X = np.stack([enc[i: i + W] for i in idx]) Y = np.stack([enc[i + 1: i + 1 + W] for i in idx]) return X, Y # ── Public train step ───────────────────────────────────────────────────── def train_on_text(self, text: str, batch_size: int = 32) -> Tuple[float, bool]: """Returns (loss, expanded).""" X, Y = self._text_to_batch(text, batch_size) pre, post = self._forward(X) loss = float(np.mean((post[-1] - Y) ** 2)) self._backprop(pre, post, Y) self.train_steps += 1 self.total_tokens += len(text.split()) self.loss_history.append(loss) avg = float(np.mean(list(self.loss_history)[-50:])) if len(self.loss_history) >= 50 else loss self.global_iq = max(0.0, self.global_iq + 0.004 / (avg + 1e-4)) expanded = self.maybe_expand(loss) if time.time() - self.last_checkpoint_time > CHECKPOINT_EVERY: threading.Thread(target=self._checkpoint, daemon=True).start() self.last_checkpoint_time = time.time() return loss, expanded # ── Federated gradient application ─────────────────────────────────────── def apply_federated_update(self, grad_pack: List[Dict]): """ Apply weight deltas arriving from a browser client's TF.js worker. Each element: {"W": flat_array, "b": flat_array, "W_shape": [rows, cols], "b_shape": [1, cols]} Layers that do not match current architecture are skipped gracefully. """ with self.lock: n = min(len(grad_pack), len(self.layers)) for i in range(n): try: gW = np.array(grad_pack[i]["W"], dtype=np.float32) gb = np.array(grad_pack[i]["b"], dtype=np.float32) Ws = grad_pack[i].get("W_shape") bs = grad_pack[i].get("b_shape") if Ws: gW = gW.reshape(Ws) if bs: gb = gb.reshape(bs) if gW.shape != self.layers[i]["W"].shape: continue if gb.shape != self.layers[i]["b"].shape: gb = gb.reshape(self.layers[i]["b"].shape) self.grad_acc[i]["W"] += gW ** 2 self.grad_acc[i]["b"] += gb ** 2 self.layers[i]["W"] -= self.lr * gW / (np.sqrt(self.grad_acc[i]["W"]) + 1e-8) self.layers[i]["b"] -= self.lr * gb / (np.sqrt(self.grad_acc[i]["b"]) + 1e-8) except Exception as e: log.debug(f"[FED] Layer {i} skip: {e}") # ── Serialisation ───────────────────────────────────────────────────────── def snapshot(self) -> bytes: header = json.dumps({ "n": len(self.layers), "shapes": [[list(l["W"].shape), list(l["b"].shape)] for l in self.layers], "params": self.param_count, "exp": self.expansion_events, "steps": self.train_steps, "iq": self.global_iq, "tokens": self.total_tokens, }).encode() parts = [struct.pack(">I", len(header)), header] with self.lock: for l in self.layers: parts += [l["W"].tobytes(), l["b"].tobytes()] return b"".join(parts) def restore(self, data: bytes): hlen = struct.unpack(">I", data[:4])[0] header = json.loads(data[4: 4 + hlen]) offset = 4 + hlen with self.lock: new_layers = [] for (Ws, bs) in header["shapes"]: Wn = int(np.prod(Ws)) bn = int(np.prod(bs)) W = np.frombuffer(data[offset: offset + Wn * 4], dtype=np.float32).reshape(Ws).copy() offset += Wn * 4 b = np.frombuffer(data[offset: offset + bn * 4], dtype=np.float32).reshape(bs).copy() offset += bn * 4 new_layers.append({"W": W, "b": b}) self.layers = new_layers self.param_count = self._count() self.expansion_events = header.get("exp", 0) self.train_steps = header.get("steps", 0) self.global_iq = header.get("iq", 0.0) self.total_tokens = header.get("tokens", 0) self.grad_acc = self._make_adagrad() log.info(f"[BRAIN] Restored: {self.param_count:,} params / {self.expansion_events} expansions") def _checkpoint(self): try: self.hf.push(self.snapshot(), self.status) except Exception as e: log.warning(f"[BRAIN/ckpt] {e}") def _try_load_hf(self): try: data = self.hf.pull() if data: self.restore(data) except Exception as e: log.warning(f"[BRAIN/load] {e}") @property def avg_loss(self) -> float: return float(np.mean(list(self.loss_history)[-50:])) if len(self.loss_history) >= 50 else 1.0 @property def status(self) -> Dict: return { "param_count": self.param_count, "expansion_events": self.expansion_events, "train_steps": self.train_steps, "total_tokens": self.total_tokens, "global_iq": round(self.global_iq, 6), "avg_loss": round(self.avg_loss, 6), "layer_count": len(self.layers), "layer_shapes": [list(l["W"].shape) for l in self.layers], "expansion_paused": self.expansion_paused, "ram_pct": round(ram_usage_fraction() * 100, 1), } # ============================================================================= # [ S5 HF MODEL MANAGER ] # ============================================================================= class HFModelManager: BIN = "omega_checkpoint.bin" META = "omega_meta.json" def __init__(self): self.api = HfApi() if HAS_HF_HUB and HF_TOKEN else None self.repo_id = HF_REPO_ID self.token = HF_TOKEN self._pushes = 0 def _ensure_repo(self): if not self.api: return try: self.api.create_repo( self.repo_id, repo_type="model", exist_ok=True, token=self.token ) except Exception: pass def push(self, data: bytes, meta: Optional[Dict] = None): if not self.api: log.debug("[HF] Push skipped - set HF_TOKEN and HF_REPO_ID.") return self._ensure_repo() import io try: self.api.upload_file( path_or_fileobj = io.BytesIO(data), path_in_repo = self.BIN, repo_id = self.repo_id, repo_type = "model", token = self.token, ) if meta: self.api.upload_file( path_or_fileobj = io.BytesIO(json.dumps(meta, indent=2).encode()), path_in_repo = self.META, repo_id = self.repo_id, repo_type = "model", token = self.token, ) self._pushes += 1 log.info(f"[HF] Pushed checkpoint #{self._pushes} to {self.repo_id}") except Exception as e: log.warning(f"[HF] Push failed: {e}") def pull(self) -> Optional[bytes]: if not self.api: return None try: path = hf_hub_download( repo_id = self.repo_id, filename = self.BIN, repo_type = "model", token = self.token, ) with open(path, "rb") as fh: data = fh.read() log.info(f"[HF] Pulled {len(data):,} bytes from {self.repo_id}") return data except Exception as e: log.info(f"[HF] No checkpoint found ({e})") return None # ============================================================================= # [ S6 CLIENT REGISTRY ] # ============================================================================= @dataclass class ClientSession: session_id: str hw: HardwareProfile connected_at: float = field(default_factory=time.time) messages: int = 0 fed_updates: int = 0 ws: Any = None class ClientRegistry: def __init__(self): self._clients: Dict[str, ClientSession] = {} self._lock = threading.Lock() def register(self, sid: str, hw: HardwareProfile, ws=None) -> ClientSession: s = ClientSession(session_id=sid, hw=hw, ws=ws) with self._lock: self._clients[sid] = s log.info(f"[REG] +{sid[:8]} score={hw.compute_score:.1f}") return s def unregister(self, sid: str): with self._lock: self._clients.pop(sid, None) def get(self, sid: str) -> Optional[ClientSession]: return self._clients.get(sid) @property def count(self) -> int: return len(self._clients) @property def total_compute(self) -> float: return sum(s.hw.compute_score for s in self._clients.values()) @property def total_fed_updates(self) -> int: return sum(s.fed_updates for s in self._clients.values()) # ============================================================================= # [ S7 TRAINING COORDINATOR ] # ============================================================================= class TrainingCoordinator: def __init__(self, brain: DynamicBrain, ds: DatasetManager, hw: HardwareProfile): self.brain = brain self.ds = ds self.hw = hw self._stop = threading.Event() self._workers: List[threading.Thread] = [] self._steps = 0 self._lock = threading.Lock() self._start(hw.allocated_threads) def _start(self, n: int): for i in range(n): t = threading.Thread(target=self._loop, args=(i,), daemon=True) t.start() self._workers.append(t) log.info(f"[COORD] {n} workers (batch={self.hw.allocated_batch})") def _loop(self, wid: int): time.sleep(wid * 0.08) batch = self.hw.allocated_batch while not self._stop.is_set(): try: text = self.ds.next_text() self.brain.train_on_text(text, batch_size=batch) with self._lock: self._steps += 1 time.sleep(0.001) except Exception as e: log.warning(f"[WORKER/{wid}] {e}") time.sleep(1) def scale_up(self, hw: HardwareProfile): current = len(self._workers) target = hw.allocated_threads if target > current: self._start(target - current) def stop(self): self._stop.set() @property def steps_per_sec(self) -> float: return self._steps / max(1, time.time() - _START_TIME) # ============================================================================= # [ S8 PYDANTIC MODELS ] # ============================================================================= class ChatRequest(BaseModel): message: str session_id: Optional[str] = None class HardwareReport(BaseModel): session_id: str cpu_cores: int = 1 ram_gb: float = 4.0 gpu_name: str = "None" gpu_vram_gb: float = 0.0 compute_score: float = 10.0 class GradientPacket(BaseModel): session_id: str weight: float = 1.0 gradients: List[Dict] # [{W, b, W_shape, b_shape}, ...] # ============================================================================= # [ S9 GLOBAL SINGLETONS ] # ============================================================================= SERVER_HW = HardwareScanner.scan("server") BRAIN = DynamicBrain() DS_MGR = DatasetManager() COORD = TrainingCoordinator(BRAIN, DS_MGR, SERVER_HW) REGISTRY = ClientRegistry() RESPGEN = ResponseGenerator() REGISTRY.register("server", SERVER_HW) # ============================================================================= # [ S10 FASTAPI ROUTES ] # ============================================================================= app = FastAPI(title="ScaleUp-OmegaB", version="4.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"] ) @app.post("/api/chat") async def api_chat(req: ChatRequest, bg: BackgroundTasks): sid = req.session_id or str(uuid.uuid4()) session = REGISTRY.get(sid) if session: session.messages += 1 old_params = BRAIN.param_count loss, expanded = BRAIN.train_on_text(req.message, batch_size=16) if expanded: bg.add_task(BRAIN.hf.push, BRAIN.snapshot(), BRAIN.status) reply = RESPGEN.generate( loss = loss, params = BRAIN.param_count, layers = len(BRAIN.layers), expansions = BRAIN.expansion_events, expanded = expanded, new_width = BRAIN.last_expansion_width, dataset = DS_MGR._current_name, datasets_tried = DS_MGR.datasets_tried, clients = REGISTRY.count, total_score = REGISTRY.total_compute, fed_updates = REGISTRY.total_fed_updates, train_steps = BRAIN.train_steps, batch = 16, iq = BRAIN.global_iq, sps = COORD.steps_per_sec, workers = COORD.hw.allocated_threads, ram_pct = ram_usage_fraction() * 100, expansion_paused = BRAIN.expansion_paused, ) return JSONResponse({ "reply": reply, "session_id": sid, "expanded": expanded, "loss": round(loss, 6), "iq": round(BRAIN.global_iq, 6), "total_params": BRAIN.param_count, "expansions": BRAIN.expansion_events, "layer_count": len(BRAIN.layers), "active_clients": REGISTRY.count, "ram_pct": round(ram_usage_fraction() * 100, 1), }) @app.get("/api/status") async def api_status(): return JSONResponse({ "brain": BRAIN.status, "dataset": DS_MGR.status, "hardware":SERVER_HW.to_dict(), "coordinator": { "workers": COORD.hw.allocated_threads, "total_steps":COORD._steps, "steps_sec": round(COORD.steps_per_sec, 2), }, "federation": { "clients": REGISTRY.count, "compute": round(REGISTRY.total_compute, 2), "fed_updates": REGISTRY.total_fed_updates, }, "uptime": round(time.time() - _START_TIME, 1), "is_hf_spaces": IS_HF_SPACES, }) @app.post("/api/hardware_report") async def api_hw(report: HardwareReport): hw = HardwareProfile( client_id = report.session_id, cpu_cores_logic = report.cpu_cores, ram_avail_gb = report.ram_gb, gpu_name = report.gpu_name, gpu_vram_gb = report.gpu_vram_gb, compute_score = report.compute_score, ) s = report.compute_score hw.allocated_threads = 1 if s < 20 else 2 if s < 60 else 3 if s < 150 else 4 hw.allocated_batch = 8 if s < 20 else 16 if s < 60 else 32 if s < 150 else 64 existing = REGISTRY.get(report.session_id) if existing: existing.hw = hw else: REGISTRY.register(report.session_id, hw) COORD.scale_up(hw) return JSONResponse({ "ok": True, "allocated_batch": hw.allocated_batch, "allocated_threads": hw.allocated_threads, }) @app.post("/api/gradients") async def api_gradients(pkt: GradientPacket): session = REGISTRY.get(pkt.session_id) if not session: hw = HardwareProfile(client_id=pkt.session_id, compute_score=10.0) session = REGISTRY.register(pkt.session_id, hw) BRAIN.apply_federated_update(pkt.gradients) session.fed_updates += 1 return JSONResponse({"ok": True, "fed_updates": session.fed_updates}) @app.get("/api/architecture") async def api_arch(): with BRAIN.lock: layers = [ {"i": i, "fan_in": l["W"].shape[0], "fan_out": l["W"].shape[1], "params": l["W"].size + l["b"].size} for i, l in enumerate(BRAIN.layers) ] return JSONResponse({ "layers": layers, "total": BRAIN.param_count, "expansions": BRAIN.expansion_events, }) @app.get("/api/weight_subset") async def api_weight_subset(n_layers: int = 3): """ Return the first n_layers as JSON so the browser worker can initialise its local TF.js model with server-synced starting weights. """ n = min(n_layers, len(BRAIN.layers)) with BRAIN.lock: subset = [ { "W": BRAIN.layers[i]["W"].tolist(), "b": BRAIN.layers[i]["b"].tolist(), "W_shape": list(BRAIN.layers[i]["W"].shape), "b_shape": list(BRAIN.layers[i]["b"].shape), } for i in range(n) ] return JSONResponse({ "layers": subset, "total_server_layers": len(BRAIN.layers), }) # ── WebSocket ────────────────────────────────────────────────────────────────── @app.websocket("/ws/stats") async def ws_stats(ws: WebSocket): await ws.accept() sid = str(uuid.uuid4()) hw = HardwareProfile(client_id=sid, compute_score=5.0) REGISTRY.register(sid, hw, ws=ws) async def push(): while True: try: await ws.send_text(json.dumps({ "type": "stats", "params": BRAIN.param_count, "iq": round(BRAIN.global_iq, 6), "loss": round(BRAIN.avg_loss, 6), "expansions": BRAIN.expansion_events, "train_steps": BRAIN.train_steps, "layers": len(BRAIN.layers), "tokens": BRAIN.total_tokens, "dataset": DS_MGR._current_name, "clients": REGISTRY.count, "steps_sec": round(COORD.steps_per_sec, 2), "workers": COORD.hw.allocated_threads, "ram_pct": round(ram_usage_fraction() * 100, 1), "exp_paused": BRAIN.expansion_paused, "fed_updates": REGISTRY.total_fed_updates, "is_hf": IS_HF_SPACES, })) except Exception: return await asyncio.sleep(1.0) async def recv(): async for raw in ws.iter_text(): try: msg = json.loads(raw) if msg.get("type") == "hw": hw.compute_score = float(msg.get("score", 5)) hw.cpu_cores_logic = int(msg.get("cores", 1)) hw.ram_avail_gb = float(msg.get("ram", 0)) hw.gpu_name = msg.get("gpu", "None") except Exception: pass try: await asyncio.gather(push(), recv()) except (WebSocketDisconnect, Exception): pass finally: REGISTRY.unregister(sid) # ============================================================================= # [ S11 FRONTEND HTML + EMBEDDED TF.JS WEBWORKER ] # ============================================================================= # # The TF.js worker is stored in a """ @app.get("/", response_class=HTMLResponse) async def home(): return INDEX_HTML # ============================================================================= # [ S12 LIFECYCLE + ENTRY POINT ] # ============================================================================= @app.on_event("startup") async def on_startup(): log.info("=" * 72) log.info(" ScaleUp-OmegaB v4 | imsuprtwo2 | FEDERATED INFINITE EXPANSION") log.info(f" Environment : {'HuggingFace Spaces' if IS_HF_SPACES else 'Local / custom'}") log.info(f" Server HW score : {SERVER_HW.compute_score:.1f} " f"cores={SERVER_HW.cpu_cores_logic} gpu={SERVER_HW.gpu_name}") log.info(f" Brain seed : {BRAIN.param_count:,} params / {len(BRAIN.layers)} layers") log.info(f" Training workers: {COORD.hw.allocated_threads} " f"(batch={COORD.hw.allocated_batch})") log.info(f" HF persistence : " f"{'ENABLED -> ' + HF_REPO_ID if HF_TOKEN else 'DISABLED (set HF_TOKEN)'}") log.info(f" Dataset cycling : " f"{'ENABLED' if HAS_DATASETS else 'DISABLED (pip install datasets)'}") log.info(f" RAM guard : expansion pauses above {RAM_EXPAND_CEILING*100:.0f}%") log.info("=" * 72) @app.on_event("shutdown") async def on_shutdown(): COORD.stop() log.info("[SHUTDOWN] Saving final checkpoint...") try: BRAIN.hf.push(BRAIN.snapshot(), BRAIN.status) log.info("[SHUTDOWN] Checkpoint pushed.") except Exception as e: log.warning(f"[SHUTDOWN] Checkpoint failed: {e}") if __name__ == "__main__": import uvicorn uvicorn.run("app:app", host="0.0.0.0", port=PORT, workers=1, log_level="info")