import math import random from dataclasses import dataclass @dataclass class State: x: float y: float vx: float vy: float class ProjectileWorld: def __init__(self, g=9.81, drag=0.12, wind=0.4, dt=0.1, drag_power=2.0): self.g = g self.drag = drag self.wind = wind self.dt = dt self.drag_power = drag_power def step(self, s: State) -> State: # Hidden physics: quadratic drag + wind ax = -self.drag * s.vx * abs(s.vx) + self.wind ay = -self.g - self.drag * s.vy * abs(s.vy) vx = s.vx + ax * self.dt vy = s.vy + ay * self.dt x = s.x + vx * self.dt y = s.y + vy * self.dt return State(x=x, y=y, vx=vx, vy=vy) class CuriosityAgent: def __init__(self, dt=0.1): self.dt = dt # learned symbolic model coefficients for ax and ay self.model_ax = {} self.model_ay = {} # Invented concepts self.invented = {} self.surprise_window = [] self.err_vx_window = [] self.window_size = 20 self.surprise_threshold = 0.06 self.stable_threshold = 0.5 self.drag_power_window = [] # regression buffers per episode self.samples = [] self.feature_means = {"vx_abs_vx": 0.0, "vy_abs_vy": 0.0} def predict(self, s: State) -> State: # If no model yet, predict no acceleration if not self.model_ax and not self.model_ay: ax = 0.0 ay = 0.0 else: ax = self._eval_model(self.model_ax, s) ay = self._eval_model(self.model_ay, s) vx = s.vx + ax * self.dt vy = s.vy + ay * self.dt x = s.x + vx * self.dt y = s.y + vy * self.dt return State(x=x, y=y, vx=vx, vy=vy) def update(self, s: State, s_next: State): pred = self.predict(s) # prediction error on velocity (scaled) err_vx = s_next.vx - pred.vx err_vy = s_next.vy - pred.vy surprise = math.sqrt(err_vx * err_vx + err_vy * err_vy) # keep surprise window self.surprise_window.append(surprise) if len(self.surprise_window) > self.window_size: self.surprise_window.pop(0) self.err_vx_window.append(err_vx) if len(self.err_vx_window) > self.window_size: self.err_vx_window.pop(0) # store samples for regression (ax, ay from finite differences) ax = (s_next.vx - s.vx) / self.dt ay = (s_next.vy - s.vy) / self.dt self.samples.append((s.vx, s.vy, ax, ay)) self._maybe_invent(surprise) return surprise def _maybe_invent(self, surprise): if len(self.surprise_window) < self.window_size: return high = sum(1 for s in self.surprise_window if s > self.surprise_threshold) ratio = high / self.window_size if ratio >= self.stable_threshold and "drag" not in self.invented: self.invented["drag"] = { "confidence": round(ratio, 3), "evidence_window": list(self.surprise_window), } # wind discovery: persistent bias in vx error if "model_update" not in self.invented and ratio >= self.stable_threshold: self.invented["model_update"] = {"confidence": round(ratio, 3)} def fit_params(self): if len(self.samples) < 20: self.samples.clear() return # Symbolic regression via sparse linear model on feature library features_ax_linear = [] features_ax_quad = [] features_ay_linear = [] features_ay_quad = [] targets_ax = [] targets_ay = [] mean_vx_abs_vx = sum((vx * abs(vx)) for vx, _, _, _ in self.samples) / len(self.samples) mean_vy_abs_vy = sum((vy * abs(vy)) for _, vy, _, _ in self.samples) / len(self.samples) self.feature_means["vx_abs_vx"] = mean_vx_abs_vx self.feature_means["vy_abs_vy"] = mean_vy_abs_vy for vx, vy, ax, ay in self.samples: features_ax_linear.append({"1": 1.0, "vx": vx}) features_ax_quad.append({"1": 1.0, "vx_abs_vx": (vx * abs(vx)) - mean_vx_abs_vx}) features_ay_linear.append({"1": 1.0, "vy": vy}) features_ay_quad.append({"1": 1.0, "vy_abs_vy": (vy * abs(vy)) - mean_vy_abs_vy}) targets_ax.append(ax) targets_ay.append(ay) coeff_ax_lin, mse_ax_lin = self._fit_sparse(features_ax_linear, targets_ax, return_mse=True, center=True) coeff_ax_quad, mse_ax_quad = self._fit_sparse(features_ax_quad, targets_ax, return_mse=True, center=True) coeff_ay_lin, mse_ay_lin = self._fit_sparse(features_ay_linear, targets_ay, return_mse=True, center=True) coeff_ay_quad, mse_ay_quad = self._fit_sparse(features_ay_quad, targets_ay, return_mse=True, center=True) coeff_ax = coeff_ax_quad if mse_ax_quad < mse_ax_lin else coeff_ax_lin coeff_ay = coeff_ay_quad if mse_ay_quad < mse_ay_lin else coeff_ay_lin self.model_ax = coeff_ax self.model_ay = coeff_ay if self.model_ax or self.model_ay: self.invented.setdefault( "symbolic_model", {"terms_ax": list(self.model_ax.keys()), "terms_ay": list(self.model_ay.keys())}, ) self.samples.clear() def _fit_sparse(self, feature_rows, targets, return_mse=False, center=False): # Sequential Thresholded Least Squares (SINDy-style) keys = list(feature_rows[0].keys()) n = len(feature_rows) # Build design matrix X = [[row[k] for k in keys] for row in feature_rows] y = targets[:] y_mean = 0.0 if center: y_mean = sum(y) / len(y) y = [v - y_mean for v in y] # Normalize columns (except constant) means = [0.0] * len(keys) stds = [1.0] * len(keys) for j, k in enumerate(keys): if k == "1": means[j] = 0.0 stds[j] = 1.0 continue col = [X[i][j] for i in range(n)] m = sum(col) / n v = sum((c - m) ** 2 for c in col) / n s = math.sqrt(v) if v > 1e-12 else 1.0 means[j] = m stds[j] = s for i in range(n): X[i][j] = (X[i][j] - m) / s active = set(range(len(keys))) coeff = [0.0] * len(keys) def solve_least_squares(active_idx): # Normal equations on active set a_idx = sorted(active_idx) m = len(a_idx) if m == 0: return [0.0] * len(keys) xtx = [[0.0 for _ in range(m)] for _ in range(m)] xty = [0.0 for _ in range(m)] for i in range(n): row = [X[i][j] for j in a_idx] for r in range(m): xty[r] += row[r] * y[i] for c in range(m): xtx[r][c] += row[r] * row[c] # Gauss-Seidel beta = [0.0] * m for _ in range(30): for r in range(m): denom = xtx[r][r] if abs(xtx[r][r]) > 1e-8 else 1e-8 num = xty[r] for c in range(m): if c == r: continue num -= xtx[r][c] * beta[c] beta[r] = num / denom full = [0.0] * len(keys) for r, j in enumerate(a_idx): full[j] = beta[r] return full # Iterative thresholding for _ in range(6): coeff = solve_least_squares(active) # Unnormalize coefficients coeff_unnorm = coeff[:] for j, k in enumerate(keys): if k == "1": continue coeff_unnorm[j] = coeff[j] / stds[j] # Threshold new_active = set(i for i, v in enumerate(coeff_unnorm) if abs(v) >= 0.02) new_active.add(keys.index("1")) if new_active == active: coeff = coeff_unnorm break active = new_active coeff = coeff_unnorm pruned = {k: round(v, 3) for k, v in zip(keys, coeff) if abs(v) >= 0.02} if center: pruned["1"] = round(pruned.get("1", 0.0) + y_mean, 3) if not return_mse: return pruned # compute mse mse = 0.0 for row, y in zip(feature_rows, targets): y_hat = sum(pruned.get(k, 0.0) * row[k] for k in row) mse += (y - y_hat) ** 2 mse /= len(feature_rows) return pruned, mse def _eval_model(self, model, s: State): features = { "1": 1.0, "vx": s.vx, "vy": s.vy, "vx_abs_vx": (s.vx * abs(s.vx)) - self.feature_means["vx_abs_vx"], "vy_abs_vy": (s.vy * abs(s.vy)) - self.feature_means["vy_abs_vy"], } return sum(model.get(k, 0.0) * features[k] for k in model) def run_stress_test( episodes=50, steps=200, g=9.81, drag=0.12, wind=0.4, dt=0.1, drag_power=2.0, seed=123 ): random.seed(seed) world = ProjectileWorld(g=g, drag=drag, wind=wind, dt=dt, drag_power=drag_power) agent = CuriosityAgent(dt=dt) surprises = [] for _ in range(episodes): # random launch speed = random.uniform(8, 20) angle = random.uniform(20, 70) * math.pi / 180.0 s = State( x=0.0, y=0.0, vx=speed * math.cos(angle), vy=speed * math.sin(angle), ) for _ in range(steps): s_next = world.step(s) surprise = agent.update(s, s_next) surprises.append(surprise) s = s_next if s.y < 0.0: break agent.fit_params() # Interpret constants as wind and gravity when quadratic terms exist wind_est = None g_est = None if "vx_abs_vx" in agent.model_ax and "1" in agent.model_ax: wind_est = agent.model_ax["1"] - agent.model_ax["vx_abs_vx"] * agent.feature_means["vx_abs_vx"] if "vy_abs_vy" in agent.model_ay and "1" in agent.model_ay: g_est = -(agent.model_ay["1"] - agent.model_ay["vy_abs_vy"] * agent.feature_means["vy_abs_vy"]) return { "g_true": g, "drag_true": drag, "wind_true": wind, "drag_power_true": drag_power, "model_ax": agent.model_ax, "model_ay": agent.model_ay, "wind_est": round(wind_est, 3) if wind_est is not None else None, "g_est": round(g_est, 3) if g_est is not None else None, "invented": agent.invented, "avg_surprise": round(sum(surprises) / len(surprises), 3), "max_surprise": round(max(surprises), 3), "samples": len(surprises), } def _parse_args(): import argparse p = argparse.ArgumentParser(description="Staticplay CurioDynamics runner") p.add_argument("--episodes", type=int, default=50) p.add_argument("--steps", type=int, default=200) p.add_argument("--g", type=float, default=9.81) p.add_argument("--drag", type=float, default=0.12) p.add_argument("--wind", type=float, default=0.4) p.add_argument("--drag_power", type=float, default=2.0) p.add_argument("--dt", type=float, default=0.1) p.add_argument("--seed", type=int, default=123) return p.parse_args() if __name__ == "__main__": args = _parse_args() result = run_stress_test( episodes=args.episodes, steps=args.steps, g=args.g, drag=args.drag, wind=args.wind, drag_power=args.drag_power, dt=args.dt, seed=args.seed, ) print(result)