| from __future__ import annotations |
|
|
| import numpy as np |
|
|
| from driftwm.sim.boat import BoatSpec, thruster_wrench_body |
| from driftwm.utils import wrap_angle |
|
|
|
|
| def rot_body_to_world(theta: float) -> np.ndarray: |
| c = np.cos(theta) |
| s = np.sin(theta) |
| return np.array([[c, -s], [s, c]], dtype=np.float32) |
|
|
|
|
| def step_dynamics( |
| state: np.ndarray, |
| action: np.ndarray, |
| spec: BoatSpec, |
| params: dict[str, float], |
| flow_velocity: np.ndarray, |
| dt: float, |
| workspace: tuple[float, float, float, float], |
| boundary: str = "terminate", |
| ) -> tuple[np.ndarray, bool]: |
| action = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) |
| state = np.asarray(state, dtype=np.float32).copy() |
| x, y, theta, vx, vy, omega = state[:6] |
| u = state[6 : 6 + spec.action_dim] |
|
|
| tau_a = max(params["actuator_tau"], 1e-3) |
| alpha = min(1.0, dt / tau_a) |
| u_next = u + alpha * (action - u) |
|
|
| rot = rot_body_to_world(float(theta)) |
| vel_world = np.array([vx, vy], dtype=np.float32) |
| rel_world = vel_world - np.asarray(flow_velocity, dtype=np.float32) |
| rel_body = rot.T @ rel_world |
|
|
| d1 = np.array([params["drag_linear_x"], params["drag_linear_y"]], dtype=np.float32) |
| d2 = np.array([params["drag_quad_x"], params["drag_quad_y"]], dtype=np.float32) |
| drag_body = -d1 * rel_body - d2 * np.abs(rel_body) * rel_body |
|
|
| thrust_body, tau_thr = thruster_wrench_body(spec, u_next, params) |
| force_world = rot @ (thrust_body + drag_body) |
|
|
| tau_drag = -params["drag_angular"] * omega - params["drag_angular_quad"] * abs(float(omega)) * omega |
| tau_total = tau_thr + tau_drag |
|
|
| vel_next = vel_world + dt * force_world / params["mass"] |
| pos_next = np.array([x, y], dtype=np.float32) + dt * vel_next |
| omega_next = omega + dt * tau_total / params["inertia"] |
| theta_next = wrap_angle(theta + dt * omega_next) |
|
|
| xmin, xmax, ymin, ymax = workspace |
| done = bool(pos_next[0] < xmin or pos_next[0] > xmax or pos_next[1] < ymin or pos_next[1] > ymax) |
| if done and boundary == "bounce": |
| restitution = 0.45 |
| if pos_next[0] < xmin or pos_next[0] > xmax: |
| vel_next[0] *= -restitution |
| if pos_next[1] < ymin or pos_next[1] > ymax: |
| vel_next[1] *= -restitution |
| pos_next[0] = np.clip(pos_next[0], xmin, xmax) |
| pos_next[1] = np.clip(pos_next[1], ymin, ymax) |
| done = False |
| elif done and boundary == "clip": |
| pos_next[0] = np.clip(pos_next[0], xmin, xmax) |
| pos_next[1] = np.clip(pos_next[1], ymin, ymax) |
| done = False |
|
|
| next_state = state.copy() |
| next_state[:6] = np.array([pos_next[0], pos_next[1], theta_next, vel_next[0], vel_next[1], omega_next], dtype=np.float32) |
| next_state[6 : 6 + spec.action_dim] = u_next |
| return next_state.astype(np.float32), done |
|
|