"""FastAPI service exposing a small multi-cell simulation engine.

A background thread repeatedly calls ``SimEngine.physics_step`` while the
engine is running; HTTP endpoints configure the engine, feed it samples and
return state snapshots.
"""

import time
import math
import random
import threading
import collections
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any, Literal

from fastapi import FastAPI
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


@dataclass
class EngineConfig:
    """User-adjustable engine settings (replaced wholesale by ``configure``)."""

    architecture: str = "additive"  # additive | multiplicative | affine | bilinear | gated
    coeff_mode: str = "single_k"  # single_k | triple_k | per_edge_k
    topology: str = "single_cell"  # single_cell | chain | mesh
    dataset_family: str = "housing"  # housing | subtraction | multiplication | mixed | symbolic
    mode: str = "training"  # training | inference
    num_cells: int = 3
    learning_rate: float = 0.01
    damping: float = 0.12
    coupling: float = 0.05
    batch_size: int = 24
    sample_noise: float = 0.0


@dataclass
class CellState:
    """Inputs, coefficients and per-step diagnostics of a single cell."""

    id: int
    a: float = 0.0
    b: float = 0.0
    c: float = 0.0
    target: Optional[float] = None
    label: str = ""
    k: float = 1.0
    ka: float = 1.0
    kb: float = 1.0
    kc: float = 0.0
    prediction: float = 0.0
    error: float = 0.0
    energy: float = 0.0
    force: float = 0.0
    anchored: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the cell as a plain dict (a copy, via ``dataclasses.asdict``)."""
        return asdict(self)


class SimEngine:
    """Holds the cells, the sample queue and the training/inference stepping logic."""

    def __init__(self):
        # Re-entrant lock: configure() calls reset_state() while already holding
        # the lock, which would self-deadlock with a plain threading.Lock.
        self.lock = threading.RLock()
        self.config = EngineConfig()
        self.running = False
        self.iteration = 0
        self.current_error = 0.0
        self.current_loss = 0.0
        self.logs = []
        self.cells: List[CellState] = []
        self.batch_queue = collections.deque()
        self.current_sample: Optional[Dict[str, Any]] = None
        self.last_sample: Optional[Dict[str, Any]] = None
        self.loss_history = collections.deque(maxlen=120)
        self.error_history = collections.deque(maxlen=120)
        self.reset_state()

    def reset_state(self):
        """Clear counters, logs, queue and histories, then rebuild the cells."""
        with self.lock:
            self.iteration = 0
            self.current_error = 0.0
            self.current_loss = 0.0
            self.logs = []
            self.batch_queue.clear()
            self.current_sample = None
            self.last_sample = None
            self.loss_history.clear()
            self.error_history.clear()
            self._build_cells()
            self.add_log("Engine reset.")

    def _build_cells(self):
        """Create one cell for ``single_cell`` topology, otherwise at least two,
        each with randomly initialised coefficients."""
        if self.config.topology == "single_cell":
            count = 1
        else:
            count = max(2, int(self.config.num_cells))
        self.cells = []
        for i in range(count):
            self.cells.append(
                CellState(
                    id=i,
                    a=0.0,
                    b=0.0,
                    c=0.0,
                    k=random.uniform(0.35, 1.25),
                    ka=random.uniform(0.35, 1.25),
                    kb=random.uniform(0.35, 1.25),
                    kc=random.uniform(-0.25, 0.25),
                )
            )

    def add_log(self, msg: str):
        """Prepend ``msg`` (tagged with the current iteration) and keep the 20 newest entries."""
        self.logs.insert(0, f"[{self.iteration}] {msg}")
        del self.logs[20:]

    def configure(self, payload: Dict[str, Any]):
        """Apply settings from ``payload``, stop the engine and reset its state.

        Keys missing from ``payload`` keep their current value.

        Raises:
            ValueError / TypeError: if a numeric field cannot be converted. All
                values are parsed before anything is assigned, so a failed call
                leaves the previous configuration untouched.
        """
        with self.lock:
            cfg = self.config
            updated = EngineConfig(
                architecture=payload.get("architecture", cfg.architecture),
                coeff_mode=payload.get("coeff_mode", cfg.coeff_mode),
                topology=payload.get("topology", cfg.topology),
                dataset_family=payload.get("dataset_family", cfg.dataset_family),
                mode=payload.get("mode", cfg.mode),
                num_cells=int(payload.get("num_cells", cfg.num_cells)),
                learning_rate=float(payload.get("learning_rate", cfg.learning_rate)),
                damping=float(payload.get("damping", cfg.damping)),
                coupling=float(payload.get("coupling", cfg.coupling)),
                batch_size=int(payload.get("batch_size", cfg.batch_size)),
                sample_noise=float(payload.get("sample_noise", cfg.sample_noise)),
            )
            self.config = updated
            self.running = False
            self.reset_state()
            self.add_log(
                f"Config applied: {self.config.architecture} | {self.config.coeff_mode} | "
                f"{self.config.topology} | {self.config.dataset_family} | {self.config.mode}"
            )

    def _noise(self) -> float:
        """Draw uniform noise in ``[-sample_noise, +sample_noise]``."""
        return random.uniform(-self.config.sample_noise, self.config.sample_noise)

    def _sample_housing(self):
        """Sample ``c = 2.5*a + 1.2*b + noise`` with a, b uniform in [2, 10]."""
        a = random.uniform(2, 10)
        b = random.uniform(2, 10)
        c = (2.5 * a) + (1.2 * b) + self._noise()
        return a, b, c, "housing_affine"

    def _sample_subtraction(self):
        """Sample ``c = a - b + noise`` with a, b uniform in [2, 10]."""
        a = random.uniform(2, 10)
        b = random.uniform(2, 10)
        c = (1.0 * a) + (-1.0 * b) + self._noise()
        return a, b, c, "signed_subtraction"

    def _sample_multiplication(self):
        """Sample ``c = a*b + noise`` with a, b uniform in [2, 10]."""
        a = random.uniform(2, 10)
        b = random.uniform(2, 10)
        c = (a * b) + self._noise()
        return a, b, c, "multiplicative"

    def _sample_symbolic(self):
        """Sample from one of three randomly chosen formulas, a, b uniform in [1, 12]."""
        a = random.uniform(1, 12)
        b = random.uniform(1, 12)
        branch = random.choice(["affine", "signed_affine", "hybrid"])
        if branch == "affine":
            c = (1.7 * a) + (0.9 * b)
        elif branch == "signed_affine":
            c = (0.8 * a) + (-1.4 * b) + 2.0
        else:
            c = (a * 0.6) + (b * 0.4) + ((a * b) * 0.2)
        c += self._noise()
        return a, b, c, f"symbolic_{branch}"

    def generate_sample(self, family: Optional[str] = None) -> Dict[str, Any]:
        """Generate one ``{"a", "b", "c", "label"}`` sample.

        Args:
            family: dataset family; falls back to ``config.dataset_family`` when
                falsy. ``"mixed"`` picks one concrete family at random; an
                unknown family yields plain addition labelled ``default_add``.
        """
        family = family or self.config.dataset_family
        if family == "housing":
            a, b, c, label = self._sample_housing()
        elif family == "subtraction":
            a, b, c, label = self._sample_subtraction()
        elif family == "multiplication":
            a, b, c, label = self._sample_multiplication()
        elif family == "symbolic":
            a, b, c, label = self._sample_symbolic()
        elif family == "mixed":
            pick = random.choice(["housing", "subtraction", "multiplication", "symbolic"])
            return self.generate_sample(pick)
        else:
            a, b = random.uniform(2, 10), random.uniform(2, 10)
            c, label = a + b, "default_add"
        return {"a": float(a), "b": float(b), "c": float(c), "label": label}

    def _apply_sample_to_cells(self, sample: Dict[str, Any], anchor_output: bool):
        """Copy ``sample`` into every cell and zero the per-step diagnostics.

        When anchoring, each cell's ``c`` is pinned to the sample's ``c``;
        otherwise ``c`` starts at 0.0. Anchoring is skipped when the sample has
        no ``c`` value, since there is nothing to pin to (previously this
        raised ``TypeError`` on ``float(None)``).
        """
        self.current_sample = sample
        self.last_sample = sample

        raw_target = sample.get("c")
        target = float(raw_target) if raw_target is not None else None
        anchored = bool(anchor_output) and target is not None
        a_val = float(sample["a"])
        b_val = float(sample["b"])
        label = sample.get("label", "")

        for cell in self.cells:
            cell.a = a_val
            cell.b = b_val
            cell.target = target
            cell.label = label
            cell.anchored = anchored
            cell.c = target if anchored else 0.0
            cell.prediction = 0.0
            cell.error = 0.0
            cell.energy = 0.0
            cell.force = 0.0

    def load_sample(self, sample: Dict[str, Any], anchor_output: Optional[bool] = None):
        """Load ``sample`` into the cells.

        Args:
            sample: dict with ``a`` and ``b`` and optionally ``c`` / ``label``.
            anchor_output: whether to pin the output; ``None`` means "anchor
                when the engine is in training mode".
        """
        with self.lock:
            if anchor_output is None:
                anchor_output = self.config.mode == "training"
            self._apply_sample_to_cells(sample, anchor_output=anchor_output)
            c_val = sample.get("c")
            # A target-less sample must not crash the ':.3f' formatting.
            c_text = f"{c_val:.3f}" if c_val is not None else "None"
            self.add_log(
                f"Sample loaded: a={sample['a']:.3f}, b={sample['b']:.3f}, "
                f"c={c_text}, label={sample.get('label', '')}"
            )

    def _coefficient_snapshot(self, cell: CellState):
        """Return the effective ``ka``/``kb``/``kc`` of ``cell`` for the current coeff mode."""
        if self.config.coeff_mode == "single_k":
            return {"ka": cell.k, "kb": cell.k, "kc": cell.k}
        if self.config.coeff_mode == "per_edge_k":
            return {"ka": cell.ka, "kb": cell.kb, "kc": 0.0}
        return {"ka": cell.ka, "kb": cell.kb, "kc": cell.kc}

    def _set_trainable_param(self, cell: CellState, name: str, value: float):
        """Assign coefficient ``name`` on ``cell``, clamped to [-20, 20]; unknown names are ignored."""
        value = max(-20.0, min(20.0, value))
        if name == "k":
            cell.k = value
        elif name == "ka":
            cell.ka = value
        elif name == "kb":
            cell.kb = value
        elif name == "kc":
            cell.kc = value

    def _get_trainable_params(self):
        """Return the coefficient names that are updated in the current coeff mode."""
        if self.config.coeff_mode == "single_k":
            return ["k"]
        if self.config.coeff_mode == "per_edge_k":
            return ["ka", "kb"]
        return ["ka", "kb", "kc"]

    def _predict_cell(self, cell: CellState) -> float:
        """Compute the cell's output from ``a``, ``b`` and its coefficients
        according to the configured architecture and coeff mode.

        Unknown architectures fall back to the additive form.
        """
        coeffs = self._coefficient_snapshot(cell)
        a, b = cell.a, cell.b
        arch = self.config.architecture
        ka, kb, kc = coeffs["ka"], coeffs["kb"], coeffs["kc"]

        if self.config.coeff_mode == "single_k":
            k = cell.k
            if arch == "additive":
                return k * (a + b)
            if arch == "multiplicative":
                return k * (a * b)
            if arch == "affine":
                return (k * a) + (k * b) + k
            if arch == "bilinear":
                return k * (a + b + (a * b))
            if arch == "gated":
                # Logistic gate blends the additive and multiplicative terms.
                gate = 1.0 / (1.0 + math.exp(-k))
                return gate * (a + b) + (1.0 - gate) * (a * b)
            return k * (a + b)

        if arch == "additive":
            return (ka * a) + (kb * b) + kc
        if arch == "multiplicative":
            return (ka * a) * (kb * b) + kc
        if arch == "affine":
            return (ka * a) + (kb * b) + kc
        if arch == "bilinear":
            return (ka * a) + (kb * b) + (kc * a * b)
        if arch == "gated":
            gate = 1.0 / (1.0 + math.exp(-kc))
            return gate * ((ka * a) + (kb * b)) + (1.0 - gate) * (a * b)
        return (ka * a) + (kb * b) + kc

    def _neighbors(self, idx: int):
        """Return the indices of the cells coupled to cell ``idx`` for the current topology."""
        if self.config.topology == "single_cell":
            return []
        if self.config.topology == "chain":
            n = []
            if idx - 1 >= 0:
                n.append(idx - 1)
            if idx + 1 < len(self.cells):
                n.append(idx + 1)
            return n
        if self.config.topology == "mesh":
            return [j for j in range(len(self.cells)) if j != idx]
        return []

    def _cell_loss(self, idx: int, preds: List[float]) -> float:
        """Squared target error of cell ``idx`` plus a coupling penalty toward
        the mean prediction of its neighbours, using the given ``preds``."""
        cell = self.cells[idx]
        pred = preds[idx]
        loss = 0.0
        if cell.target is not None:
            loss += (pred - cell.target) ** 2
        neighbors = self._neighbors(idx)
        if neighbors:
            neighbor_mean = sum(preds[j] for j in neighbors) / len(neighbors)
            loss += self.config.coupling * ((pred - neighbor_mean) ** 2)
        return loss

    def _numeric_gradient(self, idx: int, param_name: str, eps: float = 1e-4) -> float:
        """Central-difference estimate of d(loss)/d(param) for cell ``idx``.

        The parameter is perturbed in place and written back afterwards; all
        writes go through ``_set_trainable_param`` and are therefore clamped.
        """
        cell = self.cells[idx]
        old = getattr(cell, param_name)

        def local_loss() -> float:
            # Recomputed from live cell state so the perturbation is visible.
            pred = self._predict_cell(cell)
            loss = 0.0
            if cell.target is not None:
                loss += (pred - cell.target) ** 2
            neighbors = self._neighbors(idx)
            if neighbors:
                neighbor_preds = [self._predict_cell(self.cells[j]) for j in neighbors]
                neighbor_mean = sum(neighbor_preds) / len(neighbor_preds)
                loss += self.config.coupling * ((pred - neighbor_mean) ** 2)
            return loss

        self._set_trainable_param(cell, param_name, old + eps)
        plus = local_loss()
        self._set_trainable_param(cell, param_name, old - eps)
        minus = local_loss()
        self._set_trainable_param(cell, param_name, old)
        return (plus - minus) / (2.0 * eps)

    def _mean(self, xs: List[float]) -> float:
        """Arithmetic mean of ``xs``; an empty list yields 0."""
        return sum(xs) / max(1, len(xs))

    def _load_next_sample_from_batch(self):
        """Pop the next queued sample into the cells; return False when the queue is empty."""
        if self.batch_queue:
            sample = self.batch_queue.popleft()
            self._apply_sample_to_cells(sample, anchor_output=(self.config.mode == "training"))
            self.add_log(f"Next batch sample: {sample.get('label', '')}")
            return True
        return False

    def physics_step(self):
        """Advance the simulation by one step.

        Training mode nudges each cell's trainable coefficients down a numeric
        gradient; inference mode moves each cell's ``c`` toward its own
        prediction (plus a coupling pull toward its neighbours' predictions).

        Returns:
            True if the engine should keep stepping, False if it is idle or has
            just stopped (batch finished / inference settled).
        """
        with self.lock:
            if not self.running or not self.cells:
                return False

            preds = []
            for cell in self.cells:
                pred = self._predict_cell(cell)
                cell.prediction = pred
                preds.append(pred)
            global_pred = self._mean(preds)

            target_available = (
                self.current_sample is not None
                and self.current_sample.get("c") is not None
            )
            if target_available:
                target = self._mean([c.target for c in self.cells if c.target is not None])
            else:
                target = None

            if self.config.mode == "training":
                total_loss = 0.0
                for idx, cell in enumerate(self.cells):
                    cell.target = target if target is not None else cell.target
                    cell.error = (cell.prediction - cell.target) if cell.target is not None else 0.0
                    cell.energy = cell.error ** 2
                    for param_name in self._get_trainable_params():
                        grad = self._numeric_gradient(idx, param_name)
                        old = getattr(cell, param_name)
                        new_val = old - (self.config.learning_rate * grad)
                        # Damping blends the stepped value back toward the old one.
                        new_val = (1.0 - self.config.damping) * new_val + (self.config.damping * old)
                        self._set_trainable_param(cell, param_name, new_val)
                    total_loss += self._cell_loss(idx, preds)

                self.current_loss = total_loss / max(1, len(self.cells))
                self.current_error = (global_pred - target) if target is not None else global_pred
                self.loss_history.append(self.current_loss)
                self.error_history.append(self.current_error)

                if target_available and abs(self.current_error) < 0.05 and self.current_loss < 0.01:
                    self.add_log("Converged on current sample.")
                    if self._load_next_sample_from_batch():
                        self.iteration += 1
                        return True
                    self.running = False
                    self.add_log("Batch complete.")
                    self.iteration += 1
                    return False
            else:
                # Inference mode: output node(s) drift toward the predicted state.
                drift_values = []
                for idx, cell in enumerate(self.cells):
                    # Use this cell's own prediction. The previous code reused
                    # the stale loop variable from the prediction pass, i.e.
                    # always the LAST cell's prediction.
                    cell_pred = preds[idx]
                    neighbors = self._neighbors(idx)
                    if neighbors:
                        neighbor_mean = self._mean([preds[j] for j in neighbors])
                    else:
                        neighbor_mean = cell_pred
                    drift = cell_pred - cell.c
                    drift += self.config.coupling * (neighbor_mean - cell.c)
                    cell.force = drift
                    cell.c += 0.15 * drift
                    cell.error = cell_pred - cell.c
                    cell.energy = cell.error ** 2
                    drift_values.append(abs(drift))

                self.current_error = self._mean([cell.error for cell in self.cells])
                self.current_loss = self._mean([cell.energy for cell in self.cells])
                self.loss_history.append(self.current_loss)
                self.error_history.append(self.current_error)

                if self.current_sample and abs(self.current_error) < 0.05 and self.current_loss < 0.01:
                    self.add_log("Inference settled.")
                    if self._load_next_sample_from_batch():
                        self.iteration += 1
                        return True
                    self.running = False
                    self.add_log("Task complete.")
                    self.iteration += 1
                    return False

                # If no target exists, stop when drift is tiny.
                if not target_available and self._mean(drift_values) < 0.002:
                    self.running = False
                    self.add_log("Inference drift stabilized.")
                    self.iteration += 1
                    return False

            self.iteration += 1
            return True

    def start_batch(self, count: int):
        """Queue ``count`` freshly generated samples and start on the first one.

        Returns:
            True if a first sample was loaded (engine now running), else False.
        """
        with self.lock:
            self.batch_queue.clear()
            for _ in range(count):
                self.batch_queue.append(self.generate_sample())
            first = self._load_next_sample_from_batch()
            self.running = first
            self.add_log(f"Batch started with {count} samples.")
            return first

    def set_custom_sample(self, a: float, b: float, c: Optional[float] = None):
        """Load a user-supplied sample and start the engine.

        The output is anchored only in training mode and only when ``c`` is
        given. Returns the sample dict that was loaded.
        """
        with self.lock:
            sample = {
                "a": float(a),
                "b": float(b),
                "c": float(c) if c is not None else None,
                "label": "custom",
            }
            # _apply_sample_to_cells also records current_sample / last_sample.
            self._apply_sample_to_cells(
                sample,
                anchor_output=(self.config.mode == "training" and c is not None),
            )
            self.running = True
            self.add_log(f"Custom sample loaded: a={a}, b={b}, c={c}")
            return sample

    def halt(self):
        """Stop stepping; state is left as-is."""
        with self.lock:
            self.running = False
            self.add_log("Engine halted.")

    def snapshot(self) -> Dict[str, Any]:
        """Return a serialisable view of the engine state, taken under the lock."""
        with self.lock:
            return {
                "config": asdict(self.config),
                "running": self.running,
                "iteration": self.iteration,
                "current_error": self.current_error,
                "current_loss": self.current_loss,
                "cells": [c.to_dict() for c in self.cells],
                # Copy: the live list keeps being mutated by the stepping
                # thread after the lock is released.
                "logs": list(self.logs),
                "last_sample": self.last_sample,
                "current_sample": self.current_sample,
                "batch_remaining": len(self.batch_queue),
                "loss_history": list(self.loss_history),
                "error_history": list(self.error_history),
            }


engine = SimEngine()


def run_loop():
    """Background loop: step the engine while it is running, pausing 40 ms per pass."""
    while True:
        if engine.running:
            engine.physics_step()
        time.sleep(0.04)


threading.Thread(target=run_loop, daemon=True).start()


@app.get("/", response_class=HTMLResponse)
async def get_ui():
    """Serve ``index.html`` (path is relative to the working directory)."""
    return FileResponse("index.html")


@app.get("/state")
async def get_state():
    """Return the current engine snapshot."""
    return engine.snapshot()


@app.post("/config")
async def config(data: dict):
    """Apply a configuration payload; this stops and resets the engine."""
    engine.configure(data)
    return {"ok": True}


@app.post("/example")
async def example(data: dict):
    """Generate and return one sample without loading it into the engine."""
    family = data.get("dataset_family", engine.config.dataset_family)
    sample = engine.generate_sample(family)
    return sample


@app.post("/generate_batch")
async def generate_batch(data: dict):
    """Start a batch of ``count`` samples (defaults to the configured batch size)."""
    count = int(data.get("count", engine.config.batch_size))
    engine.start_batch(count)
    return {"ok": True, "count": count}


@app.post("/test_custom")
async def test_custom(data: dict):
    """Load a custom ``a``/``b`` sample; ``c`` is optional (None, "" and "null" mean absent)."""
    a = float(data["a"])
    b = float(data["b"])
    c = data.get("c", None)
    c_val = float(c) if c not in [None, "", "null"] else None
    engine.set_custom_sample(a, b, c_val)
    return {"ok": True}


@app.post("/halt")
async def halt():
    """Stop the engine."""
    engine.halt()
    return {"ok": True}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)