Spaces:
Runtime error
Runtime error
| import time | |
| import math | |
| import random | |
| import threading | |
| import collections | |
| from dataclasses import dataclass, asdict | |
| from typing import Optional, List, Dict, Any, Literal | |
| from fastapi import FastAPI | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| class EngineConfig: | |
| architecture: str = "additive" # additive | multiplicative | affine | bilinear | gated | |
| coeff_mode: str = "single_k" # single_k | triple_k | per_edge_k | |
| topology: str = "single_cell" # single_cell | chain | mesh | |
| dataset_family: str = "housing" # housing | subtraction | multiplication | mixed | symbolic | |
| mode: str = "training" # training | inference | |
| num_cells: int = 3 | |
| learning_rate: float = 0.01 | |
| damping: float = 0.12 | |
| coupling: float = 0.05 | |
| batch_size: int = 24 | |
| sample_noise: float = 0.0 | |
| class CellState: | |
| id: int | |
| a: float = 0.0 | |
| b: float = 0.0 | |
| c: float = 0.0 | |
| target: Optional[float] = None | |
| label: str = "" | |
| k: float = 1.0 | |
| ka: float = 1.0 | |
| kb: float = 1.0 | |
| kc: float = 0.0 | |
| prediction: float = 0.0 | |
| error: float = 0.0 | |
| energy: float = 0.0 | |
| force: float = 0.0 | |
| anchored: bool = False | |
| def to_dict(self) -> Dict[str, Any]: | |
| return asdict(self) | |
| class SimEngine: | |
| def __init__(self): | |
| self.lock = threading.Lock() | |
| self.config = EngineConfig() | |
| self.running = False | |
| self.iteration = 0 | |
| self.current_error = 0.0 | |
| self.current_loss = 0.0 | |
| self.logs = [] | |
| self.cells: List[CellState] = [] | |
| self.batch_queue = collections.deque() | |
| self.current_sample: Optional[Dict[str, Any]] = None | |
| self.last_sample: Optional[Dict[str, Any]] = None | |
| self.loss_history = collections.deque(maxlen=120) | |
| self.error_history = collections.deque(maxlen=120) | |
| self.reset_state() | |
| def reset_state(self): | |
| with self.lock: | |
| self.iteration = 0 | |
| self.current_error = 0.0 | |
| self.current_loss = 0.0 | |
| self.logs = [] | |
| self.batch_queue.clear() | |
| self.current_sample = None | |
| self.last_sample = None | |
| self.loss_history.clear() | |
| self.error_history.clear() | |
| self._build_cells() | |
| self.add_log("Engine reset.") | |
| def _build_cells(self): | |
| count = 1 if self.config.topology == "single_cell" else max(2, int(self.config.num_cells)) | |
| self.cells = [] | |
| for i in range(count): | |
| self.cells.append( | |
| CellState( | |
| id=i, | |
| a=0.0, | |
| b=0.0, | |
| c=0.0, | |
| k=random.uniform(0.35, 1.25), | |
| ka=random.uniform(0.35, 1.25), | |
| kb=random.uniform(0.35, 1.25), | |
| kc=random.uniform(-0.25, 0.25), | |
| ) | |
| ) | |
| def add_log(self, msg: str): | |
| stamp = f"[{self.iteration}] {msg}" | |
| self.logs.insert(0, stamp) | |
| if len(self.logs) > 20: | |
| self.logs.pop() | |
| def configure(self, payload: Dict[str, Any]): | |
| with self.lock: | |
| self.config.architecture = payload.get("architecture", self.config.architecture) | |
| self.config.coeff_mode = payload.get("coeff_mode", self.config.coeff_mode) | |
| self.config.topology = payload.get("topology", self.config.topology) | |
| self.config.dataset_family = payload.get("dataset_family", self.config.dataset_family) | |
| self.config.mode = payload.get("mode", self.config.mode) | |
| self.config.num_cells = int(payload.get("num_cells", self.config.num_cells)) | |
| self.config.learning_rate = float(payload.get("learning_rate", self.config.learning_rate)) | |
| self.config.damping = float(payload.get("damping", self.config.damping)) | |
| self.config.coupling = float(payload.get("coupling", self.config.coupling)) | |
| self.config.batch_size = int(payload.get("batch_size", self.config.batch_size)) | |
| self.config.sample_noise = float(payload.get("sample_noise", self.config.sample_noise)) | |
| self.running = False | |
| self.reset_state() | |
| self.add_log( | |
| f"Config applied: {self.config.architecture} | {self.config.coeff_mode} | " | |
| f"{self.config.topology} | {self.config.dataset_family} | {self.config.mode}" | |
| ) | |
| def _sample_housing(self): | |
| a = random.uniform(2, 10) | |
| b = random.uniform(2, 10) | |
| c = (2.5 * a) + (1.2 * b) + random.uniform(-self.config.sample_noise, self.config.sample_noise) | |
| return a, b, c, "housing_affine" | |
| def _sample_subtraction(self): | |
| a = random.uniform(2, 10) | |
| b = random.uniform(2, 10) | |
| c = (1.0 * a) + (-1.0 * b) + random.uniform(-self.config.sample_noise, self.config.sample_noise) | |
| return a, b, c, "signed_subtraction" | |
| def _sample_multiplication(self): | |
| a = random.uniform(2, 10) | |
| b = random.uniform(2, 10) | |
| c = (a * b) + random.uniform(-self.config.sample_noise, self.config.sample_noise) | |
| return a, b, c, "multiplicative" | |
| def _sample_symbolic(self): | |
| a = random.uniform(1, 12) | |
| b = random.uniform(1, 12) | |
| branch = random.choice(["affine", "signed_affine", "hybrid"]) | |
| if branch == "affine": | |
| c = (1.7 * a) + (0.9 * b) | |
| elif branch == "signed_affine": | |
| c = (0.8 * a) + (-1.4 * b) + 2.0 | |
| else: | |
| c = (a * 0.6) + (b * 0.4) + ((a * b) * 0.2) | |
| c += random.uniform(-self.config.sample_noise, self.config.sample_noise) | |
| return a, b, c, f"symbolic_{branch}" | |
| def generate_sample(self, family: Optional[str] = None) -> Dict[str, Any]: | |
| family = family or self.config.dataset_family | |
| if family == "housing": | |
| a, b, c, label = self._sample_housing() | |
| elif family == "subtraction": | |
| a, b, c, label = self._sample_subtraction() | |
| elif family == "multiplication": | |
| a, b, c, label = self._sample_multiplication() | |
| elif family == "symbolic": | |
| a, b, c, label = self._sample_symbolic() | |
| elif family == "mixed": | |
| pick = random.choice(["housing", "subtraction", "multiplication", "symbolic"]) | |
| return self.generate_sample(pick) | |
| else: | |
| a, b = random.uniform(2, 10), random.uniform(2, 10) | |
| c, label = a + b, "default_add" | |
| return {"a": float(a), "b": float(b), "c": float(c), "label": label} | |
| def _apply_sample_to_cells(self, sample: Dict[str, Any], anchor_output: bool): | |
| self.current_sample = sample | |
| self.last_sample = sample | |
| for cell in self.cells: | |
| cell.a = float(sample["a"]) | |
| cell.b = float(sample["b"]) | |
| cell.target = float(sample["c"]) if sample.get("c") is not None else None | |
| cell.label = sample.get("label", "") | |
| cell.anchored = anchor_output | |
| if anchor_output: | |
| cell.c = float(sample["c"]) | |
| else: | |
| cell.c = 0.0 | |
| cell.prediction = 0.0 | |
| cell.error = 0.0 | |
| cell.energy = 0.0 | |
| cell.force = 0.0 | |
| def load_sample(self, sample: Dict[str, Any], anchor_output: Optional[bool] = None): | |
| with self.lock: | |
| if anchor_output is None: | |
| anchor_output = self.config.mode == "training" | |
| self._apply_sample_to_cells(sample, anchor_output=anchor_output) | |
| self.add_log( | |
| f"Sample loaded: a={sample['a']:.3f}, b={sample['b']:.3f}, " | |
| f"c={sample['c']:.3f}, label={sample.get('label', '')}" | |
| ) | |
| def _coefficient_snapshot(self, cell: CellState): | |
| if self.config.coeff_mode == "single_k": | |
| return {"ka": cell.k, "kb": cell.k, "kc": cell.k} | |
| if self.config.coeff_mode == "per_edge_k": | |
| return {"ka": cell.ka, "kb": cell.kb, "kc": 0.0} | |
| return {"ka": cell.ka, "kb": cell.kb, "kc": cell.kc} | |
| def _set_trainable_param(self, cell: CellState, name: str, value: float): | |
| value = max(-20.0, min(20.0, value)) | |
| if name == "k": | |
| cell.k = value | |
| elif name == "ka": | |
| cell.ka = value | |
| elif name == "kb": | |
| cell.kb = value | |
| elif name == "kc": | |
| cell.kc = value | |
| def _get_trainable_params(self): | |
| if self.config.coeff_mode == "single_k": | |
| return ["k"] | |
| if self.config.coeff_mode == "per_edge_k": | |
| return ["ka", "kb"] | |
| return ["ka", "kb", "kc"] | |
| def _predict_cell(self, cell: CellState) -> float: | |
| coeffs = self._coefficient_snapshot(cell) | |
| a, b = cell.a, cell.b | |
| arch = self.config.architecture | |
| ka, kb, kc = coeffs["ka"], coeffs["kb"], coeffs["kc"] | |
| if self.config.coeff_mode == "single_k": | |
| k = cell.k | |
| if arch == "additive": | |
| return k * (a + b) | |
| if arch == "multiplicative": | |
| return k * (a * b) | |
| if arch == "affine": | |
| return (k * a) + (k * b) + k | |
| if arch == "bilinear": | |
| return k * (a + b + (a * b)) | |
| if arch == "gated": | |
| gate = 1.0 / (1.0 + math.exp(-k)) | |
| return gate * (a + b) + (1.0 - gate) * (a * b) | |
| return k * (a + b) | |
| if arch == "additive": | |
| return (ka * a) + (kb * b) + kc | |
| if arch == "multiplicative": | |
| return (ka * a) * (kb * b) + kc | |
| if arch == "affine": | |
| return (ka * a) + (kb * b) + kc | |
| if arch == "bilinear": | |
| return (ka * a) + (kb * b) + (kc * a * b) | |
| if arch == "gated": | |
| gate = 1.0 / (1.0 + math.exp(-kc)) | |
| return gate * ((ka * a) + (kb * b)) + (1.0 - gate) * (a * b) | |
| return (ka * a) + (kb * b) + kc | |
| def _neighbors(self, idx: int): | |
| if self.config.topology == "single_cell": | |
| return [] | |
| if self.config.topology == "chain": | |
| n = [] | |
| if idx - 1 >= 0: | |
| n.append(idx - 1) | |
| if idx + 1 < len(self.cells): | |
| n.append(idx + 1) | |
| return n | |
| if self.config.topology == "mesh": | |
| return [j for j in range(len(self.cells)) if j != idx] | |
| return [] | |
| def _cell_loss(self, idx: int, preds: List[float]) -> float: | |
| cell = self.cells[idx] | |
| pred = preds[idx] | |
| loss = 0.0 | |
| if cell.target is not None: | |
| loss += (pred - cell.target) ** 2 | |
| neighbors = self._neighbors(idx) | |
| if neighbors: | |
| neighbor_mean = sum(preds[j] for j in neighbors) / len(neighbors) | |
| loss += self.config.coupling * ((pred - neighbor_mean) ** 2) | |
| return loss | |
| def _numeric_gradient(self, idx: int, param_name: str, eps: float = 1e-4) -> float: | |
| cell = self.cells[idx] | |
| old = getattr(cell, param_name) | |
| def local_loss() -> float: | |
| pred = self._predict_cell(cell) | |
| loss = 0.0 | |
| if cell.target is not None: | |
| loss += (pred - cell.target) ** 2 | |
| neighbors = self._neighbors(idx) | |
| if neighbors: | |
| neighbor_preds = [self._predict_cell(self.cells[j]) for j in neighbors] | |
| neighbor_mean = sum(neighbor_preds) / len(neighbor_preds) | |
| loss += self.config.coupling * ((pred - neighbor_mean) ** 2) | |
| return loss | |
| self._set_trainable_param(cell, param_name, old + eps) | |
| plus = local_loss() | |
| self._set_trainable_param(cell, param_name, old - eps) | |
| minus = local_loss() | |
| self._set_trainable_param(cell, param_name, old) | |
| return (plus - minus) / (2.0 * eps) | |
| def _mean(self, xs: List[float]) -> float: | |
| return sum(xs) / max(1, len(xs)) | |
| def _load_next_sample_from_batch(self): | |
| if self.batch_queue: | |
| sample = self.batch_queue.popleft() | |
| self._apply_sample_to_cells(sample, anchor_output=(self.config.mode == "training")) | |
| self.add_log(f"Next batch sample: {sample.get('label', '')}") | |
| return True | |
| return False | |
| def physics_step(self): | |
| with self.lock: | |
| if not self.running or not self.cells: | |
| return False | |
| preds = [] | |
| for cell in self.cells: | |
| pred = self._predict_cell(cell) | |
| cell.prediction = pred | |
| preds.append(pred) | |
| global_pred = self._mean(preds) | |
| target_available = self.current_sample is not None and self.current_sample.get("c") is not None | |
| target = self._mean([c.target for c in self.cells if c.target is not None]) if target_available else None | |
| if self.config.mode == "training": | |
| total_loss = 0.0 | |
| for idx, cell in enumerate(self.cells): | |
| cell.target = target if target is not None else cell.target | |
| cell.error = (cell.prediction - cell.target) if cell.target is not None else 0.0 | |
| cell.energy = cell.error ** 2 | |
| for param_name in self._get_trainable_params(): | |
| grad = self._numeric_gradient(idx, param_name) | |
| old = getattr(cell, param_name) | |
| new_val = old - (self.config.learning_rate * grad) | |
| new_val = (1.0 - self.config.damping) * new_val + (self.config.damping * old) | |
| self._set_trainable_param(cell, param_name, new_val) | |
| total_loss += self._cell_loss(idx, preds) | |
| self.current_loss = total_loss / max(1, len(self.cells)) | |
| self.current_error = (global_pred - target) if target is not None else global_pred | |
| self.loss_history.append(self.current_loss) | |
| self.error_history.append(self.current_error) | |
| if target_available and abs(self.current_error) < 0.05 and self.current_loss < 0.01: | |
| self.add_log("Converged on current sample.") | |
| if self._load_next_sample_from_batch(): | |
| self.iteration += 1 | |
| return True | |
| self.running = False | |
| self.add_log("Batch complete.") | |
| self.iteration += 1 | |
| return False | |
| else: | |
| # Inference mode: output node(s) drift toward the predicted state. | |
| drift_values = [] | |
| for idx, cell in enumerate(self.cells): | |
| neighbors = self._neighbors(idx) | |
| neighbor_mean = self._mean([preds[j] for j in neighbors]) if neighbors else pred | |
| drift = (pred - cell.c) | |
| drift += self.config.coupling * (neighbor_mean - cell.c) | |
| cell.force = drift | |
| cell.c += 0.15 * drift | |
| cell.error = pred - cell.c | |
| cell.energy = cell.error ** 2 | |
| drift_values.append(abs(drift)) | |
| self.current_error = self._mean([cell.error for cell in self.cells]) | |
| self.current_loss = self._mean([cell.energy for cell in self.cells]) | |
| self.loss_history.append(self.current_loss) | |
| self.error_history.append(self.current_error) | |
| if self.current_sample and abs(self.current_error) < 0.05 and self.current_loss < 0.01: | |
| self.add_log("Inference settled.") | |
| if self._load_next_sample_from_batch(): | |
| self.iteration += 1 | |
| return True | |
| self.running = False | |
| self.add_log("Task complete.") | |
| self.iteration += 1 | |
| return False | |
| # If no target exists, stop when drift is tiny. | |
| if not target_available and self._mean(drift_values) < 0.002: | |
| self.running = False | |
| self.add_log("Inference drift stabilized.") | |
| self.iteration += 1 | |
| return False | |
| self.iteration += 1 | |
| return True | |
| def start_batch(self, count: int): | |
| with self.lock: | |
| self.batch_queue.clear() | |
| for _ in range(count): | |
| self.batch_queue.append(self.generate_sample()) | |
| first = self._load_next_sample_from_batch() | |
| self.running = first | |
| self.add_log(f"Batch started with {count} samples.") | |
| return first | |
| def set_custom_sample(self, a: float, b: float, c: Optional[float] = None): | |
| with self.lock: | |
| sample = {"a": float(a), "b": float(b), "c": float(c) if c is not None else None, "label": "custom"} | |
| self._apply_sample_to_cells(sample, anchor_output=(self.config.mode == "training" and c is not None)) | |
| self.current_sample = sample | |
| self.last_sample = sample | |
| self.running = True | |
| self.add_log(f"Custom sample loaded: a={a}, b={b}, c={c}") | |
| return sample | |
| def halt(self): | |
| with self.lock: | |
| self.running = False | |
| self.add_log("Engine halted.") | |
| def snapshot(self) -> Dict[str, Any]: | |
| with self.lock: | |
| return { | |
| "config": asdict(self.config), | |
| "running": self.running, | |
| "iteration": self.iteration, | |
| "current_error": self.current_error, | |
| "current_loss": self.current_loss, | |
| "cells": [c.to_dict() for c in self.cells], | |
| "logs": self.logs, | |
| "last_sample": self.last_sample, | |
| "current_sample": self.current_sample, | |
| "batch_remaining": len(self.batch_queue), | |
| "loss_history": list(self.loss_history), | |
| "error_history": list(self.error_history), | |
| } | |
| engine = SimEngine() | |
| def run_loop(): | |
| while True: | |
| if engine.running: | |
| engine.physics_step() | |
| time.sleep(0.04) | |
| threading.Thread(target=run_loop, daemon=True).start() | |
| async def get_ui(): | |
| return FileResponse("index.html") | |
| async def get_state(): | |
| return engine.snapshot() | |
| async def config(data: dict): | |
| engine.configure(data) | |
| return {"ok": True} | |
| async def example(data: dict): | |
| family = data.get("dataset_family", engine.config.dataset_family) | |
| sample = engine.generate_sample(family) | |
| return sample | |
| async def generate_batch(data: dict): | |
| count = int(data.get("count", engine.config.batch_size)) | |
| engine.start_batch(count) | |
| return {"ok": True, "count": count} | |
| async def test_custom(data: dict): | |
| a = float(data["a"]) | |
| b = float(data["b"]) | |
| c = data.get("c", None) | |
| c_val = float(c) if c not in [None, "", "null"] else None | |
| engine.set_custom_sample(a, b, c_val) | |
| return {"ok": True} | |
| async def halt(): | |
| engine.halt() | |
| return {"ok": True} | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |