| from __future__ import annotations |
|
|
| import argparse |
| import copy |
| import json |
| from pathlib import Path |
|
|
| import numpy as np |
|
|
| from driftwm.sim.env import SurfaceBoatEnv |
| from driftwm.sim.flow import sample_flow |
| from driftwm.utils import ensure_dir, pad_action |
|
|
|
|
def _invert(mapping: dict[str, int]) -> dict[int, str]:
    """Return the integer-ID -> name view of a name -> integer-ID table."""
    return {idx: name for name, idx in mapping.items()}


# Name <-> integer-ID tables. The integer side is what generate_dataset()
# writes into the per-episode ID arrays, and the name -> ID tables are also
# embedded in the saved metadata, so existing IDs should not be renumbered.

# Boat layouts.
BOAT_TO_ID = {
    "twin": 0,
    "triangle": 1,
}
ID_TO_BOAT = _invert(BOAT_TO_ID)

# Flow-field families; "noflow" (ID 0) is the still-water case.
FLOW_TO_ID = {
    "noflow": 0,
    "uniform": 1,
    "vortex_center": 2,
    "double_gyre": 3,
    "source_sink": 4,
    "source_sink_pair": 5,
    "gradient": 6,
    "shear": 7,
    "turbulent_patch": 8,
    "random_fourier": 9,
}
ID_TO_FLOW = _invert(FLOW_TO_ID)

# Trajectory (control-regime) types. Insertion order matters: it is the order
# in which choose_trajectory_type() pairs these names with its weights.
TRAJ_TO_ID = {
    "noflow_random_action": 0,
    "noflow_action_then_zero": 1,
    "flow_zero_action": 2,
    "flow_active_control": 3,
    "flow_waypoint_control": 4,
}
ID_TO_TRAJ = _invert(TRAJ_TO_ID)

# Default number of flow instances sampled per non-"noflow" family.
# NOTE(review): the name suggests this is the value used in the paper -- confirm.
PAPER_FLOW_POOL_SIZE = 80
|
|
|
|
def smooth_random_actions(rng: np.random.Generator, steps: int, action_dim: int, scale: float = 1.0) -> np.ndarray:
    """Generate a temporally smooth random action sequence.

    A hidden command vector starts uniformly in [-0.2, 0.2] and, on every
    step, is blended 82% / 18% toward a target that is re-drawn uniformly from
    [-1, 1] every 12 steps. Each emitted action is the command multiplied by
    ``scale`` and clipped to [-1, 1].

    Args:
        rng: Random generator used for the initial command and the targets.
        steps: Number of actions to produce.
        action_dim: Number of action channels.
        scale: Gain applied to the command before clipping.

    Returns:
        A ``float32`` array of shape ``(steps, action_dim)``.
    """
    resample_period = 12
    keep_weight, pull_weight = 0.82, 0.18

    sequence = np.zeros((steps, action_dim), dtype=np.float32)
    command = rng.uniform(-0.2, 0.2, size=action_dim).astype(np.float32)
    for step in range(steps):
        # step 0 always draws a target, so ``setpoint`` is bound before use.
        if step % resample_period == 0:
            setpoint = rng.uniform(-1.0, 1.0, size=action_dim).astype(np.float32)
        command = keep_weight * command + pull_weight * setpoint
        sequence[step] = np.clip(scale * command, -1.0, 1.0)
    return sequence
|
|
|
|
def simple_goal_policy(env: SurfaceBoatEnv, goal: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Return a noisy heuristic action that steers the boat toward ``goal``.

    The policy treats ``env.state[:2]`` as the planar position and
    ``env.state[2]`` as the heading, and computes the heading error to the goal
    wrapped into [-pi, pi]. For the ``"twin"`` boat it mixes a forward term and
    a turn term into two channels; for any other boat it emits a three-channel
    command built from the sine of the heading error. Gaussian noise
    (std 0.08) is added and the result is clipped to [-1, 1].

    Args:
        env: Environment providing ``state``, ``spec.name`` and ``action_dim``.
        goal: Target position, two components.
        rng: Random generator used for the exploration noise.

    Returns:
        A ``float32`` action vector clipped to [-1, 1].
    """
    heading = float(env.state[2])
    offset = goal - env.state[:2]
    bearing = float(np.arctan2(offset[1], offset[0]))
    # arctan2(sin, cos) wraps the raw angle difference into [-pi, pi].
    raw_diff = bearing - heading
    err = float(np.arctan2(np.sin(raw_diff), np.cos(raw_diff)))
    sin_err = np.sin(err)

    if env.spec.name != "twin":
        pattern = np.array(
            [sin_err, -0.5 * sin_err + 0.35, -0.5 * sin_err - 0.35],
            dtype=np.float32,
        )
        command = 0.7 * pattern
    else:
        # The forward term shrinks as the boat points away from the goal;
        # the turn term is applied with opposite sign to the two channels.
        surge = np.clip(0.35 + 0.55 * np.cos(err), -0.4, 0.9)
        yaw = np.clip(-0.75 * sin_err, -0.8, 0.8)
        command = np.array([surge + yaw, surge - yaw], dtype=np.float32)

    jitter = rng.normal(0.0, 0.08, size=env.action_dim).astype(np.float32)
    command += jitter
    return np.clip(command, -1.0, 1.0)
|
|
|
|
def choose_trajectory_type(rng: np.random.Generator) -> str:
    """Draw one trajectory-type name from ``TRAJ_TO_ID`` with fixed weights.

    The weights are matched to the names by position (insertion order of
    ``TRAJ_TO_ID``), truncated to the number of names, and renormalised so
    they sum to one.

    Args:
        rng: Random generator used for the draw.

    Returns:
        The selected trajectory-type name as a plain ``str``.
    """
    names = list(TRAJ_TO_ID)
    raw_weights = np.array([0.14, 0.14, 0.14, 0.28, 0.30], dtype=np.float64)[: len(names)]
    probabilities = raw_weights / raw_weights.sum()
    picked = rng.choice(names, p=probabilities)
    # rng.choice yields a NumPy string scalar; callers expect a builtin str.
    return str(picked)
|
|
|
|
def make_flow_pool(
    flow_types: list[str],
    rng: np.random.Generator,
    workspace: tuple[float, float, float, float],
    pool_size: int = PAPER_FLOW_POOL_SIZE,
) -> dict[str, list]:
    """Sample a fixed pool of flow instances for every requested flow family.

    ``"noflow"`` always gets exactly one instance with ``flow_id=0``. Every
    other family gets ``pool_size`` instances whose ``flow_id`` values are
    consecutive integers starting at 1 and running on across families, so no
    two sampled flows share an ID.

    Args:
        flow_types: Flow family names, processed in the given order.
        rng: Random generator forwarded to ``sample_flow``.
        workspace: Workspace bounds forwarded to ``sample_flow``.
        pool_size: Number of instances per non-``"noflow"`` family.

    Returns:
        Mapping from family name to the list of sampled flow objects.
    """
    pool: dict[str, list] = {}
    next_id = 1
    for family in flow_types:
        if family == "noflow":
            pool[family] = [sample_flow("noflow", rng, flow_id=0, workspace=workspace)]
            continue
        fresh = [
            sample_flow(family, rng, flow_id=next_id + offset, workspace=workspace)
            for offset in range(pool_size)
        ]
        # setdefault keeps a family listed twice accumulating into one list.
        pool.setdefault(family, []).extend(fresh)
        next_id += len(fresh)
    return pool
|
|
|
|
def generate_dataset(
    boats: list[str],
    flow_types: list[str],
    episodes: int,
    steps: int,
    out: str | Path,
    seed: int = 0,
    workspace: tuple[float, float, float, float] = (0.0, 10.0, 0.0, 10.0),
    boundary: str = "terminate",
    randomize_params: bool = True,
    flow_pool_size: int = PAPER_FLOW_POOL_SIZE,
) -> None:
    """Simulate boat rollouts and save them to one compressed ``.npz`` file.

    For every episode a trajectory type, a boat and a flow instance (from a
    pool sampled once up front) are drawn, a shared ``SurfaceBoatEnv`` is
    reset, and up to ``steps`` transitions are recorded. When the environment
    reports ``done`` early, the remaining observation/state/flow frames repeat
    the last recorded frame and the remaining actions stay zero;
    ``episode_lengths`` stores the number of transitions actually simulated so
    consumers can mask the padding.

    Saved arrays: ``obs``, ``actions``, ``states``, ``true_flow``,
    ``boat_ids``, ``action_dims``, ``flow_type_ids``, ``flow_ids``,
    ``traj_type_ids``, ``episode_lengths`` and a JSON ``metadata`` string.

    Args:
        boats: Boat names to choose from uniformly (keys of ``BOAT_TO_ID``).
        flow_types: Flow families to build the pool from (keys of
            ``FLOW_TO_ID``). ``"noflow"`` may be omitted; a still-water flow
            is added for the ``noflow_*`` trajectory types when it is.
        episodes: Number of episodes to generate.
        steps: Maximum number of transitions per episode.
        out: Output path handed to ``np.savez_compressed``; its parent
            directory is created through ``ensure_dir``.
        seed: Seed for both the local random generator and the environment.
        workspace: Bounds forwarded to the environment and the flow sampler.
        boundary: Boundary mode forwarded to the environment.
        randomize_params: Forwarded to the environment constructor and to
            every ``reset`` call.
        flow_pool_size: Number of flow instances per non-``"noflow"`` family.

    Raises:
        ValueError: If a flow-driven trajectory type is drawn while
            ``flow_types`` contains no family other than ``"noflow"``.
    """
    rng = np.random.default_rng(seed)
    out = Path(out)
    ensure_dir(out.parent)
    flow_pool = make_flow_pool(flow_types, rng, workspace, flow_pool_size)
    # The "noflow_*" trajectory types are drawn regardless of ``flow_types``,
    # so a still-water entry must exist even when the caller did not list it
    # (previously this ended in ``KeyError: 'noflow'``).
    if "noflow" not in flow_pool:
        flow_pool["noflow"] = [sample_flow("noflow", rng, flow_id=0, workspace=workspace)]
    # Loop-invariant: the families a flow-driven episode may pick from.
    nonzero_flow_types = [ft for ft in flow_types if ft != "noflow"]

    # Actions are stored padded to this width (see pad_action); states hold
    # 6 + action_dim values per frame and are zero-padded to 6 + max width.
    max_action_dim = 3
    obs = np.zeros((episodes, steps + 1, 4), dtype=np.float32)
    actions = np.zeros((episodes, steps, max_action_dim), dtype=np.float32)
    states = np.zeros((episodes, steps + 1, 6 + max_action_dim), dtype=np.float32)
    true_flow = np.zeros((episodes, steps + 1, 2), dtype=np.float32)
    boat_ids = np.zeros((episodes,), dtype=np.int64)
    action_dims = np.zeros((episodes,), dtype=np.int64)
    flow_type_ids = np.zeros((episodes,), dtype=np.int64)
    flow_ids = np.zeros((episodes,), dtype=np.int64)
    traj_type_ids = np.zeros((episodes,), dtype=np.int64)
    # Number of simulated transitions per episode; frames after it are padding.
    episode_lengths = np.full((episodes,), steps, dtype=np.int64)

    env = SurfaceBoatEnv(
        seed=seed,
        episode_steps=steps,
        workspace=workspace,
        boundary=boundary,
        randomize_params=randomize_params,
    )

    for ep in range(episodes):
        # The order of the rng draws below defines the dataset for a given seed.
        traj_type = choose_trajectory_type(rng)
        boat = boats[int(rng.integers(0, len(boats)))]
        if traj_type.startswith("noflow"):
            flow_type = "noflow"
        else:
            if not nonzero_flow_types:
                raise ValueError("flow_types must include at least one nonzero flow family")
            flow_type = nonzero_flow_types[int(rng.integers(0, len(nonzero_flow_types)))]
        candidates = flow_pool[flow_type]
        flow_template = candidates[int(rng.integers(0, len(candidates)))]
        # Deep-copy so the episode cannot mutate the pooled template.
        flow = copy.deepcopy(flow_template)
        random_velocity = traj_type != "flow_zero_action"
        env.reset(
            boat=boat,
            flow_type=flow_type,
            flow=flow,
            random_velocity=random_velocity,
            randomize_params=randomize_params,
        )

        if traj_type == "noflow_random_action":
            planned_actions = smooth_random_actions(rng, steps, env.action_dim, scale=1.0)
        elif traj_type == "noflow_action_then_zero":
            planned_actions = np.zeros((steps, env.action_dim), dtype=np.float32)
            # Push for 12..50 steps, but never longer than the episode itself;
            # without the outer min() the assignment fails for steps < 12.
            push_len = min(steps, max(12, min(50, steps // 4)))
            planned_actions[:push_len] = smooth_random_actions(rng, push_len, env.action_dim, scale=0.95)
        elif traj_type == "flow_zero_action":
            planned_actions = np.zeros((steps, env.action_dim), dtype=np.float32)
            # Zero state entries 3..5 so the boat starts at rest.
            # NOTE(review): presumably the velocity components -- confirm against SurfaceBoatEnv.
            env.state[3:6] = 0.0
        else:
            # flow_active_control / flow_waypoint_control: open-loop fallback
            # actions plus goal(s) for the closed-loop policy.
            # NOTE(review): the [1.5, 8.5] goal range matches the default
            # workspace only; it is not derived from ``workspace``.
            planned_actions = smooth_random_actions(rng, steps, env.action_dim, scale=0.7)
            goal = rng.uniform([1.5, 1.5], [8.5, 8.5]).astype(np.float32)
            waypoints = rng.uniform([1.5, 1.5], [8.5, 8.5], size=(4, 2)).astype(np.float32)
            waypoint_idx = 0

        boat_ids[ep] = BOAT_TO_ID[boat]
        action_dims[ep] = env.action_dim
        flow_type_ids[ep] = FLOW_TO_ID[flow_type]
        flow_ids[ep] = int(env.flow.flow_id)
        traj_type_ids[ep] = TRAJ_TO_ID[traj_type]
        obs[ep, 0] = env.observation()
        states[ep, 0, : 6 + env.action_dim] = env.full_state()
        true_flow[ep, 0] = env.flow_at(env.state[:2])

        for t in range(steps):
            if traj_type == "flow_waypoint_control":
                goal = waypoints[min(waypoint_idx, len(waypoints) - 1)]
                # Advance to the next waypoint once within 0.75 of the current one.
                if np.linalg.norm(env.state[:2] - goal) < 0.75 and waypoint_idx < len(waypoints) - 1:
                    waypoint_idx += 1
                    goal = waypoints[waypoint_idx]
                use_policy = rng.random() < 0.88
            else:
                # Short-circuit: rng is only consumed for active-control episodes.
                use_policy = traj_type == "flow_active_control" and rng.random() < 0.75
            action = simple_goal_policy(env, goal, rng) if use_policy else planned_actions[t]

            actions[ep, t] = pad_action(action)
            ob, _, done, _ = env.step(action)
            obs[ep, t + 1] = ob
            states[ep, t + 1, : 6 + env.action_dim] = env.full_state()
            true_flow[ep, t + 1] = env.flow_at(env.state[:2])
            if done:
                episode_lengths[ep] = t + 1
                # Pad the rest of the episode by repeating the final frame.
                obs[ep, t + 2 :] = obs[ep, t + 1]
                states[ep, t + 2 :] = states[ep, t + 1]
                true_flow[ep, t + 2 :] = true_flow[ep, t + 1]
                break

    metadata = {
        "boats": BOAT_TO_ID,
        "flows": FLOW_TO_ID,
        "trajectories": TRAJ_TO_ID,
        "steps": steps,
        "workspace": list(workspace),
        "boundary": boundary,
        "seed": seed,
        "max_action_dim": max_action_dim,
        "flow_pool_size": flow_pool_size,
    }
    np.savez_compressed(
        out,
        obs=obs,
        actions=actions,
        states=states,
        true_flow=true_flow,
        boat_ids=boat_ids,
        action_dims=action_dims,
        flow_type_ids=flow_type_ids,
        flow_ids=flow_ids,
        traj_type_ids=traj_type_ids,
        episode_lengths=episode_lengths,
        metadata=json.dumps(metadata),
    )
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the dataset generator."""
    parser = argparse.ArgumentParser()
    add = parser.add_argument
    add("--boats", nargs="+", choices=list(BOAT_TO_ID), default=["twin"])
    add("--flow-types", nargs="+", choices=list(FLOW_TO_ID), default=["noflow", "uniform"])
    add("--episodes", type=int, default=2000)
    add("--steps", type=int, default=200)
    add("--out", required=True)
    add("--seed", type=int, default=0)
    add("--boundary", choices=["terminate", "bounce", "clip"], default="terminate")
    add("--flow-pool-size", type=int, default=PAPER_FLOW_POOL_SIZE)
    add("--no-randomize-params", action="store_true")
    return parser


def main() -> None:
    """Parse command-line options and generate one dataset file.

    The workspace is not exposed on the command line, so ``generate_dataset``
    runs with its default workspace. ``--no-randomize-params`` is inverted
    into the ``randomize_params`` argument.
    """
    opts = _build_arg_parser().parse_args()
    generate_dataset(
        boats=opts.boats,
        flow_types=opts.flow_types,
        episodes=opts.episodes,
        steps=opts.steps,
        out=opts.out,
        seed=opts.seed,
        boundary=opts.boundary,
        flow_pool_size=opts.flow_pool_size,
        randomize_params=not opts.no_randomize_params,
    )


# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|