| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import Any |
|
|
| import numpy as np |
|
|
|
|
| PAPER_FLOW = { |
| "uniform_min": 0.03, |
| "uniform_max": 0.24, |
| "vortex_base_max": 0.05, |
| "vortex_gamma_min": 0.12, |
| "vortex_gamma_max": 0.24, |
| "vortex_max": 0.36, |
| "gradient_base_min": 0.00, |
| "gradient_base_max": 0.08, |
| "gradient_matrix_min": 0.018, |
| "gradient_matrix_max": 0.040, |
| "gradient_max": 0.34, |
| "turbulent_base_max": 0.05, |
| "turbulent_vector_std": 0.105, |
| "turbulent_max": 0.34, |
| "shear_min": 0.035, |
| "shear_max_rate": 0.085, |
| "shear_max": 0.38, |
| "double_gyre_amp_min": 0.20, |
| "double_gyre_amp_max": 0.34, |
| "double_gyre_max": 0.36, |
| "source_base_max": 0.03, |
| "source_strength_min": 0.16, |
| "source_strength_max": 0.30, |
| "source_max": 0.36, |
| "random_fourier_amp_std": 0.045, |
| "random_fourier_max": 0.38, |
| } |
|
|
|
|
| @dataclass |
| class Flow: |
| name: str |
| flow_id: int |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| raise NotImplementedError |
|
|
| def metadata(self) -> dict[str, Any]: |
| return {"flow_type": self.name, "flow_id": int(self.flow_id)} |
|
|
|
|
| @dataclass |
| class NoFlow(Flow): |
| def __init__(self, flow_id: int = 0): |
| super().__init__("noflow", flow_id) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| return np.zeros_like(np.asarray(pos, dtype=np.float32)) |
|
|
|
|
| @dataclass |
| class UniformFlow(Flow): |
| vector: np.ndarray |
|
|
| def __init__(self, vector: np.ndarray, flow_id: int): |
| super().__init__("uniform", flow_id) |
| self.vector = np.asarray(vector, dtype=np.float32) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| return np.broadcast_to(self.vector, pos.shape).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out["vector"] = self.vector.astype(float).tolist() |
| return out |
|
|
| @dataclass |
| class VortexFlow(Flow): |
| base: np.ndarray |
| center: np.ndarray |
| gamma: float |
| radius_eps: float |
| max_speed: float |
|
|
| def __init__( |
| self, |
| base: np.ndarray, |
| center: np.ndarray, |
| gamma: float, |
| flow_id: int, |
| radius_eps: float = 0.30, |
| max_speed: float = 0.50, |
| name: str = "vortex", |
| ): |
| super().__init__(name, flow_id) |
| self.base = np.asarray(base, dtype=np.float32) |
| self.center = np.asarray(center, dtype=np.float32) |
| self.gamma = float(gamma) |
| self.radius_eps = float(radius_eps) |
| self.max_speed = float(max_speed) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| rel = pos - self.center |
| denom = np.sum(rel * rel, axis=-1, keepdims=True) + self.radius_eps**2 |
| swirl = self.gamma * np.concatenate([-rel[..., 1:2], rel[..., 0:1]], axis=-1) / denom |
| vel = self.base + swirl |
| speed = np.linalg.norm(vel, axis=-1, keepdims=True) |
| scale = np.minimum(1.0, self.max_speed / np.maximum(speed, 1e-6)) |
| return (vel * scale).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out.update( |
| { |
| "base": self.base.astype(float).tolist(), |
| "center": self.center.astype(float).tolist(), |
| "gamma": self.gamma, |
| } |
| ) |
| return out |
|
|
|
|
| @dataclass |
| class GradientFlow(Flow): |
| base: np.ndarray |
| center: np.ndarray |
| matrix: np.ndarray |
| max_speed: float |
|
|
| def __init__(self, base: np.ndarray, center: np.ndarray, matrix: np.ndarray, flow_id: int, max_speed: float = 0.55): |
| super().__init__("gradient", flow_id) |
| self.base = np.asarray(base, dtype=np.float32) |
| self.center = np.asarray(center, dtype=np.float32) |
| self.matrix = np.asarray(matrix, dtype=np.float32) |
| self.max_speed = float(max_speed) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| rel = pos - self.center |
| vel = self.base + rel @ self.matrix.T |
| speed = np.linalg.norm(vel, axis=-1, keepdims=True) |
| scale = np.minimum(1.0, self.max_speed / np.maximum(speed, 1e-6)) |
| return (vel * scale).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out.update( |
| { |
| "base": self.base.astype(float).tolist(), |
| "center": self.center.astype(float).tolist(), |
| "matrix": self.matrix.astype(float).tolist(), |
| } |
| ) |
| return out |
|
|
|
|
| @dataclass |
| class TurbulentPatchFlow(Flow): |
| base: np.ndarray |
| centers: np.ndarray |
| vectors: np.ndarray |
| sigma: float |
| max_speed: float |
|
|
| def __init__( |
| self, |
| base: np.ndarray, |
| centers: np.ndarray, |
| vectors: np.ndarray, |
| flow_id: int, |
| sigma: float = 1.15, |
| max_speed: float = 0.55, |
| ): |
| super().__init__("turbulent_patch", flow_id) |
| self.base = np.asarray(base, dtype=np.float32) |
| self.centers = np.asarray(centers, dtype=np.float32) |
| self.vectors = np.asarray(vectors, dtype=np.float32) |
| self.sigma = float(sigma) |
| self.max_speed = float(max_speed) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| rel = pos[..., None, :] - self.centers |
| weights = np.exp(-np.sum(rel * rel, axis=-1, keepdims=True) / (2.0 * self.sigma**2)) |
| perturb = np.sum(weights * self.vectors, axis=-2) |
| vel = self.base + perturb |
| speed = np.linalg.norm(vel, axis=-1, keepdims=True) |
| scale = np.minimum(1.0, self.max_speed / np.maximum(speed, 1e-6)) |
| return (vel * scale).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out.update({"base": self.base.astype(float).tolist(), "centers": self.centers.astype(float).tolist()}) |
| return out |
|
|
|
|
| @dataclass |
| class ShearFlow(Flow): |
| base: np.ndarray |
| center_y: float |
| shear: float |
| max_speed: float |
|
|
| def __init__(self, base: np.ndarray, center_y: float, shear: float, flow_id: int, max_speed: float = 0.60): |
| super().__init__("shear", flow_id) |
| self.base = np.asarray(base, dtype=np.float32) |
| self.center_y = float(center_y) |
| self.shear = float(shear) |
| self.max_speed = float(max_speed) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| vel = np.broadcast_to(self.base, pos.shape).copy() |
| vel[..., 0] += self.shear * (pos[..., 1] - self.center_y) |
| speed = np.linalg.norm(vel, axis=-1, keepdims=True) |
| scale = np.minimum(1.0, self.max_speed / np.maximum(speed, 1e-6)) |
| return (vel * scale).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out.update({"base": self.base.astype(float).tolist(), "center_y": self.center_y, "shear": self.shear}) |
| return out |
|
|
|
|
| @dataclass |
| class DoubleGyreFlow(Flow): |
| amp: float |
| phase: float |
| workspace: tuple[float, float, float, float] |
| max_speed: float |
|
|
| def __init__( |
| self, |
| amp: float, |
| phase: float, |
| flow_id: int, |
| workspace: tuple[float, float, float, float], |
| max_speed: float = 0.50, |
| ): |
| super().__init__("double_gyre", flow_id) |
| self.amp = float(amp) |
| self.phase = float(phase) |
| self.workspace = workspace |
| self.max_speed = float(max_speed) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| xmin, xmax, ymin, ymax = self.workspace |
| x = (pos[..., 0] - xmin) / max(xmax - xmin, 1e-6) |
| y = (pos[..., 1] - ymin) / max(ymax - ymin, 1e-6) |
| sx = np.sin(np.pi * x + self.phase) |
| cx = np.cos(np.pi * x + self.phase) |
| sy = np.sin(2.0 * np.pi * y) |
| cy = np.cos(2.0 * np.pi * y) |
| vel = np.stack([self.amp * sx * cy, -0.5 * self.amp * cx * sy], axis=-1) |
| speed = np.linalg.norm(vel, axis=-1, keepdims=True) |
| scale = np.minimum(1.0, self.max_speed / np.maximum(speed, 1e-6)) |
| return (vel * scale).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out.update({"amp": self.amp, "phase": self.phase}) |
| return out |
|
|
|
|
| @dataclass |
| class SourceSinkFlow(Flow): |
| base: np.ndarray |
| centers: np.ndarray |
| strengths: np.ndarray |
| radius_eps: float |
| max_speed: float |
|
|
| def __init__( |
| self, |
| name: str, |
| base: np.ndarray, |
| centers: np.ndarray, |
| strengths: np.ndarray, |
| flow_id: int, |
| radius_eps: float = 0.45, |
| max_speed: float = 0.50, |
| ): |
| super().__init__(name, flow_id) |
| self.base = np.asarray(base, dtype=np.float32) |
| self.centers = np.asarray(centers, dtype=np.float32) |
| self.strengths = np.asarray(strengths, dtype=np.float32) |
| self.radius_eps = float(radius_eps) |
| self.max_speed = float(max_speed) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| rel = pos[..., None, :] - self.centers |
| denom = np.sum(rel * rel, axis=-1, keepdims=True) + self.radius_eps**2 |
| strength_shape = (1,) * (rel.ndim - 2) + (self.strengths.shape[0], 1) |
| radial = np.sum(self.strengths.reshape(strength_shape) * rel / denom, axis=-2) |
| vel = self.base + radial |
| speed = np.linalg.norm(vel, axis=-1, keepdims=True) |
| scale = np.minimum(1.0, self.max_speed / np.maximum(speed, 1e-6)) |
| return (vel * scale).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out.update( |
| { |
| "base": self.base.astype(float).tolist(), |
| "centers": self.centers.astype(float).tolist(), |
| "strengths": self.strengths.astype(float).tolist(), |
| } |
| ) |
| return out |
|
|
|
|
| @dataclass |
| class RandomFourierFlow(Flow): |
| base: np.ndarray |
| k: np.ndarray |
| amp: np.ndarray |
| phase: np.ndarray |
| max_speed: float |
|
|
| def __init__( |
| self, |
| base: np.ndarray, |
| k: np.ndarray, |
| amp: np.ndarray, |
| phase: np.ndarray, |
| flow_id: int, |
| max_speed: float = 0.60, |
| ): |
| super().__init__("random_fourier", flow_id) |
| self.base = np.asarray(base, dtype=np.float32) |
| self.k = np.asarray(k, dtype=np.float32) |
| self.amp = np.asarray(amp, dtype=np.float32) |
| self.phase = np.asarray(phase, dtype=np.float32) |
| self.max_speed = float(max_speed) |
|
|
| def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray: |
| pos = np.asarray(pos, dtype=np.float32) |
| flat = pos.reshape(-1, 2) |
| arg = flat @ self.k.T + self.phase[None, :] |
| |
| coeff = self.amp[None, :] * np.cos(arg) |
| vx = np.sum(coeff * self.k[None, :, 1], axis=1) |
| vy = -np.sum(coeff * self.k[None, :, 0], axis=1) |
| vel = np.stack([vx, vy], axis=-1).reshape(pos.shape) + self.base |
| speed = np.linalg.norm(vel, axis=-1, keepdims=True) |
| scale = np.minimum(1.0, self.max_speed / np.maximum(speed, 1e-6)) |
| return (vel * scale).astype(np.float32) |
|
|
| def metadata(self) -> dict[str, Any]: |
| out = super().metadata() |
| out.update({"base": self.base.astype(float).tolist(), "num_modes": int(self.k.shape[0])}) |
| return out |
|
|
|
|
| def _sample_uniform_vector(rng: np.random.Generator, min_speed: float = 0.05, max_speed: float = 0.35) -> np.ndarray: |
| speed = rng.uniform(min_speed, max_speed) |
| direction = rng.uniform(0.0, 2.0 * np.pi) |
| return np.array([speed * np.cos(direction), speed * np.sin(direction)], dtype=np.float32) |
|
|
|
|
| def _sample_signed_uniform(rng: np.random.Generator, min_abs: float, max_abs: float) -> float: |
| sign = -1.0 if rng.random() < 0.5 else 1.0 |
| return float(sign * rng.uniform(min_abs, max_abs)) |
|
|
|
|
| def _route_anchors(workspace: tuple[float, float, float, float]) -> np.ndarray: |
| xmin, xmax, ymin, ymax = workspace |
| w = xmax - xmin |
| h = ymax - ymin |
| points = np.array( |
| [ |
| [0.20, 0.20], |
| [0.35, 0.35], |
| [0.50, 0.50], |
| [0.65, 0.65], |
| [0.80, 0.80], |
| [0.20, 0.80], |
| [0.35, 0.65], |
| [0.50, 0.50], |
| [0.65, 0.35], |
| [0.80, 0.20], |
| [0.50, 0.25], |
| [0.50, 0.75], |
| [0.25, 0.50], |
| [0.75, 0.50], |
| ], |
| dtype=np.float32, |
| ) |
| points[:, 0] = xmin + points[:, 0] * w |
| points[:, 1] = ymin + points[:, 1] * h |
| return points |
|
|
|
|
| def _sample_route_center( |
| rng: np.random.Generator, |
| workspace: tuple[float, float, float, float], |
| jitter: float = 0.65, |
| ) -> np.ndarray: |
| xmin, xmax, ymin, ymax = workspace |
| anchors = _route_anchors(workspace) |
| center = anchors[int(rng.integers(0, len(anchors)))] + rng.normal(0.0, jitter, size=2).astype(np.float32) |
| return np.array([np.clip(center[0], xmin + 1.2, xmax - 1.2), np.clip(center[1], ymin + 1.2, ymax - 1.2)], dtype=np.float32) |
|
|
|
|
| def _sample_route_centers( |
| rng: np.random.Generator, |
| workspace: tuple[float, float, float, float], |
| count: int, |
| jitter: float = 0.65, |
| ) -> np.ndarray: |
| return np.stack([_sample_route_center(rng, workspace, jitter=jitter) for _ in range(count)], axis=0).astype(np.float32) |
|
|
|
|
| def sample_flow( |
| flow_type: str, |
| rng: np.random.Generator, |
| flow_id: int, |
| workspace: tuple[float, float, float, float] = (0.0, 10.0, 0.0, 10.0), |
| ) -> Flow: |
| flow_type = flow_type.lower() |
| profile = PAPER_FLOW |
| if flow_type == "noflow": |
| return NoFlow(flow_id=0) |
| if flow_type == "uniform": |
| return UniformFlow(_sample_uniform_vector(rng, profile["uniform_min"], profile["uniform_max"]), flow_id=flow_id) |
| if flow_type == "vortex_center": |
| center = _sample_route_center(rng, workspace, jitter=0.45) |
| base = _sample_uniform_vector(rng, 0.0, profile["vortex_base_max"]) |
| gamma = _sample_signed_uniform(rng, profile["vortex_gamma_min"], profile["vortex_gamma_max"]) |
| return VortexFlow(base=base, center=center, gamma=gamma, flow_id=flow_id, max_speed=profile["vortex_max"], name=flow_type) |
| if flow_type == "gradient": |
| center = _sample_route_center(rng, workspace, jitter=0.35) |
| base = _sample_uniform_vector(rng, profile["gradient_base_min"], profile["gradient_base_max"]) |
| scale = rng.uniform(profile["gradient_matrix_min"], profile["gradient_matrix_max"]) |
| mat = rng.normal(0.0, 1.0, size=(2, 2)).astype(np.float32) |
| mat = mat / max(float(np.linalg.norm(mat)), 1e-6) * scale |
| return GradientFlow(base=base, center=center, matrix=mat, flow_id=flow_id, max_speed=profile["gradient_max"]) |
| if flow_type == "turbulent_patch": |
| base = _sample_uniform_vector(rng, 0.0, profile["turbulent_base_max"]) |
| centers = _sample_route_centers(rng, workspace, count=5, jitter=0.85) |
| vectors = rng.normal(0.0, profile["turbulent_vector_std"], size=(5, 2)).astype(np.float32) |
| return TurbulentPatchFlow(base=base, centers=centers, vectors=vectors, flow_id=flow_id, max_speed=profile["turbulent_max"]) |
| if flow_type == "shear": |
| xmin, xmax, ymin, ymax = workspace |
| base = _sample_uniform_vector(rng, 0.0, profile["turbulent_base_max"]) |
| center_y = 0.5 * (ymin + ymax) |
| shear = _sample_signed_uniform(rng, profile["shear_min"], profile["shear_max_rate"]) |
| return ShearFlow(base=base, center_y=center_y, shear=shear, flow_id=flow_id, max_speed=profile["shear_max"]) |
| if flow_type == "double_gyre": |
| amp = float(rng.uniform(profile["double_gyre_amp_min"], profile["double_gyre_amp_max"])) |
| phase = float(rng.uniform(0.0, 2.0 * np.pi)) |
| return DoubleGyreFlow(amp=amp, phase=phase, flow_id=flow_id, workspace=workspace, max_speed=profile["double_gyre_max"]) |
| if flow_type == "source_sink": |
| base = _sample_uniform_vector(rng, 0.0, profile["source_base_max"]) |
| center = _sample_route_centers(rng, workspace, count=1, jitter=0.65) |
| strength = np.array([_sample_signed_uniform(rng, profile["source_strength_min"], profile["source_strength_max"])], dtype=np.float32) |
| return SourceSinkFlow( |
| name="source_sink", |
| base=base, |
| centers=center, |
| strengths=strength, |
| flow_id=flow_id, |
| max_speed=profile["source_max"], |
| ) |
| if flow_type == "source_sink_pair": |
| base = _sample_uniform_vector(rng, 0.0, profile["source_base_max"]) |
| centers = _sample_route_centers(rng, workspace, count=2, jitter=0.75) |
| strength = float(rng.uniform(profile["source_strength_min"], profile["source_strength_max"])) |
| if rng.random() < 0.5: |
| strength = -strength |
| strengths = np.array([strength, -strength], dtype=np.float32) |
| return SourceSinkFlow( |
| name="source_sink_pair", |
| base=base, |
| centers=centers, |
| strengths=strengths, |
| flow_id=flow_id, |
| max_speed=profile["source_max"], |
| ) |
| if flow_type == "random_fourier": |
| base = _sample_uniform_vector(rng, 0.0, profile["turbulent_base_max"]) |
| modes = 8 |
| k = rng.integers(1, 5, size=(modes, 2)).astype(np.float32) |
| signs = rng.choice([-1.0, 1.0], size=(modes, 2)).astype(np.float32) |
| k = signs * k * (2.0 * np.pi / 10.0) |
| amp_std = profile["random_fourier_amp_std"] |
| amp = rng.normal(0.0, amp_std, size=(modes,)).astype(np.float32) |
| phase = rng.uniform(0.0, 2.0 * np.pi, size=(modes,)).astype(np.float32) |
| return RandomFourierFlow( |
| base=base, |
| k=k, |
| amp=amp, |
| phase=phase, |
| flow_id=flow_id, |
| max_speed=profile["random_fourier_max"], |
| ) |
| raise ValueError(f"unknown flow type: {flow_type}") |
|
|