| import math |
| import random |
| from dataclasses import dataclass |
|
|
|
|
@dataclass
class State:
    """Snapshot of a point mass moving in a 2-D plane.

    Instances are plain mutable records; the simulator and the agent both
    build a fresh ``State`` for every step instead of mutating one in place.
    """

    x: float  # horizontal position
    y: float  # vertical position (the stress test ends an episode once y < 0)
    vx: float  # horizontal velocity component
    vy: float  # vertical velocity component
|
|
|
|
class ProjectileWorld:
    """Ground-truth projectile simulator with gravity, wind and power-law drag.

    Args:
        g: Gravitational acceleration, applied along negative ``y``.
        drag: Drag coefficient applied independently to each velocity axis.
        wind: Constant acceleration added along ``x``.
        dt: Integration time step.
        drag_power: Exponent of the drag law.  The drag acceleration on an
            axis is ``-drag * sign(v) * |v| ** drag_power``; the default of
            ``2.0`` is the quadratic law ``-drag * v * |v|``.
    """

    def __init__(self, g=9.81, drag=0.12, wind=0.4, dt=0.1, drag_power=2.0):
        self.g = g
        self.drag = drag
        self.wind = wind
        self.dt = dt
        self.drag_power = drag_power

    def _drag_accel(self, v):
        """Return the drag acceleration for a single velocity component ``v``."""
        if v == 0.0:
            # No motion, no drag.  Also avoids 0 ** negative when drag_power < 1.
            return 0.0
        # v * |v| ** (p - 1) == sign(v) * |v| ** p.  Written this way so that
        # p == 2 evaluates to exactly -drag * v * |v| (x ** 1.0 is exact).
        return -self.drag * v * abs(v) ** (self.drag_power - 1.0)

    def step(self, s: "State") -> "State":
        """Advance ``s`` by one time step and return the new state.

        Velocities are updated first and the *updated* velocities are then
        used to move the position (semi-implicit Euler).
        """
        ax = self._drag_accel(s.vx) + self.wind
        ay = -self.g + self._drag_accel(s.vy)

        vx = s.vx + ax * self.dt
        vy = s.vy + ay * self.dt
        x = s.x + vx * self.dt
        y = s.y + vy * self.dt
        return State(x=x, y=y, vx=vx, vy=vy)
|
|
|
|
class CuriosityAgent:
    """Learn sparse per-axis acceleration models from observed transitions.

    The agent predicts the next state from its current models, measures the
    velocity prediction error ("surprise"), stores one acceleration sample per
    observed transition, and refits its models when :meth:`fit_params` is
    called.  A model is a dict mapping a feature name (``"1"``, ``"vx"``,
    ``"vy"``, ``"vx_abs_vx"``, ``"vy_abs_vy"``) to a coefficient; an empty
    dict predicts zero acceleration.
    """

    # Coefficients with a smaller magnitude than this are dropped from a model.
    _PRUNE_THRESHOLD = 0.02
    # fit_params refuses to fit on fewer samples than this.
    _MIN_FIT_SAMPLES = 20

    def __init__(self, dt=0.1):
        self.dt = dt

        # Current acceleration models (feature name -> coefficient).
        self.model_ax = {}
        self.model_ay = {}

        # Record of "discoveries", keyed by name; filled by _maybe_invent and
        # fit_params.
        self.invented = {}
        self.surprise_window = []
        self.err_vx_window = []
        self.window_size = 20
        self.surprise_threshold = 0.06
        self.stable_threshold = 0.5
        self.drag_power_window = []  # initialised here; unused elsewhere in this class

        # (vx, vy, ax, ay) tuples collected since the last fit_params call.
        self.samples = []
        # Means subtracted from the quadratic features, both when fitting and
        # when evaluating a model.
        self.feature_means = {"vx_abs_vx": 0.0, "vy_abs_vy": 0.0}

    # Annotations naming State are quoted so that defining this class does not
    # require State to be resolvable at import time.
    def predict(self, s: "State") -> "State":
        """Return the state the current models predict one ``dt`` after ``s``."""
        # An empty model evaluates to zero acceleration.
        ax = self._eval_model(self.model_ax, s)
        ay = self._eval_model(self.model_ay, s)
        vx = s.vx + ax * self.dt
        vy = s.vy + ay * self.dt
        x = s.x + vx * self.dt
        y = s.y + vy * self.dt
        return State(x=x, y=y, vx=vx, vy=vy)

    def update(self, s: "State", s_next: "State"):
        """Observe the transition ``s -> s_next`` and return its surprise.

        Surprise is the Euclidean norm of the velocity prediction error.  The
        call also appends to the rolling windows, stores an acceleration
        sample for the next fit, and runs the invention check.
        """
        pred = self.predict(s)

        err_vx = s_next.vx - pred.vx
        err_vy = s_next.vy - pred.vy
        surprise = math.sqrt(err_vx * err_vx + err_vy * err_vy)

        self._push_window(self.surprise_window, surprise)
        self._push_window(self.err_vx_window, err_vx)

        # Finite-difference acceleration actually observed over this step.
        ax = (s_next.vx - s.vx) / self.dt
        ay = (s_next.vy - s.vy) / self.dt
        self.samples.append((s.vx, s.vy, ax, ay))

        self._maybe_invent(surprise)
        return surprise

    def _push_window(self, window, value):
        """Append ``value`` to ``window``, keeping at most ``window_size`` items."""
        window.append(value)
        if len(window) > self.window_size:
            window.pop(0)

    def _maybe_invent(self, surprise):
        """Record the "drag" and "model_update" inventions when surprise stays high.

        Nothing happens until the surprise window is full.  Each entry is
        written at most once.  ``surprise`` is accepted for call-site symmetry
        but is not used; the decision is based on the window only.
        """
        if len(self.surprise_window) < self.window_size:
            return
        high = sum(1 for value in self.surprise_window if value > self.surprise_threshold)
        ratio = high / self.window_size
        if ratio < self.stable_threshold:
            return

        confidence = round(ratio, 3)
        if "drag" not in self.invented:
            self.invented["drag"] = {
                "confidence": confidence,
                "evidence_window": list(self.surprise_window),
            }
        if "model_update" not in self.invented:
            self.invented["model_update"] = {"confidence": confidence}

    def fit_params(self):
        """Refit both acceleration models from the collected samples.

        With fewer than 20 samples the samples are discarded and the models
        are left untouched.  Otherwise, for each axis a linear candidate
        (``v``) and a quadratic candidate (mean-centred ``v*|v|``) are fitted
        and the quadratic one is kept only if its MSE is strictly lower.  The
        sample buffer is always emptied.
        """
        if len(self.samples) < self._MIN_FIT_SAMPLES:
            self.samples.clear()
            return

        count = len(self.samples)
        mean_vx_abs_vx = sum(vx * abs(vx) for vx, _, _, _ in self.samples) / count
        mean_vy_abs_vy = sum(vy * abs(vy) for _, vy, _, _ in self.samples) / count
        # _eval_model subtracts these same means, so they must be stored.
        self.feature_means["vx_abs_vx"] = mean_vx_abs_vx
        self.feature_means["vy_abs_vy"] = mean_vy_abs_vy

        rows_ax_lin = [{"1": 1.0, "vx": vx} for vx, _, _, _ in self.samples]
        rows_ax_quad = [
            {"1": 1.0, "vx_abs_vx": (vx * abs(vx)) - mean_vx_abs_vx}
            for vx, _, _, _ in self.samples
        ]
        rows_ay_lin = [{"1": 1.0, "vy": vy} for _, vy, _, _ in self.samples]
        rows_ay_quad = [
            {"1": 1.0, "vy_abs_vy": (vy * abs(vy)) - mean_vy_abs_vy}
            for _, vy, _, _ in self.samples
        ]
        targets_ax = [ax for _, _, ax, _ in self.samples]
        targets_ay = [ay for _, _, _, ay in self.samples]

        self.model_ax = self._select_model(rows_ax_lin, rows_ax_quad, targets_ax)
        self.model_ay = self._select_model(rows_ay_lin, rows_ay_quad, targets_ay)

        if self.model_ax or self.model_ay:
            # setdefault: only the terms of the first successful fit are recorded.
            self.invented.setdefault(
                "symbolic_model",
                {
                    "terms_ax": list(self.model_ax.keys()),
                    "terms_ay": list(self.model_ay.keys()),
                },
            )

        self.samples.clear()

    def _select_model(self, linear_rows, quad_rows, targets):
        """Fit both candidates and return the quadratic one only if its MSE is lower."""
        linear, mse_linear = self._fit_sparse(linear_rows, targets, return_mse=True, center=True)
        quad, mse_quad = self._fit_sparse(quad_rows, targets, return_mse=True, center=True)
        return quad if mse_quad < mse_linear else linear

    def _fit_sparse(self, feature_rows, targets, return_mse=False, center=False):
        """Fit a pruned least-squares model ``targets ~ features``.

        Args:
            feature_rows: Non-empty list of dicts sharing the same keys; every
                row must contain the constant feature ``"1"``.
            targets: One target value per row.
            return_mse: When true, also return the mean squared error of the
                returned (rounded) model on the raw ``feature_rows``.
            center: When true, the target mean is removed before solving and
                the ``"1"`` term is always present in the result.

        Returns:
            ``{feature: coefficient}`` rounded to 3 decimals, or a
            ``(model, mse)`` tuple when ``return_mse`` is true.  The model is
            expressed in terms of the raw feature values of ``feature_rows``.

        Raises:
            IndexError: if ``feature_rows`` is empty.
            ValueError: if the rows have no ``"1"`` feature.
        """
        keys = list(feature_rows[0].keys())
        n = len(feature_rows)
        n_keys = len(keys)
        bias_idx = keys.index("1")
        threshold = self._PRUNE_THRESHOLD

        # Work on copies so the caller's rows and targets are not modified.
        X = [[row[k] for k in keys] for row in feature_rows]
        y = list(targets)
        y_mean = 0.0
        if center:
            y_mean = sum(y) / len(y)
            y = [v - y_mean for v in y]

        # Standardise every non-constant column so one threshold and one
        # solver tolerance work regardless of feature scale.
        means = [0.0] * n_keys
        stds = [1.0] * n_keys
        for j in range(n_keys):
            if j == bias_idx:
                continue
            col = [r[j] for r in X]
            m = sum(col) / n
            var = sum((c - m) ** 2 for c in col) / n
            s = math.sqrt(var) if var > 1e-12 else 1.0  # constant column: avoid /0
            means[j] = m
            stds[j] = s
            for r in X:
                r[j] = (r[j] - m) / s

        def solve_least_squares(active_idx):
            """Solve the normal equations on the active columns (Gauss-Seidel)."""
            cols = sorted(active_idx)
            m = len(cols)
            if m == 0:
                return [0.0] * n_keys
            xtx = [[0.0] * m for _ in range(m)]
            xty = [0.0] * m
            for full_row, target in zip(X, y):
                row = [full_row[j] for j in cols]
                for r in range(m):
                    xty[r] += row[r] * target
                    for c in range(m):
                        xtx[r][c] += row[r] * row[c]

            beta = [0.0] * m
            for _ in range(30):
                for r in range(m):
                    diag = xtx[r][r]
                    denom = diag if abs(diag) > 1e-8 else 1e-8
                    num = xty[r]
                    for c in range(m):
                        if c != r:
                            num -= xtx[r][c] * beta[c]
                    beta[r] = num / denom

            full = [0.0] * n_keys
            for r, j in enumerate(cols):
                full[j] = beta[r]
            return full

        # Alternate solving and pruning until the active set stops changing.
        active = set(range(n_keys))
        coeff = [0.0] * n_keys
        for _ in range(6):
            solved = solve_least_squares(active)
            # Undo the scaling so the threshold applies to raw-feature slopes.
            coeff = [b if j == bias_idx else b / stds[j] for j, b in enumerate(solved)]
            new_active = {j for j, v in enumerate(coeff) if abs(v) >= threshold}
            new_active.add(bias_idx)  # the constant term is never pruned here
            if new_active == active:
                break
            active = new_active

        pruned = {
            k: round(v, 3)
            for j, (k, v) in enumerate(zip(keys, coeff))
            if j != bias_idx and abs(v) >= threshold
        }

        # The slopes were fitted against (feature - column mean), so the
        # constant term for raw features is
        #     bias + y_mean - sum(slope * column mean).
        # Without the last term the model is offset by slope * mean for every
        # feature that was not already centred by the caller.  The rounded
        # slopes are used so the emitted model is self-consistent.
        shift = sum(pruned[k] * means[j] for j, k in enumerate(keys) if k in pruned)
        intercept = coeff[bias_idx] + y_mean - shift
        if center or abs(intercept) >= threshold:
            pruned["1"] = round(intercept, 3)

        if not return_mse:
            return pruned

        mse = 0.0
        for row, target in zip(feature_rows, targets):
            y_hat = sum(pruned.get(k, 0.0) * row[k] for k in row)
            mse += (target - y_hat) ** 2
        mse /= len(feature_rows)
        return pruned, mse

    def _eval_model(self, model, s: "State"):
        """Evaluate ``model`` at state ``s``.

        Quadratic features are shifted by ``self.feature_means`` to match how
        they were built in :meth:`fit_params`.  Raises ``KeyError`` if the
        model contains an unknown feature name.
        """
        features = {
            "1": 1.0,
            "vx": s.vx,
            "vy": s.vy,
            "vx_abs_vx": (s.vx * abs(s.vx)) - self.feature_means["vx_abs_vx"],
            "vy_abs_vy": (s.vy * abs(s.vy)) - self.feature_means["vy_abs_vy"],
        }
        return sum(model.get(k, 0.0) * features[k] for k in model)
|
|
|
|
def run_stress_test(
    episodes=50,
    steps=200,
    g=9.81,
    drag=0.12,
    wind=0.4,
    dt=0.1,
    drag_power=2.0,
    seed=123
):
    """Run the agent against the simulator and summarise what it learned.

    Each episode launches a projectile from the origin with a random speed
    (8-20) and elevation angle (20-70 degrees), feeds every transition to the
    agent until ``steps`` is reached or the projectile drops below ``y = 0``,
    and then asks the agent to refit its models.

    Args:
        episodes: Number of launches.
        steps: Maximum number of simulation steps per launch.
        g, drag, wind, dt, drag_power: Forwarded to ``ProjectileWorld``
            (``dt`` is also given to the agent).
        seed: Seed for the module-level ``random`` generator.

    Returns:
        A dict with the true parameters, the final models, the derived
        ``wind_est`` / ``g_est`` (``None`` unless the matching quadratic model
        was selected), the agent's ``invented`` record and surprise
        statistics.  ``avg_surprise`` and ``max_surprise`` are ``None`` when
        no transition was simulated (``episodes`` or ``steps`` is 0).
    """
    random.seed(seed)
    world = ProjectileWorld(g=g, drag=drag, wind=wind, dt=dt, drag_power=drag_power)
    agent = CuriosityAgent(dt=dt)

    surprises = []
    for _ in range(episodes):
        speed = random.uniform(8, 20)
        angle = random.uniform(20, 70) * math.pi / 180.0
        s = State(
            x=0.0,
            y=0.0,
            vx=speed * math.cos(angle),
            vy=speed * math.sin(angle),
        )
        for _ in range(steps):
            s_next = world.step(s)
            surprises.append(agent.update(s, s_next))
            s = s_next
            if s.y < 0.0:
                break
        # Refit once per episode from the transitions just observed.
        agent.fit_params()

    # The quadratic models use mean-centred features, so the acceleration at
    # zero velocity is: intercept - coefficient * feature mean.
    wind_est = None
    g_est = None
    if "vx_abs_vx" in agent.model_ax and "1" in agent.model_ax:
        wind_est = (
            agent.model_ax["1"]
            - agent.model_ax["vx_abs_vx"] * agent.feature_means["vx_abs_vx"]
        )
    if "vy_abs_vy" in agent.model_ay and "1" in agent.model_ay:
        g_est = -(
            agent.model_ay["1"]
            - agent.model_ay["vy_abs_vy"] * agent.feature_means["vy_abs_vy"]
        )

    # With zero simulated transitions there is nothing to average; report
    # None instead of failing on an empty list.
    avg_surprise = round(sum(surprises) / len(surprises), 3) if surprises else None
    max_surprise = round(max(surprises), 3) if surprises else None

    return {
        "g_true": g,
        "drag_true": drag,
        "wind_true": wind,
        "drag_power_true": drag_power,
        "model_ax": agent.model_ax,
        "model_ay": agent.model_ay,
        "wind_est": round(wind_est, 3) if wind_est is not None else None,
        "g_est": round(g_est, 3) if g_est is not None else None,
        "invented": agent.invented,
        "avg_surprise": avg_surprise,
        "max_surprise": max_surprise,
        "samples": len(surprises),
    }
|
|
|
|
| def _parse_args(): |
| import argparse |
|
|
| p = argparse.ArgumentParser(description="Staticplay CurioDynamics runner") |
| p.add_argument("--episodes", type=int, default=50) |
| p.add_argument("--steps", type=int, default=200) |
| p.add_argument("--g", type=float, default=9.81) |
| p.add_argument("--drag", type=float, default=0.12) |
| p.add_argument("--wind", type=float, default=0.4) |
| p.add_argument("--drag_power", type=float, default=2.0) |
| p.add_argument("--dt", type=float, default=0.1) |
| p.add_argument("--seed", type=int, default=123) |
| return p.parse_args() |
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parse the CLI options, run one stress test with
    # exactly those options, and print the resulting summary dict.
    cli_args = _parse_args()
    option_names = ("episodes", "steps", "g", "drag", "wind", "drag_power", "dt", "seed")
    options = {name: getattr(cli_args, name) for name in option_names}
    print(run_stress_test(**options))
|
|