# FlowMo-WM / driftwm/data/generate.py
# Last commit by cccat6 (ccf9f1b, verified): "Update FlowMo-WM code and static flow protocol"
from __future__ import annotations
import argparse
import copy
import json
from pathlib import Path
import numpy as np
from driftwm.sim.env import SurfaceBoatEnv
from driftwm.sim.flow import sample_flow
from driftwm.utils import ensure_dir, pad_action
# Integer codes stored in the generated dataset (the per-episode *_ids arrays
# and the JSON metadata); the ID_TO_* maps decode them back to names.
BOAT_TO_ID = dict(twin=0, triangle=1)
ID_TO_BOAT = {code: name for name, code in BOAT_TO_ID.items()}
FLOW_TO_ID = dict(
    noflow=0,
    uniform=1,
    vortex_center=2,
    double_gyre=3,
    source_sink=4,
    source_sink_pair=5,
    gradient=6,
    shear=7,
    turbulent_patch=8,
    random_fourier=9,
)
ID_TO_FLOW = {code: name for name, code in FLOW_TO_ID.items()}
TRAJ_TO_ID = dict(
    noflow_random_action=0,
    noflow_action_then_zero=1,
    flow_zero_action=2,
    flow_active_control=3,
    flow_waypoint_control=4,
)
ID_TO_TRAJ = {code: name for name, code in TRAJ_TO_ID.items()}
# Default number of flow fields pre-sampled per non-"noflow" flow family.
PAPER_FLOW_POOL_SIZE = 80
def smooth_random_actions(rng: np.random.Generator, steps: int, action_dim: int, scale: float = 1.0) -> np.ndarray:
    """Return a smoothly varying random action sequence of shape (steps, action_dim).

    The internal command starts uniform in [-0.2, 0.2]. Every 12 steps a new
    target is drawn uniformly in [-1, 1], and each step the command moves 18%
    of the way toward the current target (exponential smoothing). The emitted
    action is ``scale * command`` clipped to [-1, 1], stored as float32.
    """
    hold_period = 12
    sequence = np.zeros((steps, action_dim), dtype=np.float32)
    command = rng.uniform(-0.2, 0.2, size=action_dim).astype(np.float32)
    target = None
    for step in range(steps):
        if step % hold_period == 0:
            # Re-draw the target; step 0 always lands here, so target is set before use.
            target = rng.uniform(-1.0, 1.0, size=action_dim).astype(np.float32)
        command = 0.82 * command + 0.18 * target
        sequence[step] = np.clip(scale * command, -1.0, 1.0)
    return sequence
def simple_goal_policy(env: SurfaceBoatEnv, goal: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Return a noisy proportional steering command toward ``goal``.

    The heading error ``err`` is the bearing from ``env.state[:2]`` to
    ``goal`` minus the heading ``env.state[2]``, wrapped to [-pi, pi].

    * ``env.spec.name == "twin"``: differential command
      ``[forward + turn, forward - turn]`` with
      ``forward = clip(0.35 + 0.55*cos(err), -0.4, 0.9)`` and
      ``turn = clip(-0.75*sin(err), -0.8, 0.8)``.
    * any other boat: ``0.7 * [s, -0.5*s + 0.35, -0.5*s - 0.35]`` with
      ``s = sin(err)`` (assumes a 3-dimensional action for non-twin boats --
      TODO confirm against SurfaceBoatEnv).

    Gaussian noise (std 0.08, ``env.action_dim`` samples from ``rng``) is
    added, and the result is clipped to [-1, 1].
    """
    bearing = goal - env.state[:2]
    heading = float(env.state[2])
    desired = float(np.arctan2(bearing[1], bearing[0]))
    diff = desired - heading
    err = float(np.arctan2(np.sin(diff), np.cos(diff)))
    sin_err = np.sin(err)

    if env.spec.name != "twin":
        half = -0.5 * sin_err
        command = 0.7 * np.array([sin_err, half + 0.35, half - 0.35], dtype=np.float32)
    else:
        thrust = np.clip(0.35 + 0.55 * np.cos(err), -0.4, 0.9)
        steer = np.clip(-0.75 * sin_err, -0.8, 0.8)
        command = np.array([thrust + steer, thrust - steer], dtype=np.float32)

    command += rng.normal(0.0, 0.08, size=env.action_dim).astype(np.float32)
    return np.clip(command, -1.0, 1.0)
def choose_trajectory_type(rng: np.random.Generator) -> str:
    """Sample one trajectory-type name (a key of ``TRAJ_TO_ID``) from a fixed mixture.

    Unnormalised weights: 0.14 each for "noflow_random_action",
    "noflow_action_then_zero" and "flow_zero_action"; 0.28 for
    "flow_active_control"; 0.30 for "flow_waypoint_control". Weights are
    normalised over the types present in ``TRAJ_TO_ID`` and sampled with a
    single ``rng.choice`` call.

    Raises:
        ValueError: if ``TRAJ_TO_ID`` contains a type with no weight here.
    """
    # Weights are keyed by name rather than by position, so reordering or
    # extending TRAJ_TO_ID can never silently shift a weight onto the wrong
    # trajectory type (the old positional list was only truncated to fit).
    weight_by_type = {
        "noflow_random_action": 0.14,
        "noflow_action_then_zero": 0.14,
        "flow_zero_action": 0.14,
        "flow_active_control": 0.28,
        "flow_waypoint_control": 0.30,
    }
    keys = list(TRAJ_TO_ID)
    unweighted = [k for k in keys if k not in weight_by_type]
    if unweighted:
        raise ValueError(f"no sampling weight defined for trajectory type(s): {unweighted}")
    # Built in TRAJ_TO_ID order, so the probabilities (and therefore the RNG
    # draws) match the previous positional list exactly.
    weights = np.array([weight_by_type[k] for k in keys], dtype=np.float64)
    weights = weights / weights.sum()
    return str(rng.choice(keys, p=weights))
def make_flow_pool(
    flow_types: list[str],
    rng: np.random.Generator,
    workspace: tuple[float, float, float, float],
    pool_size: int = PAPER_FLOW_POOL_SIZE,
) -> dict[str, list]:
    """Pre-sample flow fields for each requested flow family.

    "noflow" gets exactly one field, created with ``flow_id=0``. Every other
    family gets ``pool_size`` fields from ``sample_flow``. Their ``flow_id``s
    are assigned consecutively from 1 in ``flow_types`` order, so they are
    unique across the whole pool.

    Returns:
        Mapping from family name to its list of sampled flow objects.
    """
    pool: dict[str, list] = {family: [] for family in flow_types}
    next_flow_id = 1  # the single "noflow" field uses id 0
    for family in flow_types:
        if family != "noflow":
            bucket = pool[family]
            for _ in range(pool_size):
                bucket.append(sample_flow(family, rng, flow_id=next_flow_id, workspace=workspace))
                next_flow_id += 1
            continue
        pool[family] = [sample_flow("noflow", rng, flow_id=0, workspace=workspace)]
    return pool
def generate_dataset(
    boats: list[str],
    flow_types: list[str],
    episodes: int,
    steps: int,
    out: str | Path,
    seed: int = 0,
    workspace: tuple[float, float, float, float] = (0.0, 10.0, 0.0, 10.0),
    boundary: str = "terminate",
    randomize_params: bool = True,
    flow_pool_size: int = PAPER_FLOW_POOL_SIZE,
) -> None:
    """Roll out ``episodes`` simulated boat trajectories and save them to ``out``.

    For each episode this samples, in order:
    - a trajectory type (``choose_trajectory_type``);
    - a boat from ``boats``;
    - for flow-driven types, a non-"noflow" family from ``flow_types``;
    - a flow field from a pool pre-sampled by ``make_flow_pool``.

    Actions then come from a smoothed random sequence, an all-zero sequence,
    or a mix of random actions and ``simple_goal_policy``, depending on the
    trajectory type.

    An episode whose ``env.step`` reports done early is padded by repeating
    its last observation/state/flow sample; its remaining actions stay zero.

    Saved with ``np.savez_compressed``:
        obs (episodes, steps+1, 4); actions (episodes, steps, 3), written via
        ``pad_action`` (presumably zero-padding to 3 dims -- confirm);
        states (episodes, steps+1, 9), whose first ``6 + action_dim`` columns
        hold ``env.full_state()``; true_flow (episodes, steps+1, 2); the
        per-episode arrays boat_ids, action_dims, flow_type_ids, flow_ids and
        traj_type_ids; and ``metadata``, a JSON string with the id tables and
        generation settings.

    Raises:
        ValueError: if ``flow_types`` lacks "noflow" or has no other flow
            family. Both are needed because the trajectory mixture draws
            noflow_* as well as flow_* episode types.
    """
    # Validate before doing any work rather than failing partway through
    # generation (a missing "noflow" used to surface as a KeyError mid-run).
    if "noflow" not in flow_types:
        raise ValueError("flow_types must include 'noflow' (required by the noflow_* trajectory types)")
    available = [ft for ft in flow_types if ft != "noflow"]
    if not available:
        raise ValueError("flow_types must include at least one nonzero flow family")

    rng = np.random.default_rng(seed)
    out = Path(out)
    ensure_dir(out.parent)
    flow_pool = make_flow_pool(flow_types, rng, workspace, flow_pool_size)

    # Fixed-width buffers (max action dim 3, state width 6 + 3) so boats with
    # different action_dim share one array; unused columns stay zero.
    obs = np.zeros((episodes, steps + 1, 4), dtype=np.float32)
    actions = np.zeros((episodes, steps, 3), dtype=np.float32)
    states = np.zeros((episodes, steps + 1, 9), dtype=np.float32)
    true_flow = np.zeros((episodes, steps + 1, 2), dtype=np.float32)
    boat_ids = np.zeros((episodes,), dtype=np.int64)
    action_dims = np.zeros((episodes,), dtype=np.int64)
    flow_type_ids = np.zeros((episodes,), dtype=np.int64)
    flow_ids = np.zeros((episodes,), dtype=np.int64)
    traj_type_ids = np.zeros((episodes,), dtype=np.int64)

    env = SurfaceBoatEnv(
        seed=seed,
        episode_steps=steps,
        workspace=workspace,
        boundary=boundary,
        randomize_params=randomize_params,
    )
    for ep in range(episodes):
        # NOTE: the order of draws from ``rng`` below defines the dataset for
        # a given seed; reordering them changes every generated episode.
        traj_type = choose_trajectory_type(rng)
        boat = boats[int(rng.integers(0, len(boats)))]
        if traj_type.startswith("noflow"):
            flow_type = "noflow"
        else:
            flow_type = available[int(rng.integers(0, len(available)))]
        candidates = flow_pool[flow_type]
        # Copy so the episode never mutates the shared pool entry
        # (presumably flow objects can carry mutable state -- confirm).
        flow = copy.deepcopy(candidates[int(rng.integers(0, len(candidates)))])
        env.reset(
            boat=boat,
            flow_type=flow_type,
            flow=flow,
            random_velocity=traj_type != "flow_zero_action",
            randomize_params=randomize_params,
        )

        if traj_type == "noflow_random_action":
            planned_actions = smooth_random_actions(rng, steps, env.action_dim, scale=1.0)
        elif traj_type == "noflow_action_then_zero":
            planned_actions = np.zeros((steps, env.action_dim), dtype=np.float32)
            # Push for about a quarter of the episode (12..50 steps), then
            # coast with zero actions. Clamped to ``steps`` so episodes shorter
            # than 12 steps no longer overflow ``planned_actions``.
            push_len = min(steps, max(12, min(50, steps // 4)))
            planned_actions[:push_len] = smooth_random_actions(rng, push_len, env.action_dim, scale=0.95)
        elif traj_type == "flow_zero_action":
            planned_actions = np.zeros((steps, env.action_dim), dtype=np.float32)
            # Zero state entries 3..5 so any motion comes from the flow alone
            # (presumably the velocity components -- confirm in SurfaceBoatEnv).
            env.state[3:6] = 0.0
        else:
            # flow_active_control / flow_waypoint_control: random fallback
            # actions plus goal(s) for simple_goal_policy.
            # NOTE(review): the [1.5, 8.5] goal box is hard-coded and does not
            # follow ``workspace``.
            planned_actions = smooth_random_actions(rng, steps, env.action_dim, scale=0.7)
            goal = rng.uniform([1.5, 1.5], [8.5, 8.5]).astype(np.float32)
            waypoints = rng.uniform([1.5, 1.5], [8.5, 8.5], size=(4, 2)).astype(np.float32)
            waypoint_idx = 0

        boat_ids[ep] = BOAT_TO_ID[boat]
        action_dims[ep] = env.action_dim
        flow_type_ids[ep] = FLOW_TO_ID[flow_type]
        flow_ids[ep] = int(env.flow.flow_id)
        traj_type_ids[ep] = TRAJ_TO_ID[traj_type]
        obs[ep, 0] = env.observation()
        states[ep, 0, : 6 + env.action_dim] = env.full_state()
        true_flow[ep, 0] = env.flow_at(env.state[:2])

        for t in range(steps):
            if traj_type == "flow_waypoint_control":
                goal = waypoints[min(waypoint_idx, len(waypoints) - 1)]
                # Advance to the next waypoint once within 0.75 of the current one.
                if np.linalg.norm(env.state[:2] - goal) < 0.75 and waypoint_idx < len(waypoints) - 1:
                    waypoint_idx += 1
                    goal = waypoints[waypoint_idx]
                if rng.random() < 0.88:
                    action = simple_goal_policy(env, goal, rng)
                else:
                    action = planned_actions[t]
            # rng.random() is only drawn for flow_active_control (short-circuit).
            elif traj_type == "flow_active_control" and rng.random() < 0.75:
                action = simple_goal_policy(env, goal, rng)
            else:
                action = planned_actions[t]
            actions[ep, t] = pad_action(action)
            ob, _, done, _ = env.step(action)
            obs[ep, t + 1] = ob
            states[ep, t + 1, : 6 + env.action_dim] = env.full_state()
            true_flow[ep, t + 1] = env.flow_at(env.state[:2])
            if done:
                # Early termination: hold the final sample for the rest of the
                # episode; the remaining actions stay zero.
                obs[ep, t + 2 :] = obs[ep, t + 1]
                states[ep, t + 2 :] = states[ep, t + 1]
                true_flow[ep, t + 2 :] = true_flow[ep, t + 1]
                break

    metadata = {
        "boats": BOAT_TO_ID,
        "flows": FLOW_TO_ID,
        "trajectories": TRAJ_TO_ID,
        "steps": steps,
        "workspace": list(workspace),
        "boundary": boundary,
        "seed": seed,
        "max_action_dim": 3,
        "flow_pool_size": flow_pool_size,
    }
    np.savez_compressed(
        out,
        obs=obs,
        actions=actions,
        states=states,
        true_flow=true_flow,
        boat_ids=boat_ids,
        action_dims=action_dims,
        flow_type_ids=flow_type_ids,
        flow_ids=flow_ids,
        traj_type_ids=traj_type_ids,
        metadata=json.dumps(metadata),
    )
def main() -> None:
    """Command-line entry point: parse flags and call ``generate_dataset``.

    The workspace is not exposed on the command line, so
    ``generate_dataset``'s default is used.
    """
    cli = argparse.ArgumentParser()
    add = cli.add_argument
    add("--boats", nargs="+", choices=list(BOAT_TO_ID), default=["twin"])
    add("--flow-types", nargs="+", choices=list(FLOW_TO_ID), default=["noflow", "uniform"])
    add("--episodes", type=int, default=2000)
    add("--steps", type=int, default=200)
    add("--out", required=True)
    add("--seed", type=int, default=0)
    add("--boundary", choices=["terminate", "bounce", "clip"], default="terminate")
    add("--flow-pool-size", type=int, default=PAPER_FLOW_POOL_SIZE)
    add("--no-randomize-params", action="store_true")
    opts = cli.parse_args()

    # The CLI exposes a negative flag; generate_dataset takes the positive form.
    randomize = not opts.no_randomize_params
    generate_dataset(
        boats=opts.boats,
        flow_types=opts.flow_types,
        episodes=opts.episodes,
        steps=opts.steps,
        out=opts.out,
        seed=opts.seed,
        boundary=opts.boundary,
        flow_pool_size=opts.flow_pool_size,
        randomize_params=randomize,
    )


if __name__ == "__main__":
    main()