# testing_space / app.py
# Source: Hugging Face Space "everydaytok/testing_space" (commit 20b49c8, ~19.6 kB).
import time
import math
import random
import threading
import collections
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any, Literal
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
app = FastAPI()

# Allow any origin/method/header so the bundled single-page UI (and local dev
# servers) can call this API without CORS failures.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
@dataclass
class EngineConfig:
    """Tunable knobs for the simulation engine (set via the /config endpoint)."""

    architecture: str = "additive"  # additive | multiplicative | affine | bilinear | gated
    coeff_mode: str = "single_k"  # single_k | triple_k | per_edge_k
    topology: str = "single_cell"  # single_cell | chain | mesh
    dataset_family: str = "housing"  # housing | subtraction | multiplication | mixed | symbolic
    mode: str = "training"  # training | inference
    num_cells: int = 3  # cell count for chain/mesh topologies (min 2 enforced when building)
    learning_rate: float = 0.01  # step size for the numeric-gradient updates
    damping: float = 0.12  # fraction of the old value blended back into each update
    coupling: float = 0.05  # weight of the neighbor-agreement term in loss/drift
    batch_size: int = 24  # default sample count for /generate_batch
    sample_noise: float = 0.0  # half-width of uniform noise added to generated targets
@dataclass
class CellState:
    """Per-cell simulation state: sample inputs, trainable coefficients, diagnostics."""

    id: int
    # Sample values: a/b are inputs, c is the output node (anchored to the
    # target during training, drifts toward the prediction during inference).
    a: float = 0.0
    b: float = 0.0
    c: float = 0.0
    target: Optional[float] = None  # supervised target; None when inferring
    label: str = ""  # dataset label of the currently loaded sample
    # Trainable coefficients; which subset is used depends on coeff_mode.
    k: float = 1.0
    ka: float = 1.0
    kb: float = 1.0
    kc: float = 0.0
    # Diagnostics refreshed on every physics step.
    prediction: float = 0.0
    error: float = 0.0
    energy: float = 0.0
    force: float = 0.0
    anchored: bool = False  # True when c is pinned to the sample's target

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict for JSON responses."""
        return asdict(self)
class SimEngine:
    """Thread-safe toy learning engine stepped by a background loop.

    All mutable state is guarded by ``self.lock``: API handlers mutate state
    through the public methods while the daemon ``run_loop`` thread calls
    ``physics_step`` concurrently.
    """

    def __init__(self):
        # RLock, not Lock: public methods that hold the lock (e.g. configure)
        # also call reset_state(), which acquires the lock again on the same
        # thread. A plain non-reentrant Lock would self-deadlock there.
        self.lock = threading.RLock()
        self.config = EngineConfig()
        self.running = False
        self.iteration = 0
        self.current_error = 0.0
        self.current_loss = 0.0
        self.logs = []
        self.cells: List[CellState] = []
        self.batch_queue = collections.deque()
        self.current_sample: Optional[Dict[str, Any]] = None
        self.last_sample: Optional[Dict[str, Any]] = None
        # Bounded histories keep /state payloads small.
        self.loss_history = collections.deque(maxlen=120)
        self.error_history = collections.deque(maxlen=120)
        self.reset_state()
def reset_state(self):
with self.lock:
self.iteration = 0
self.current_error = 0.0
self.current_loss = 0.0
self.logs = []
self.batch_queue.clear()
self.current_sample = None
self.last_sample = None
self.loss_history.clear()
self.error_history.clear()
self._build_cells()
self.add_log("Engine reset.")
def _build_cells(self):
count = 1 if self.config.topology == "single_cell" else max(2, int(self.config.num_cells))
self.cells = []
for i in range(count):
self.cells.append(
CellState(
id=i,
a=0.0,
b=0.0,
c=0.0,
k=random.uniform(0.35, 1.25),
ka=random.uniform(0.35, 1.25),
kb=random.uniform(0.35, 1.25),
kc=random.uniform(-0.25, 0.25),
)
)
def add_log(self, msg: str):
stamp = f"[{self.iteration}] {msg}"
self.logs.insert(0, stamp)
if len(self.logs) > 20:
self.logs.pop()
def configure(self, payload: Dict[str, Any]):
with self.lock:
self.config.architecture = payload.get("architecture", self.config.architecture)
self.config.coeff_mode = payload.get("coeff_mode", self.config.coeff_mode)
self.config.topology = payload.get("topology", self.config.topology)
self.config.dataset_family = payload.get("dataset_family", self.config.dataset_family)
self.config.mode = payload.get("mode", self.config.mode)
self.config.num_cells = int(payload.get("num_cells", self.config.num_cells))
self.config.learning_rate = float(payload.get("learning_rate", self.config.learning_rate))
self.config.damping = float(payload.get("damping", self.config.damping))
self.config.coupling = float(payload.get("coupling", self.config.coupling))
self.config.batch_size = int(payload.get("batch_size", self.config.batch_size))
self.config.sample_noise = float(payload.get("sample_noise", self.config.sample_noise))
self.running = False
self.reset_state()
self.add_log(
f"Config applied: {self.config.architecture} | {self.config.coeff_mode} | "
f"{self.config.topology} | {self.config.dataset_family} | {self.config.mode}"
)
def _sample_housing(self):
a = random.uniform(2, 10)
b = random.uniform(2, 10)
c = (2.5 * a) + (1.2 * b) + random.uniform(-self.config.sample_noise, self.config.sample_noise)
return a, b, c, "housing_affine"
def _sample_subtraction(self):
a = random.uniform(2, 10)
b = random.uniform(2, 10)
c = (1.0 * a) + (-1.0 * b) + random.uniform(-self.config.sample_noise, self.config.sample_noise)
return a, b, c, "signed_subtraction"
def _sample_multiplication(self):
a = random.uniform(2, 10)
b = random.uniform(2, 10)
c = (a * b) + random.uniform(-self.config.sample_noise, self.config.sample_noise)
return a, b, c, "multiplicative"
def _sample_symbolic(self):
a = random.uniform(1, 12)
b = random.uniform(1, 12)
branch = random.choice(["affine", "signed_affine", "hybrid"])
if branch == "affine":
c = (1.7 * a) + (0.9 * b)
elif branch == "signed_affine":
c = (0.8 * a) + (-1.4 * b) + 2.0
else:
c = (a * 0.6) + (b * 0.4) + ((a * b) * 0.2)
c += random.uniform(-self.config.sample_noise, self.config.sample_noise)
return a, b, c, f"symbolic_{branch}"
def generate_sample(self, family: Optional[str] = None) -> Dict[str, Any]:
family = family or self.config.dataset_family
if family == "housing":
a, b, c, label = self._sample_housing()
elif family == "subtraction":
a, b, c, label = self._sample_subtraction()
elif family == "multiplication":
a, b, c, label = self._sample_multiplication()
elif family == "symbolic":
a, b, c, label = self._sample_symbolic()
elif family == "mixed":
pick = random.choice(["housing", "subtraction", "multiplication", "symbolic"])
return self.generate_sample(pick)
else:
a, b = random.uniform(2, 10), random.uniform(2, 10)
c, label = a + b, "default_add"
return {"a": float(a), "b": float(b), "c": float(c), "label": label}
def _apply_sample_to_cells(self, sample: Dict[str, Any], anchor_output: bool):
self.current_sample = sample
self.last_sample = sample
for cell in self.cells:
cell.a = float(sample["a"])
cell.b = float(sample["b"])
cell.target = float(sample["c"]) if sample.get("c") is not None else None
cell.label = sample.get("label", "")
cell.anchored = anchor_output
if anchor_output:
cell.c = float(sample["c"])
else:
cell.c = 0.0
cell.prediction = 0.0
cell.error = 0.0
cell.energy = 0.0
cell.force = 0.0
def load_sample(self, sample: Dict[str, Any], anchor_output: Optional[bool] = None):
with self.lock:
if anchor_output is None:
anchor_output = self.config.mode == "training"
self._apply_sample_to_cells(sample, anchor_output=anchor_output)
self.add_log(
f"Sample loaded: a={sample['a']:.3f}, b={sample['b']:.3f}, "
f"c={sample['c']:.3f}, label={sample.get('label', '')}"
)
def _coefficient_snapshot(self, cell: CellState):
if self.config.coeff_mode == "single_k":
return {"ka": cell.k, "kb": cell.k, "kc": cell.k}
if self.config.coeff_mode == "per_edge_k":
return {"ka": cell.ka, "kb": cell.kb, "kc": 0.0}
return {"ka": cell.ka, "kb": cell.kb, "kc": cell.kc}
def _set_trainable_param(self, cell: CellState, name: str, value: float):
value = max(-20.0, min(20.0, value))
if name == "k":
cell.k = value
elif name == "ka":
cell.ka = value
elif name == "kb":
cell.kb = value
elif name == "kc":
cell.kc = value
def _get_trainable_params(self):
if self.config.coeff_mode == "single_k":
return ["k"]
if self.config.coeff_mode == "per_edge_k":
return ["ka", "kb"]
return ["ka", "kb", "kc"]
def _predict_cell(self, cell: CellState) -> float:
coeffs = self._coefficient_snapshot(cell)
a, b = cell.a, cell.b
arch = self.config.architecture
ka, kb, kc = coeffs["ka"], coeffs["kb"], coeffs["kc"]
if self.config.coeff_mode == "single_k":
k = cell.k
if arch == "additive":
return k * (a + b)
if arch == "multiplicative":
return k * (a * b)
if arch == "affine":
return (k * a) + (k * b) + k
if arch == "bilinear":
return k * (a + b + (a * b))
if arch == "gated":
gate = 1.0 / (1.0 + math.exp(-k))
return gate * (a + b) + (1.0 - gate) * (a * b)
return k * (a + b)
if arch == "additive":
return (ka * a) + (kb * b) + kc
if arch == "multiplicative":
return (ka * a) * (kb * b) + kc
if arch == "affine":
return (ka * a) + (kb * b) + kc
if arch == "bilinear":
return (ka * a) + (kb * b) + (kc * a * b)
if arch == "gated":
gate = 1.0 / (1.0 + math.exp(-kc))
return gate * ((ka * a) + (kb * b)) + (1.0 - gate) * (a * b)
return (ka * a) + (kb * b) + kc
def _neighbors(self, idx: int):
if self.config.topology == "single_cell":
return []
if self.config.topology == "chain":
n = []
if idx - 1 >= 0:
n.append(idx - 1)
if idx + 1 < len(self.cells):
n.append(idx + 1)
return n
if self.config.topology == "mesh":
return [j for j in range(len(self.cells)) if j != idx]
return []
def _cell_loss(self, idx: int, preds: List[float]) -> float:
cell = self.cells[idx]
pred = preds[idx]
loss = 0.0
if cell.target is not None:
loss += (pred - cell.target) ** 2
neighbors = self._neighbors(idx)
if neighbors:
neighbor_mean = sum(preds[j] for j in neighbors) / len(neighbors)
loss += self.config.coupling * ((pred - neighbor_mean) ** 2)
return loss
    def _numeric_gradient(self, idx: int, param_name: str, eps: float = 1e-4) -> float:
        """Central-difference d(loss)/d(param) for one cell's coefficient.

        Perturbs the parameter by ±eps, evaluates the cell-local loss (target
        error plus coupling toward neighbor predictions), and restores the
        original value before returning.
        """
        cell = self.cells[idx]
        old = getattr(cell, param_name)

        def local_loss() -> float:
            # Same objective as _cell_loss, but recomputed from fresh
            # predictions so the perturbed parameter actually takes effect.
            pred = self._predict_cell(cell)
            loss = 0.0
            if cell.target is not None:
                loss += (pred - cell.target) ** 2
            neighbors = self._neighbors(idx)
            if neighbors:
                neighbor_preds = [self._predict_cell(self.cells[j]) for j in neighbors]
                neighbor_mean = sum(neighbor_preds) / len(neighbor_preds)
                loss += self.config.coupling * ((pred - neighbor_mean) ** 2)
            return loss

        # NOTE(review): _set_trainable_param clamps to [-20, 20]; if `old` sits
        # at that boundary the ±eps probes collapse onto it and the gradient is
        # biased — confirm this is acceptable.
        self._set_trainable_param(cell, param_name, old + eps)
        plus = local_loss()
        self._set_trainable_param(cell, param_name, old - eps)
        minus = local_loss()
        self._set_trainable_param(cell, param_name, old)
        return (plus - minus) / (2.0 * eps)
def _mean(self, xs: List[float]) -> float:
return sum(xs) / max(1, len(xs))
def _load_next_sample_from_batch(self):
if self.batch_queue:
sample = self.batch_queue.popleft()
self._apply_sample_to_cells(sample, anchor_output=(self.config.mode == "training"))
self.add_log(f"Next batch sample: {sample.get('label', '')}")
return True
return False
def physics_step(self):
with self.lock:
if not self.running or not self.cells:
return False
preds = []
for cell in self.cells:
pred = self._predict_cell(cell)
cell.prediction = pred
preds.append(pred)
global_pred = self._mean(preds)
target_available = self.current_sample is not None and self.current_sample.get("c") is not None
target = self._mean([c.target for c in self.cells if c.target is not None]) if target_available else None
if self.config.mode == "training":
total_loss = 0.0
for idx, cell in enumerate(self.cells):
cell.target = target if target is not None else cell.target
cell.error = (cell.prediction - cell.target) if cell.target is not None else 0.0
cell.energy = cell.error ** 2
for param_name in self._get_trainable_params():
grad = self._numeric_gradient(idx, param_name)
old = getattr(cell, param_name)
new_val = old - (self.config.learning_rate * grad)
new_val = (1.0 - self.config.damping) * new_val + (self.config.damping * old)
self._set_trainable_param(cell, param_name, new_val)
total_loss += self._cell_loss(idx, preds)
self.current_loss = total_loss / max(1, len(self.cells))
self.current_error = (global_pred - target) if target is not None else global_pred
self.loss_history.append(self.current_loss)
self.error_history.append(self.current_error)
if target_available and abs(self.current_error) < 0.05 and self.current_loss < 0.01:
self.add_log("Converged on current sample.")
if self._load_next_sample_from_batch():
self.iteration += 1
return True
self.running = False
self.add_log("Batch complete.")
self.iteration += 1
return False
else:
# Inference mode: output node(s) drift toward the predicted state.
drift_values = []
for idx, cell in enumerate(self.cells):
neighbors = self._neighbors(idx)
neighbor_mean = self._mean([preds[j] for j in neighbors]) if neighbors else pred
drift = (pred - cell.c)
drift += self.config.coupling * (neighbor_mean - cell.c)
cell.force = drift
cell.c += 0.15 * drift
cell.error = pred - cell.c
cell.energy = cell.error ** 2
drift_values.append(abs(drift))
self.current_error = self._mean([cell.error for cell in self.cells])
self.current_loss = self._mean([cell.energy for cell in self.cells])
self.loss_history.append(self.current_loss)
self.error_history.append(self.current_error)
if self.current_sample and abs(self.current_error) < 0.05 and self.current_loss < 0.01:
self.add_log("Inference settled.")
if self._load_next_sample_from_batch():
self.iteration += 1
return True
self.running = False
self.add_log("Task complete.")
self.iteration += 1
return False
# If no target exists, stop when drift is tiny.
if not target_available and self._mean(drift_values) < 0.002:
self.running = False
self.add_log("Inference drift stabilized.")
self.iteration += 1
return False
self.iteration += 1
return True
def start_batch(self, count: int):
with self.lock:
self.batch_queue.clear()
for _ in range(count):
self.batch_queue.append(self.generate_sample())
first = self._load_next_sample_from_batch()
self.running = first
self.add_log(f"Batch started with {count} samples.")
return first
def set_custom_sample(self, a: float, b: float, c: Optional[float] = None):
with self.lock:
sample = {"a": float(a), "b": float(b), "c": float(c) if c is not None else None, "label": "custom"}
self._apply_sample_to_cells(sample, anchor_output=(self.config.mode == "training" and c is not None))
self.current_sample = sample
self.last_sample = sample
self.running = True
self.add_log(f"Custom sample loaded: a={a}, b={b}, c={c}")
return sample
    def halt(self):
        """Stop the background stepping loop; all other state is preserved."""
        with self.lock:
            self.running = False
            self.add_log("Engine halted.")
def snapshot(self) -> Dict[str, Any]:
with self.lock:
return {
"config": asdict(self.config),
"running": self.running,
"iteration": self.iteration,
"current_error": self.current_error,
"current_loss": self.current_loss,
"cells": [c.to_dict() for c in self.cells],
"logs": self.logs,
"last_sample": self.last_sample,
"current_sample": self.current_sample,
"batch_remaining": len(self.batch_queue),
"loss_history": list(self.loss_history),
"error_history": list(self.error_history),
}
engine = SimEngine()

def run_loop():
    # Background driver: step the engine at ~25 Hz whenever it is running.
    while True:
        if engine.running:
            engine.physics_step()
        time.sleep(0.04)

# Daemon thread so the process can exit without joining this loop.
threading.Thread(target=run_loop, daemon=True).start()
@app.get("/", response_class=HTMLResponse)
async def get_ui():
    # Serve the single-page UI that is deployed next to this file.
    return FileResponse("index.html")
@app.get("/state")
async def get_state():
    # Full engine snapshot, polled by the UI.
    return engine.snapshot()
@app.post("/config")
async def config(data: dict):
    # Apply a (partial) configuration payload; halts and resets the engine.
    engine.configure(data)
    return {"ok": True}
@app.post("/example")
async def example(data: dict):
    # Preview one generated sample without loading it into the engine.
    family = data.get("dataset_family", engine.config.dataset_family)
    sample = engine.generate_sample(family)
    return sample
@app.post("/generate_batch")
async def generate_batch(data: dict):
    # Queue a batch of generated samples and start stepping through them.
    count = int(data.get("count", engine.config.batch_size))
    engine.start_batch(count)
    return {"ok": True, "count": count}
@app.post("/test_custom")
async def test_custom(data: dict):
    # Load a user-provided sample; "c" is optional and the UI may send it as
    # None, an empty string, or the literal string "null".
    a = float(data["a"])
    b = float(data["b"])
    c = data.get("c", None)
    c_val = float(c) if c not in [None, "", "null"] else None
    engine.set_custom_sample(a, b, c_val)
    return {"ok": True}
@app.post("/halt")
async def halt():
    # Stop the stepping loop without clearing engine state.
    engine.halt()
    return {"ok": True}
if __name__ == "__main__":
    import uvicorn
    # Port 7860 is the Hugging Face Spaces default.
    uvicorn.run(app, host="0.0.0.0", port=7860)