Staticplay-CurioDynamics / physics_world.py
Xlbully's picture
Add CLI and example output
bdfcde8
import math
import random
from dataclasses import dataclass
@dataclass
class State:
x: float
y: float
vx: float
vy: float
class ProjectileWorld:
def __init__(self, g=9.81, drag=0.12, wind=0.4, dt=0.1, drag_power=2.0):
self.g = g
self.drag = drag
self.wind = wind
self.dt = dt
self.drag_power = drag_power
def step(self, s: State) -> State:
# Hidden physics: quadratic drag + wind
ax = -self.drag * s.vx * abs(s.vx) + self.wind
ay = -self.g - self.drag * s.vy * abs(s.vy)
vx = s.vx + ax * self.dt
vy = s.vy + ay * self.dt
x = s.x + vx * self.dt
y = s.y + vy * self.dt
return State(x=x, y=y, vx=vx, vy=vy)
class CuriosityAgent:
def __init__(self, dt=0.1):
self.dt = dt
# learned symbolic model coefficients for ax and ay
self.model_ax = {}
self.model_ay = {}
# Invented concepts
self.invented = {}
self.surprise_window = []
self.err_vx_window = []
self.window_size = 20
self.surprise_threshold = 0.06
self.stable_threshold = 0.5
self.drag_power_window = []
# regression buffers per episode
self.samples = []
self.feature_means = {"vx_abs_vx": 0.0, "vy_abs_vy": 0.0}
def predict(self, s: State) -> State:
# If no model yet, predict no acceleration
if not self.model_ax and not self.model_ay:
ax = 0.0
ay = 0.0
else:
ax = self._eval_model(self.model_ax, s)
ay = self._eval_model(self.model_ay, s)
vx = s.vx + ax * self.dt
vy = s.vy + ay * self.dt
x = s.x + vx * self.dt
y = s.y + vy * self.dt
return State(x=x, y=y, vx=vx, vy=vy)
def update(self, s: State, s_next: State):
pred = self.predict(s)
# prediction error on velocity (scaled)
err_vx = s_next.vx - pred.vx
err_vy = s_next.vy - pred.vy
surprise = math.sqrt(err_vx * err_vx + err_vy * err_vy)
# keep surprise window
self.surprise_window.append(surprise)
if len(self.surprise_window) > self.window_size:
self.surprise_window.pop(0)
self.err_vx_window.append(err_vx)
if len(self.err_vx_window) > self.window_size:
self.err_vx_window.pop(0)
# store samples for regression (ax, ay from finite differences)
ax = (s_next.vx - s.vx) / self.dt
ay = (s_next.vy - s.vy) / self.dt
self.samples.append((s.vx, s.vy, ax, ay))
self._maybe_invent(surprise)
return surprise
def _maybe_invent(self, surprise):
if len(self.surprise_window) < self.window_size:
return
high = sum(1 for s in self.surprise_window if s > self.surprise_threshold)
ratio = high / self.window_size
if ratio >= self.stable_threshold and "drag" not in self.invented:
self.invented["drag"] = {
"confidence": round(ratio, 3),
"evidence_window": list(self.surprise_window),
}
# wind discovery: persistent bias in vx error
if "model_update" not in self.invented and ratio >= self.stable_threshold:
self.invented["model_update"] = {"confidence": round(ratio, 3)}
def fit_params(self):
if len(self.samples) < 20:
self.samples.clear()
return
# Symbolic regression via sparse linear model on feature library
features_ax_linear = []
features_ax_quad = []
features_ay_linear = []
features_ay_quad = []
targets_ax = []
targets_ay = []
mean_vx_abs_vx = sum((vx * abs(vx)) for vx, _, _, _ in self.samples) / len(self.samples)
mean_vy_abs_vy = sum((vy * abs(vy)) for _, vy, _, _ in self.samples) / len(self.samples)
self.feature_means["vx_abs_vx"] = mean_vx_abs_vx
self.feature_means["vy_abs_vy"] = mean_vy_abs_vy
for vx, vy, ax, ay in self.samples:
features_ax_linear.append({"1": 1.0, "vx": vx})
features_ax_quad.append({"1": 1.0, "vx_abs_vx": (vx * abs(vx)) - mean_vx_abs_vx})
features_ay_linear.append({"1": 1.0, "vy": vy})
features_ay_quad.append({"1": 1.0, "vy_abs_vy": (vy * abs(vy)) - mean_vy_abs_vy})
targets_ax.append(ax)
targets_ay.append(ay)
coeff_ax_lin, mse_ax_lin = self._fit_sparse(features_ax_linear, targets_ax, return_mse=True, center=True)
coeff_ax_quad, mse_ax_quad = self._fit_sparse(features_ax_quad, targets_ax, return_mse=True, center=True)
coeff_ay_lin, mse_ay_lin = self._fit_sparse(features_ay_linear, targets_ay, return_mse=True, center=True)
coeff_ay_quad, mse_ay_quad = self._fit_sparse(features_ay_quad, targets_ay, return_mse=True, center=True)
coeff_ax = coeff_ax_quad if mse_ax_quad < mse_ax_lin else coeff_ax_lin
coeff_ay = coeff_ay_quad if mse_ay_quad < mse_ay_lin else coeff_ay_lin
self.model_ax = coeff_ax
self.model_ay = coeff_ay
if self.model_ax or self.model_ay:
self.invented.setdefault(
"symbolic_model",
{"terms_ax": list(self.model_ax.keys()), "terms_ay": list(self.model_ay.keys())},
)
self.samples.clear()
def _fit_sparse(self, feature_rows, targets, return_mse=False, center=False):
# Sequential Thresholded Least Squares (SINDy-style)
keys = list(feature_rows[0].keys())
n = len(feature_rows)
# Build design matrix
X = [[row[k] for k in keys] for row in feature_rows]
y = targets[:]
y_mean = 0.0
if center:
y_mean = sum(y) / len(y)
y = [v - y_mean for v in y]
# Normalize columns (except constant)
means = [0.0] * len(keys)
stds = [1.0] * len(keys)
for j, k in enumerate(keys):
if k == "1":
means[j] = 0.0
stds[j] = 1.0
continue
col = [X[i][j] for i in range(n)]
m = sum(col) / n
v = sum((c - m) ** 2 for c in col) / n
s = math.sqrt(v) if v > 1e-12 else 1.0
means[j] = m
stds[j] = s
for i in range(n):
X[i][j] = (X[i][j] - m) / s
active = set(range(len(keys)))
coeff = [0.0] * len(keys)
def solve_least_squares(active_idx):
# Normal equations on active set
a_idx = sorted(active_idx)
m = len(a_idx)
if m == 0:
return [0.0] * len(keys)
xtx = [[0.0 for _ in range(m)] for _ in range(m)]
xty = [0.0 for _ in range(m)]
for i in range(n):
row = [X[i][j] for j in a_idx]
for r in range(m):
xty[r] += row[r] * y[i]
for c in range(m):
xtx[r][c] += row[r] * row[c]
# Gauss-Seidel
beta = [0.0] * m
for _ in range(30):
for r in range(m):
denom = xtx[r][r] if abs(xtx[r][r]) > 1e-8 else 1e-8
num = xty[r]
for c in range(m):
if c == r:
continue
num -= xtx[r][c] * beta[c]
beta[r] = num / denom
full = [0.0] * len(keys)
for r, j in enumerate(a_idx):
full[j] = beta[r]
return full
# Iterative thresholding
for _ in range(6):
coeff = solve_least_squares(active)
# Unnormalize coefficients
coeff_unnorm = coeff[:]
for j, k in enumerate(keys):
if k == "1":
continue
coeff_unnorm[j] = coeff[j] / stds[j]
# Threshold
new_active = set(i for i, v in enumerate(coeff_unnorm) if abs(v) >= 0.02)
new_active.add(keys.index("1"))
if new_active == active:
coeff = coeff_unnorm
break
active = new_active
coeff = coeff_unnorm
pruned = {k: round(v, 3) for k, v in zip(keys, coeff) if abs(v) >= 0.02}
if center:
pruned["1"] = round(pruned.get("1", 0.0) + y_mean, 3)
if not return_mse:
return pruned
# compute mse
mse = 0.0
for row, y in zip(feature_rows, targets):
y_hat = sum(pruned.get(k, 0.0) * row[k] for k in row)
mse += (y - y_hat) ** 2
mse /= len(feature_rows)
return pruned, mse
def _eval_model(self, model, s: State):
features = {
"1": 1.0,
"vx": s.vx,
"vy": s.vy,
"vx_abs_vx": (s.vx * abs(s.vx)) - self.feature_means["vx_abs_vx"],
"vy_abs_vy": (s.vy * abs(s.vy)) - self.feature_means["vy_abs_vy"],
}
return sum(model.get(k, 0.0) * features[k] for k in model)
def run_stress_test(
episodes=50,
steps=200,
g=9.81,
drag=0.12,
wind=0.4,
dt=0.1,
drag_power=2.0,
seed=123
):
random.seed(seed)
world = ProjectileWorld(g=g, drag=drag, wind=wind, dt=dt, drag_power=drag_power)
agent = CuriosityAgent(dt=dt)
surprises = []
for _ in range(episodes):
# random launch
speed = random.uniform(8, 20)
angle = random.uniform(20, 70) * math.pi / 180.0
s = State(
x=0.0,
y=0.0,
vx=speed * math.cos(angle),
vy=speed * math.sin(angle),
)
for _ in range(steps):
s_next = world.step(s)
surprise = agent.update(s, s_next)
surprises.append(surprise)
s = s_next
if s.y < 0.0:
break
agent.fit_params()
# Interpret constants as wind and gravity when quadratic terms exist
wind_est = None
g_est = None
if "vx_abs_vx" in agent.model_ax and "1" in agent.model_ax:
wind_est = agent.model_ax["1"] - agent.model_ax["vx_abs_vx"] * agent.feature_means["vx_abs_vx"]
if "vy_abs_vy" in agent.model_ay and "1" in agent.model_ay:
g_est = -(agent.model_ay["1"] - agent.model_ay["vy_abs_vy"] * agent.feature_means["vy_abs_vy"])
return {
"g_true": g,
"drag_true": drag,
"wind_true": wind,
"drag_power_true": drag_power,
"model_ax": agent.model_ax,
"model_ay": agent.model_ay,
"wind_est": round(wind_est, 3) if wind_est is not None else None,
"g_est": round(g_est, 3) if g_est is not None else None,
"invented": agent.invented,
"avg_surprise": round(sum(surprises) / len(surprises), 3),
"max_surprise": round(max(surprises), 3),
"samples": len(surprises),
}
def _parse_args():
import argparse
p = argparse.ArgumentParser(description="Staticplay CurioDynamics runner")
p.add_argument("--episodes", type=int, default=50)
p.add_argument("--steps", type=int, default=200)
p.add_argument("--g", type=float, default=9.81)
p.add_argument("--drag", type=float, default=0.12)
p.add_argument("--wind", type=float, default=0.4)
p.add_argument("--drag_power", type=float, default=2.0)
p.add_argument("--dt", type=float, default=0.1)
p.add_argument("--seed", type=int, default=123)
return p.parse_args()
if __name__ == "__main__":
args = _parse_args()
result = run_stress_test(
episodes=args.episodes,
steps=args.steps,
g=args.g,
drag=args.drag,
wind=args.wind,
drag_power=args.drag_power,
dt=args.dt,
seed=args.seed,
)
print(result)