# Analytic 2-D flow fields and a seeded sampler that builds them.
#
# Every flow exposes ``velocity(pos, t)`` returning a float32 array shaped
# like ``pos`` (last axis = x/y components) and ``metadata()`` returning a
# JSON-friendly description of its parameters.

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

import numpy as np

# Sampling ranges, standard deviations and per-family speed caps read by
# ``sample_flow``.
PAPER_FLOW = {
    "uniform_min": 0.03,
    "uniform_max": 0.24,
    "vortex_base_max": 0.05,
    "vortex_gamma_min": 0.12,
    "vortex_gamma_max": 0.24,
    "vortex_max": 0.36,
    "gradient_base_min": 0.00,
    "gradient_base_max": 0.08,
    "gradient_matrix_min": 0.018,
    "gradient_matrix_max": 0.040,
    "gradient_max": 0.34,
    "turbulent_base_max": 0.05,
    "turbulent_vector_std": 0.105,
    "turbulent_max": 0.34,
    "shear_min": 0.035,
    "shear_max_rate": 0.085,
    "shear_max": 0.38,
    "double_gyre_amp_min": 0.20,
    "double_gyre_amp_max": 0.34,
    "double_gyre_max": 0.36,
    "source_base_max": 0.03,
    "source_strength_min": 0.16,
    "source_strength_max": 0.30,
    "source_max": 0.36,
    "random_fourier_amp_std": 0.045,
    "random_fourier_max": 0.38,
}


def _clamp_speed(vel: np.ndarray, max_speed: float) -> np.ndarray:
    """Rescale vectors whose norm exceeds *max_speed*; return float32.

    The norm is taken over the last axis. Vectors at or below the cap are
    left unchanged (scale factor 1); faster ones are shrunk onto the cap
    while keeping their direction.
    """
    speed = np.linalg.norm(vel, axis=-1, keepdims=True)
    # The 1e-6 floor avoids dividing by zero where the field vanishes.
    scale = np.minimum(1.0, max_speed / np.maximum(speed, 1e-6))
    return (vel * scale).astype(np.float32)


@dataclass
class Flow:
    """Base class for flow fields: a type name plus an integer id."""

    name: str
    flow_id: int

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the flow velocity at *pos*; subclasses must override."""
        raise NotImplementedError

    def metadata(self) -> dict[str, Any]:
        """Return a plain-Python description of this flow."""
        return {"flow_type": self.name, "flow_id": int(self.flow_id)}


@dataclass
class NoFlow(Flow):
    """Flow that is zero everywhere."""

    def __init__(self, flow_id: int = 0):
        super().__init__("noflow", flow_id)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return float32 zeros with the same shape as *pos*."""
        return np.zeros_like(np.asarray(pos, dtype=np.float32))


@dataclass
class UniformFlow(Flow):
    """Spatially constant flow given by a single vector."""

    vector: np.ndarray

    def __init__(self, vector: np.ndarray, flow_id: int):
        super().__init__("uniform", flow_id)
        self.vector = np.asarray(vector, dtype=np.float32)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return ``vector`` broadcast to the shape of *pos* (as a copy)."""
        pos = np.asarray(pos, dtype=np.float32)
        return np.broadcast_to(self.vector, pos.shape).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus the flow vector."""
        out = super().metadata()
        out["vector"] = self.vector.astype(float).tolist()
        return out


@dataclass
class VortexFlow(Flow):
    """Constant base flow plus a softened swirl around ``center``.

    The swirl is ``gamma * (-dy, dx) / (r**2 + radius_eps**2)``; the sum is
    capped at ``max_speed``.
    """

    base: np.ndarray
    center: np.ndarray
    gamma: float
    radius_eps: float
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        center: np.ndarray,
        gamma: float,
        flow_id: int,
        radius_eps: float = 0.30,
        max_speed: float = 0.50,
        name: str = "vortex",
    ):
        super().__init__(name, flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.center = np.asarray(center, dtype=np.float32)
        self.gamma = float(gamma)
        self.radius_eps = float(radius_eps)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the speed-capped base-plus-swirl velocity at *pos*."""
        pos = np.asarray(pos, dtype=np.float32)
        rel = pos - self.center
        # radius_eps**2 keeps the denominator away from zero at the center.
        denom = np.sum(rel * rel, axis=-1, keepdims=True) + self.radius_eps**2
        # (-dy, dx) is the offset rotated by 90 degrees.
        tangent = np.concatenate([-rel[..., 1:2], rel[..., 0:1]], axis=-1)
        swirl = self.gamma * tangent / denom
        return _clamp_speed(self.base + swirl, self.max_speed)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``center`` and ``gamma``."""
        out = super().metadata()
        out.update(
            {
                "base": self.base.astype(float).tolist(),
                "center": self.center.astype(float).tolist(),
                "gamma": self.gamma,
            }
        )
        return out


@dataclass
class GradientFlow(Flow):
    """Affine flow: ``base + matrix @ (pos - center)``, capped in speed."""

    base: np.ndarray
    center: np.ndarray
    matrix: np.ndarray
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        center: np.ndarray,
        matrix: np.ndarray,
        flow_id: int,
        max_speed: float = 0.55,
    ):
        super().__init__("gradient", flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.center = np.asarray(center, dtype=np.float32)
        self.matrix = np.asarray(matrix, dtype=np.float32)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the speed-capped affine velocity at *pos*."""
        pos = np.asarray(pos, dtype=np.float32)
        rel = pos - self.center
        # rel @ matrix.T applies ``matrix`` to each offset row-wise.
        return _clamp_speed(self.base + rel @ self.matrix.T, self.max_speed)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``center`` and ``matrix``."""
        out = super().metadata()
        out.update(
            {
                "base": self.base.astype(float).tolist(),
                "center": self.center.astype(float).tolist(),
                "matrix": self.matrix.astype(float).tolist(),
            }
        )
        return out


@dataclass
class TurbulentPatchFlow(Flow):
    """Base flow plus Gaussian-weighted perturbation vectors.

    Each row of ``centers`` carries the matching row of ``vectors``,
    weighted by ``exp(-r**2 / (2 * sigma**2))``.
    """

    base: np.ndarray
    centers: np.ndarray
    vectors: np.ndarray
    sigma: float
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        centers: np.ndarray,
        vectors: np.ndarray,
        flow_id: int,
        sigma: float = 1.15,
        max_speed: float = 0.55,
    ):
        super().__init__("turbulent_patch", flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.centers = np.asarray(centers, dtype=np.float32)
        self.vectors = np.asarray(vectors, dtype=np.float32)
        self.sigma = float(sigma)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the speed-capped base-plus-patches velocity at *pos*."""
        pos = np.asarray(pos, dtype=np.float32)
        # Insert a patch axis so every position is paired with every center.
        rel = pos[..., None, :] - self.centers
        sq_dist = np.sum(rel * rel, axis=-1, keepdims=True)
        weights = np.exp(-sq_dist / (2.0 * self.sigma**2))
        perturb = np.sum(weights * self.vectors, axis=-2)
        return _clamp_speed(self.base + perturb, self.max_speed)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base`` and ``centers``."""
        out = super().metadata()
        out.update(
            {
                "base": self.base.astype(float).tolist(),
                "centers": self.centers.astype(float).tolist(),
            }
        )
        return out


@dataclass
class ShearFlow(Flow):
    """Base flow whose x-component grows linearly with ``y - center_y``."""

    base: np.ndarray
    center_y: float
    shear: float
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        center_y: float,
        shear: float,
        flow_id: int,
        max_speed: float = 0.60,
    ):
        super().__init__("shear", flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.center_y = float(center_y)
        self.shear = float(shear)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the speed-capped sheared velocity at *pos*."""
        pos = np.asarray(pos, dtype=np.float32)
        # broadcast_to yields a read-only view; copy before the in-place add.
        vel = np.broadcast_to(self.base, pos.shape).copy()
        vel[..., 0] += self.shear * (pos[..., 1] - self.center_y)
        return _clamp_speed(vel, self.max_speed)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``center_y`` and ``shear``."""
        out = super().metadata()
        out.update(
            {
                "base": self.base.astype(float).tolist(),
                "center_y": self.center_y,
                "shear": self.shear,
            }
        )
        return out


@dataclass
class DoubleGyreFlow(Flow):
    """Sinusoidal gyre pattern over ``workspace = (xmin, xmax, ymin, ymax)``.

    Positions are normalized by the workspace extent before the trigonometric
    terms are evaluated; ``phase`` shifts the pattern along x.
    """

    amp: float
    phase: float
    workspace: tuple[float, float, float, float]
    max_speed: float

    def __init__(
        self,
        amp: float,
        phase: float,
        flow_id: int,
        workspace: tuple[float, float, float, float],
        max_speed: float = 0.50,
    ):
        super().__init__("double_gyre", flow_id)
        self.amp = float(amp)
        self.phase = float(phase)
        self.workspace = workspace
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the speed-capped gyre velocity at *pos*."""
        pos = np.asarray(pos, dtype=np.float32)
        xmin, xmax, ymin, ymax = self.workspace
        # The 1e-6 floor guards against a zero-extent workspace.
        x = (pos[..., 0] - xmin) / max(xmax - xmin, 1e-6)
        y = (pos[..., 1] - ymin) / max(ymax - ymin, 1e-6)
        x_arg = np.pi * x + self.phase
        y_arg = 2.0 * np.pi * y
        sx, cx = np.sin(x_arg), np.cos(x_arg)
        sy, cy = np.sin(y_arg), np.cos(y_arg)
        vel = np.stack(
            [self.amp * sx * cy, -0.5 * self.amp * cx * sy],
            axis=-1,
        )
        return _clamp_speed(vel, self.max_speed)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``amp`` and ``phase``."""
        out = super().metadata()
        out.update({"amp": self.amp, "phase": self.phase})
        return out


@dataclass
class SourceSinkFlow(Flow):
    """Base flow plus softened radial terms around one or more centers.

    Each center contributes ``strength * rel / (r**2 + radius_eps**2)``, so a
    positive strength pushes away from the center and a negative one pulls
    toward it.
    """

    base: np.ndarray
    centers: np.ndarray
    strengths: np.ndarray
    radius_eps: float
    max_speed: float

    def __init__(
        self,
        name: str,
        base: np.ndarray,
        centers: np.ndarray,
        strengths: np.ndarray,
        flow_id: int,
        radius_eps: float = 0.45,
        max_speed: float = 0.50,
    ):
        super().__init__(name, flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.centers = np.asarray(centers, dtype=np.float32)
        self.strengths = np.asarray(strengths, dtype=np.float32)
        self.radius_eps = float(radius_eps)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the speed-capped base-plus-radial velocity at *pos*."""
        pos = np.asarray(pos, dtype=np.float32)
        # Insert a center axis so every position is paired with every center.
        rel = pos[..., None, :] - self.centers
        denom = np.sum(rel * rel, axis=-1, keepdims=True) + self.radius_eps**2
        # Shape strengths as (1, ..., 1, n_centers, 1) to line up with rel.
        strength_shape = (1,) * (rel.ndim - 2) + (self.strengths.shape[0], 1)
        weighted = self.strengths.reshape(strength_shape) * rel / denom
        radial = np.sum(weighted, axis=-2)
        return _clamp_speed(self.base + radial, self.max_speed)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``centers``, ``strengths``."""
        out = super().metadata()
        out.update(
            {
                "base": self.base.astype(float).tolist(),
                "centers": self.centers.astype(float).tolist(),
                "strengths": self.strengths.astype(float).tolist(),
            }
        )
        return out


@dataclass
class RandomFourierFlow(Flow):
    """Base flow plus a sum of sinusoidal modes.

    ``k`` holds one 2-D wave vector per mode; ``amp`` and ``phase`` hold one
    scalar per mode.
    """

    base: np.ndarray
    k: np.ndarray
    amp: np.ndarray
    phase: np.ndarray
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        k: np.ndarray,
        amp: np.ndarray,
        phase: np.ndarray,
        flow_id: int,
        max_speed: float = 0.60,
    ):
        super().__init__("random_fourier", flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.k = np.asarray(k, dtype=np.float32)
        self.amp = np.asarray(amp, dtype=np.float32)
        self.phase = np.asarray(phase, dtype=np.float32)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the speed-capped base-plus-modes velocity at *pos*."""
        pos = np.asarray(pos, dtype=np.float32)
        # Flatten to (n_points, 2) so the mode sums are plain matrix ops.
        flat = pos.reshape(-1, 2)
        arg = flat @ self.k.T + self.phase[None, :]
        # Divergence-free field via stream function psi: v=(dpsi/dy, -dpsi/dx).
        coeff = self.amp[None, :] * np.cos(arg)
        vx = np.sum(coeff * self.k[None, :, 1], axis=1)
        vy = -np.sum(coeff * self.k[None, :, 0], axis=1)
        vel = np.stack([vx, vy], axis=-1).reshape(pos.shape) + self.base
        return _clamp_speed(vel, self.max_speed)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base`` and the number of modes."""
        out = super().metadata()
        out.update(
            {
                "base": self.base.astype(float).tolist(),
                "num_modes": int(self.k.shape[0]),
            }
        )
        return out


def _sample_uniform_vector(
    rng: np.random.Generator,
    min_speed: float = 0.05,
    max_speed: float = 0.35,
) -> np.ndarray:
    """Draw a 2-D float32 vector with uniform magnitude and uniform heading.

    The magnitude is drawn before the heading; keep that order so seeded
    runs stay reproducible.
    """
    speed = rng.uniform(min_speed, max_speed)
    direction = rng.uniform(0.0, 2.0 * np.pi)
    return np.array(
        [speed * np.cos(direction), speed * np.sin(direction)],
        dtype=np.float32,
    )


def _sample_signed_uniform(
    rng: np.random.Generator,
    min_abs: float,
    max_abs: float,
) -> float:
    """Draw a magnitude in ``[min_abs, max_abs)`` with a random sign.

    The sign is drawn first, then the magnitude.
    """
    sign = -1.0 if rng.random() < 0.5 else 1.0
    return float(sign * rng.uniform(min_abs, max_abs))


def _route_anchors(workspace: tuple[float, float, float, float]) -> np.ndarray:
    """Return the fixed anchor points scaled into *workspace*.

    Anchors are given as fractions of the workspace: five along each
    diagonal (the midpoint therefore appears twice) plus four points on the
    horizontal and vertical mid-lines.
    """
    xmin, xmax, ymin, ymax = workspace
    w = xmax - xmin
    h = ymax - ymin
    points = np.array(
        [
            [0.20, 0.20],
            [0.35, 0.35],
            [0.50, 0.50],
            [0.65, 0.65],
            [0.80, 0.80],
            [0.20, 0.80],
            [0.35, 0.65],
            [0.50, 0.50],
            [0.65, 0.35],
            [0.80, 0.20],
            [0.50, 0.25],
            [0.50, 0.75],
            [0.25, 0.50],
            [0.75, 0.50],
        ],
        dtype=np.float32,
    )
    points[:, 0] = xmin + points[:, 0] * w
    points[:, 1] = ymin + points[:, 1] * h
    return points


def _sample_route_center(
    rng: np.random.Generator,
    workspace: tuple[float, float, float, float],
    jitter: float = 0.65,
) -> np.ndarray:
    """Pick a random anchor, add Gaussian jitter, and clip to the interior.

    The result is kept at least 1.2 away from every workspace edge.
    """
    xmin, xmax, ymin, ymax = workspace
    anchors = _route_anchors(workspace)
    # Draw order (anchor index, then jitter) matters for seeded runs.
    anchor = anchors[int(rng.integers(0, len(anchors)))]
    center = anchor + rng.normal(0.0, jitter, size=2).astype(np.float32)
    return np.array(
        [
            np.clip(center[0], xmin + 1.2, xmax - 1.2),
            np.clip(center[1], ymin + 1.2, ymax - 1.2),
        ],
        dtype=np.float32,
    )


def _sample_route_centers(
    rng: np.random.Generator,
    workspace: tuple[float, float, float, float],
    count: int,
    jitter: float = 0.65,
) -> np.ndarray:
    """Return *count* independently sampled centers as a ``(count, 2)`` array."""
    centers = [
        _sample_route_center(rng, workspace, jitter=jitter)
        for _ in range(count)
    ]
    return np.stack(centers, axis=0).astype(np.float32)


def sample_flow(
    flow_type: str,
    rng: np.random.Generator,
    flow_id: int,
    workspace: tuple[float, float, float, float] = (0.0, 10.0, 0.0, 10.0),
) -> Flow:
    """Build a randomly parameterized flow of the requested type.

    Args:
        flow_type: Case-insensitive type name. One of ``noflow``,
            ``uniform``, ``vortex_center``, ``gradient``,
            ``turbulent_patch``, ``shear``, ``double_gyre``,
            ``source_sink``, ``source_sink_pair`` or ``random_fourier``.
        rng: Generator that supplies every random draw.
        flow_id: Identifier stored on the returned flow.
        workspace: ``(xmin, xmax, ymin, ymax)`` bounds used to place centers.

    Returns:
        A ``Flow`` subclass instance with parameters drawn from
        ``PAPER_FLOW`` ranges.

    Raises:
        ValueError: If *flow_type* is not a recognized name.
    """
    flow_type = flow_type.lower()
    profile = PAPER_FLOW

    if flow_type == "noflow":
        # NOTE(review): the caller's flow_id is ignored here and 0 is always
        # used; kept as-is, confirm this is intentional.
        return NoFlow(flow_id=0)

    if flow_type == "uniform":
        vector = _sample_uniform_vector(
            rng, profile["uniform_min"], profile["uniform_max"]
        )
        return UniformFlow(vector, flow_id=flow_id)

    if flow_type == "vortex_center":
        center = _sample_route_center(rng, workspace, jitter=0.45)
        base = _sample_uniform_vector(rng, 0.0, profile["vortex_base_max"])
        gamma = _sample_signed_uniform(
            rng, profile["vortex_gamma_min"], profile["vortex_gamma_max"]
        )
        return VortexFlow(
            base=base,
            center=center,
            gamma=gamma,
            flow_id=flow_id,
            max_speed=profile["vortex_max"],
            name=flow_type,
        )

    if flow_type == "gradient":
        center = _sample_route_center(rng, workspace, jitter=0.35)
        base = _sample_uniform_vector(
            rng, profile["gradient_base_min"], profile["gradient_base_max"]
        )
        scale = rng.uniform(
            profile["gradient_matrix_min"], profile["gradient_matrix_max"]
        )
        mat = rng.normal(0.0, 1.0, size=(2, 2)).astype(np.float32)
        # Normalize to unit Frobenius norm, then scale to the sampled size.
        mat = mat / max(float(np.linalg.norm(mat)), 1e-6) * scale
        return GradientFlow(
            base=base,
            center=center,
            matrix=mat,
            flow_id=flow_id,
            max_speed=profile["gradient_max"],
        )

    if flow_type == "turbulent_patch":
        base = _sample_uniform_vector(rng, 0.0, profile["turbulent_base_max"])
        centers = _sample_route_centers(rng, workspace, count=5, jitter=0.85)
        vectors = rng.normal(
            0.0, profile["turbulent_vector_std"], size=(5, 2)
        ).astype(np.float32)
        return TurbulentPatchFlow(
            base=base,
            centers=centers,
            vectors=vectors,
            flow_id=flow_id,
            max_speed=profile["turbulent_max"],
        )

    if flow_type == "shear":
        _, _, ymin, ymax = workspace
        # The base range reuses the "turbulent_base_max" entry.
        base = _sample_uniform_vector(rng, 0.0, profile["turbulent_base_max"])
        center_y = 0.5 * (ymin + ymax)
        shear = _sample_signed_uniform(
            rng, profile["shear_min"], profile["shear_max_rate"]
        )
        return ShearFlow(
            base=base,
            center_y=center_y,
            shear=shear,
            flow_id=flow_id,
            max_speed=profile["shear_max"],
        )

    if flow_type == "double_gyre":
        amp = float(
            rng.uniform(
                profile["double_gyre_amp_min"], profile["double_gyre_amp_max"]
            )
        )
        phase = float(rng.uniform(0.0, 2.0 * np.pi))
        return DoubleGyreFlow(
            amp=amp,
            phase=phase,
            flow_id=flow_id,
            workspace=workspace,
            max_speed=profile["double_gyre_max"],
        )

    if flow_type == "source_sink":
        base = _sample_uniform_vector(rng, 0.0, profile["source_base_max"])
        center = _sample_route_centers(rng, workspace, count=1, jitter=0.65)
        strength = np.array(
            [
                _sample_signed_uniform(
                    rng,
                    profile["source_strength_min"],
                    profile["source_strength_max"],
                )
            ],
            dtype=np.float32,
        )
        return SourceSinkFlow(
            name="source_sink",
            base=base,
            centers=center,
            strengths=strength,
            flow_id=flow_id,
            max_speed=profile["source_max"],
        )

    if flow_type == "source_sink_pair":
        base = _sample_uniform_vector(rng, 0.0, profile["source_base_max"])
        centers = _sample_route_centers(rng, workspace, count=2, jitter=0.75)
        strength = float(
            rng.uniform(
                profile["source_strength_min"], profile["source_strength_max"]
            )
        )
        if rng.random() < 0.5:
            strength = -strength
        # Equal and opposite strengths: one source and one sink.
        strengths = np.array([strength, -strength], dtype=np.float32)
        return SourceSinkFlow(
            name="source_sink_pair",
            base=base,
            centers=centers,
            strengths=strengths,
            flow_id=flow_id,
            max_speed=profile["source_max"],
        )

    if flow_type == "random_fourier":
        # The base range reuses the "turbulent_base_max" entry.
        base = _sample_uniform_vector(rng, 0.0, profile["turbulent_base_max"])
        modes = 8
        k = rng.integers(1, 5, size=(modes, 2)).astype(np.float32)
        signs = rng.choice([-1.0, 1.0], size=(modes, 2)).astype(np.float32)
        # NOTE(review): 10.0 matches the extent of the default workspace;
        # the ``workspace`` argument itself is not consulted here.
        k = signs * k * (2.0 * np.pi / 10.0)
        amp_std = profile["random_fourier_amp_std"]
        amp = rng.normal(0.0, amp_std, size=(modes,)).astype(np.float32)
        phase = rng.uniform(0.0, 2.0 * np.pi, size=(modes,)).astype(np.float32)
        return RandomFourierFlow(
            base=base,
            k=k,
            amp=amp,
            phase=phase,
            flow_id=flow_id,
            max_speed=profile["random_fourier_max"],
        )

    raise ValueError(f"unknown flow type: {flow_type}")