"""Generate simulated boat trajectory datasets.

Each episode draws a trajectory family (see ``TRAJ_TO_ID``), a boat type and a
flow field, rolls a ``SurfaceBoatEnv`` forward, and the padded per-step arrays
for all episodes are written to one compressed ``.npz`` archive.
"""

from __future__ import annotations

import argparse
import copy
import json
from pathlib import Path

import numpy as np

from driftwm.sim.env import SurfaceBoatEnv
from driftwm.sim.flow import sample_flow
from driftwm.utils import ensure_dir, pad_action

BOAT_TO_ID = {"twin": 0, "triangle": 1}
ID_TO_BOAT = {v: k for k, v in BOAT_TO_ID.items()}

FLOW_TO_ID = {
    "noflow": 0,
    "uniform": 1,
    "vortex_center": 2,
    "double_gyre": 3,
    "source_sink": 4,
    "source_sink_pair": 5,
    "gradient": 6,
    "shear": 7,
    "turbulent_patch": 8,
    "random_fourier": 9,
}
ID_TO_FLOW = {v: k for k, v in FLOW_TO_ID.items()}

TRAJ_TO_ID = {
    "noflow_random_action": 0,
    "noflow_action_then_zero": 1,
    "flow_zero_action": 2,
    "flow_active_control": 3,
    "flow_waypoint_control": 4,
}
ID_TO_TRAJ = {v: k for k, v in TRAJ_TO_ID.items()}

# Relative sampling weight of each trajectory family (normalised before use).
# Keyed by name so adding a family without a weight fails loudly instead of
# silently mis-aligning positional weights.
TRAJ_WEIGHTS = {
    "noflow_random_action": 0.14,
    "noflow_action_then_zero": 0.14,
    "flow_zero_action": 0.14,
    "flow_active_control": 0.28,
    "flow_waypoint_control": 0.30,
}

PAPER_FLOW_POOL_SIZE = 80

# Width of the zero-padded action axis in the stored dataset.
MAX_ACTION_DIM = 3
# env.full_state() must have length _BASE_STATE_DIM + env.action_dim.
_BASE_STATE_DIM = 6
# Width of env.observation() as stored per step.
_OBS_DIM = 4
# Goals/waypoints keep this distance from the workspace edge (shrunk on tiny workspaces).
_GOAL_MARGIN = 1.5
# Number of waypoints visited in order by "flow_waypoint_control" episodes.
_WAYPOINT_COUNT = 4
# Distance at which the current waypoint counts as reached.
_WAYPOINT_REACHED_RADIUS = 0.75
# Trajectory families that never consult simple_goal_policy.
_OPEN_LOOP_TRAJS = ("noflow_random_action", "noflow_action_then_zero", "flow_zero_action")


def smooth_random_actions(rng: np.random.Generator, steps: int, action_dim: int, scale: float = 1.0) -> np.ndarray:
    """Return a ``(steps, action_dim)`` float32 sequence of smoothly varying random actions.

    A new uniform target in [-1, 1] is drawn every 12 steps; the running command
    moves toward it by exponential smoothing (0.82 old / 0.18 target). Each row
    is ``scale * command`` clipped to [-1, 1].
    """
    actions = np.zeros((steps, action_dim), dtype=np.float32)
    current = rng.uniform(-0.2, 0.2, size=action_dim).astype(np.float32)
    for t in range(steps):
        if t % 12 == 0:
            target = rng.uniform(-1.0, 1.0, size=action_dim).astype(np.float32)
        current = 0.82 * current + 0.18 * target
        actions[t] = np.clip(scale * current, -1.0, 1.0)
    return actions


def simple_goal_policy(env: SurfaceBoatEnv, goal: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Return a noisy heading-tracking action that steers ``env``'s boat toward ``goal``.

    The heading error is the wrapped angle between ``env.state[2]`` and the
    bearing to ``goal`` from ``env.state[:2]``. For the "twin" boat a
    two-element (forward + turn, forward - turn) command is built; any other
    boat gets a three-element command. Gaussian noise (std 0.08) is added and
    the result is clipped to [-1, 1].
    """
    pos = env.state[:2]
    theta = float(env.state[2])
    delta = goal - pos
    target_angle = float(np.arctan2(delta[1], delta[0]))
    # Wrap the heading error into (-pi, pi].
    err = float(np.arctan2(np.sin(target_angle - theta), np.cos(target_angle - theta)))
    if env.spec.name == "twin":
        forward = np.clip(0.35 + 0.55 * np.cos(err), -0.4, 0.9)
        turn = np.clip(-0.75 * np.sin(err), -0.8, 0.8)
        action = np.array([forward + turn, forward - turn], dtype=np.float32)
    else:
        base = np.array(
            [np.sin(err), -0.5 * np.sin(err) + 0.35, -0.5 * np.sin(err) - 0.35],
            dtype=np.float32,
        )
        action = 0.7 * base
    action += rng.normal(0.0, 0.08, size=env.action_dim).astype(np.float32)
    return np.clip(action, -1.0, 1.0)


def choose_trajectory_type(rng: np.random.Generator) -> str:
    """Draw a trajectory family name according to ``TRAJ_WEIGHTS``."""
    keys = list(TRAJ_TO_ID)
    weights = np.array([TRAJ_WEIGHTS[k] for k in keys], dtype=np.float64)
    weights = weights / weights.sum()
    return str(rng.choice(keys, p=weights))


def make_flow_pool(
    flow_types: list[str],
    rng: np.random.Generator,
    workspace: tuple[float, float, float, float],
    pool_size: int = PAPER_FLOW_POOL_SIZE,
) -> dict[str, list]:
    """Pre-sample a pool of flow fields for each requested flow family.

    "noflow" gets a single field with ``flow_id=0``; every other family gets
    ``pool_size`` fields with globally unique ids starting at 1. Duplicate
    entries in ``flow_types`` are ignored so each family gets exactly one pool.

    Raises:
        ValueError: if ``pool_size`` is smaller than 1.
    """
    if pool_size < 1:
        raise ValueError(f"pool_size must be >= 1, got {pool_size}")
    pool: dict[str, list] = {}
    next_id = 1
    for ft in dict.fromkeys(flow_types):
        if ft == "noflow":
            pool[ft] = [sample_flow("noflow", rng, flow_id=0, workspace=workspace)]
            continue
        pool[ft] = [
            sample_flow(ft, rng, flow_id=next_id + i, workspace=workspace)
            for i in range(pool_size)
        ]
        next_id += pool_size
    return pool


def _goal_bounds(workspace: tuple[float, float, float, float]) -> tuple[np.ndarray, np.ndarray]:
    """Return the ``(low, high)`` xy corners of the region goals are sampled from.

    ``workspace`` is read as ``(x_min, x_max, y_min, y_max)``, consistent with
    the ``(0, 10, 0, 10)`` default. The margin is ``_GOAL_MARGIN`` per axis,
    capped at 15% of the extent so that ``low < high`` on small workspaces.
    For the default workspace this yields exactly [1.5, 1.5] .. [8.5, 8.5].
    """
    x_min, x_max, y_min, y_max = workspace
    lo = np.array([x_min, y_min], dtype=np.float64)
    hi = np.array([x_max, y_max], dtype=np.float64)
    margin = np.minimum(_GOAL_MARGIN, 0.15 * (hi - lo))
    return lo + margin, hi - margin


def _plan_episode(
    rng: np.random.Generator,
    traj_type: str,
    steps: int,
    action_dim: int,
    goal_low: np.ndarray,
    goal_high: np.ndarray,
) -> tuple[np.ndarray, np.ndarray | None, np.ndarray | None]:
    """Draw the open-loop action plan and, for goal-seeking families, the targets.

    Returns ``(planned_actions, goal, waypoints)``; ``goal`` and ``waypoints``
    are None for families in ``_OPEN_LOOP_TRAJS``. The draw order (actions,
    then goal, then waypoints) is fixed so a given seed stays reproducible.
    """
    if traj_type == "noflow_random_action":
        return smooth_random_actions(rng, steps, action_dim, scale=1.0), None, None
    if traj_type == "noflow_action_then_zero":
        planned = np.zeros((steps, action_dim), dtype=np.float32)
        # Clamp to `steps`: the 12-step floor would otherwise overflow short episodes.
        push_len = min(steps, max(12, min(50, steps // 4)))
        planned[:push_len] = smooth_random_actions(rng, push_len, action_dim, scale=0.95)
        return planned, None, None
    if traj_type == "flow_zero_action":
        return np.zeros((steps, action_dim), dtype=np.float32), None, None
    planned = smooth_random_actions(rng, steps, action_dim, scale=0.7)
    goal = rng.uniform(goal_low, goal_high).astype(np.float32)
    waypoints = rng.uniform(goal_low, goal_high, size=(_WAYPOINT_COUNT, 2)).astype(np.float32)
    return planned, goal, waypoints


def generate_dataset(
    boats: list[str],
    flow_types: list[str],
    episodes: int,
    steps: int,
    out: str | Path,
    seed: int = 0,
    workspace: tuple[float, float, float, float] = (0.0, 10.0, 0.0, 10.0),
    boundary: str = "terminate",
    randomize_params: bool = True,
    flow_pool_size: int = PAPER_FLOW_POOL_SIZE,
) -> None:
    """Simulate ``episodes`` rollouts of ``steps`` steps and save them to ``out``.

    Args:
        boats: boat names (keys of ``BOAT_TO_ID``) sampled uniformly per episode.
        flow_types: flow families (keys of ``FLOW_TO_ID``) available to the
            flow_* trajectory families. The noflow_* families always use the
            zero-flow field, which is added even if "noflow" is not listed.
        episodes: number of episodes.
        steps: maximum number of environment steps per episode.
        out: output path passed to ``np.savez_compressed`` (which appends
            ``.npz`` if missing); its parent directory is created.
        seed: seed for both the sampling RNG and the environment.
        workspace: ``(x_min, x_max, y_min, y_max)`` passed to the environment
            and flow sampler, and used to bound sampled goals.
        boundary: boundary mode forwarded to ``SurfaceBoatEnv``.
        randomize_params: forwarded to ``SurfaceBoatEnv`` and ``env.reset``.
        flow_pool_size: number of pre-sampled fields per non-noflow family.

    Saved arrays: ``obs`` (E, T+1, 4), ``actions`` (E, T, 3) via ``pad_action``,
    ``states`` (E, T+1, 9) with the first 6 + action_dim columns filled,
    ``true_flow`` (E, T+1, 2) = ``env.flow_at`` at the boat position, per-episode
    id arrays, ``episode_lengths`` (number of env steps actually taken) and a
    JSON ``metadata`` string. After early termination the remaining frames
    repeat the last frame and the remaining actions stay zero.

    Raises:
        ValueError: when a flow_* trajectory is drawn but ``flow_types`` has no
            family other than "noflow".
    """
    rng = np.random.default_rng(seed)
    out = Path(out)
    ensure_dir(out.parent)

    flow_pool = make_flow_pool(flow_types, rng, workspace, flow_pool_size)
    if "noflow" not in flow_pool:
        # noflow_* trajectories index flow_pool["noflow"]; without this they raise KeyError.
        flow_pool["noflow"] = [sample_flow("noflow", rng, flow_id=0, workspace=workspace)]
    flow_families = [ft for ft in dict.fromkeys(flow_types) if ft != "noflow"]
    goal_low, goal_high = _goal_bounds(workspace)

    state_width = _BASE_STATE_DIM + MAX_ACTION_DIM
    obs = np.zeros((episodes, steps + 1, _OBS_DIM), dtype=np.float32)
    actions = np.zeros((episodes, steps, MAX_ACTION_DIM), dtype=np.float32)
    states = np.zeros((episodes, steps + 1, state_width), dtype=np.float32)
    true_flow = np.zeros((episodes, steps + 1, 2), dtype=np.float32)
    boat_ids = np.zeros((episodes,), dtype=np.int64)
    action_dims = np.zeros((episodes,), dtype=np.int64)
    flow_type_ids = np.zeros((episodes,), dtype=np.int64)
    flow_ids = np.zeros((episodes,), dtype=np.int64)
    traj_type_ids = np.zeros((episodes,), dtype=np.int64)
    episode_lengths = np.full((episodes,), steps, dtype=np.int64)

    env = SurfaceBoatEnv(
        seed=seed,
        episode_steps=steps,
        workspace=workspace,
        boundary=boundary,
        randomize_params=randomize_params,
    )

    for ep in range(episodes):
        traj_type = choose_trajectory_type(rng)
        boat = boats[int(rng.integers(0, len(boats)))]
        if traj_type.startswith("noflow"):
            flow_type = "noflow"
        else:
            if not flow_families:
                raise ValueError("flow_types must include at least one nonzero flow family")
            flow_type = flow_families[int(rng.integers(0, len(flow_families)))]
        candidates = flow_pool[flow_type]
        # Deep-copy so the environment cannot mutate the shared pool entry.
        flow = copy.deepcopy(candidates[int(rng.integers(0, len(candidates)))])

        env.reset(
            boat=boat,
            flow_type=flow_type,
            flow=flow,
            random_velocity=traj_type != "flow_zero_action",
            randomize_params=randomize_params,
        )

        planned_actions, goal, waypoints = _plan_episode(
            rng, traj_type, steps, env.action_dim, goal_low, goal_high
        )
        if traj_type == "flow_zero_action":
            env.state[3:6] = 0.0
        waypoint_idx = 0

        boat_ids[ep] = BOAT_TO_ID[boat]
        action_dims[ep] = env.action_dim
        flow_type_ids[ep] = FLOW_TO_ID[flow_type]
        flow_ids[ep] = int(env.flow.flow_id)
        traj_type_ids[ep] = TRAJ_TO_ID[traj_type]

        filled = _BASE_STATE_DIM + env.action_dim
        obs[ep, 0] = env.observation()
        states[ep, 0, :filled] = env.full_state()
        true_flow[ep, 0] = env.flow_at(env.state[:2])

        for t in range(steps):
            if traj_type == "flow_waypoint_control":
                goal = waypoints[waypoint_idx]
                reached = np.linalg.norm(env.state[:2] - goal) < _WAYPOINT_REACHED_RADIUS
                if reached and waypoint_idx < len(waypoints) - 1:
                    waypoint_idx += 1
                    goal = waypoints[waypoint_idx]
                if rng.random() < 0.88:
                    action = simple_goal_policy(env, goal, rng)
                else:
                    action = planned_actions[t]
            elif traj_type == "flow_active_control" and rng.random() < 0.75:
                action = simple_goal_policy(env, goal, rng)
            else:
                action = planned_actions[t]

            actions[ep, t] = pad_action(action)
            ob, _, done, _ = env.step(action)
            obs[ep, t + 1] = ob
            states[ep, t + 1, :filled] = env.full_state()
            true_flow[ep, t + 1] = env.flow_at(env.state[:2])
            if done:
                episode_lengths[ep] = t + 1
                # Hold the final frame for the rest of the fixed-length buffer.
                obs[ep, t + 2 :] = obs[ep, t + 1]
                states[ep, t + 2 :] = states[ep, t + 1]
                true_flow[ep, t + 2 :] = true_flow[ep, t + 1]
                break

    metadata = {
        "boats": BOAT_TO_ID,
        "flows": FLOW_TO_ID,
        "trajectories": TRAJ_TO_ID,
        "steps": steps,
        "workspace": list(workspace),
        "boundary": boundary,
        "seed": seed,
        "max_action_dim": MAX_ACTION_DIM,
        "flow_pool_size": flow_pool_size,
        "randomize_params": randomize_params,
    }
    np.savez_compressed(
        out,
        obs=obs,
        actions=actions,
        states=states,
        true_flow=true_flow,
        boat_ids=boat_ids,
        action_dims=action_dims,
        flow_type_ids=flow_type_ids,
        flow_ids=flow_ids,
        traj_type_ids=traj_type_ids,
        episode_lengths=episode_lengths,
        metadata=json.dumps(metadata),
    )


def main() -> None:
    """Command-line entry point: parse arguments and call ``generate_dataset``."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--boats", nargs="+", choices=list(BOAT_TO_ID), default=["twin"])
    parser.add_argument("--flow-types", nargs="+", choices=list(FLOW_TO_ID), default=["noflow", "uniform"])
    parser.add_argument("--episodes", type=int, default=2000)
    parser.add_argument("--steps", type=int, default=200)
    parser.add_argument("--out", required=True)
    parser.add_argument("--seed", type=int, default=0)
    parser.add_argument("--boundary", choices=["terminate", "bounce", "clip"], default="terminate")
    parser.add_argument("--flow-pool-size", type=int, default=PAPER_FLOW_POOL_SIZE)
    parser.add_argument("--no-randomize-params", action="store_true")
    args = parser.parse_args()
    generate_dataset(
        boats=args.boats,
        flow_types=args.flow_types,
        episodes=args.episodes,
        steps=args.steps,
        out=args.out,
        seed=args.seed,
        boundary=args.boundary,
        flow_pool_size=args.flow_pool_size,
        randomize_params=not args.no_randomize_params,
    )


if __name__ == "__main__":
    main()