# FlowMo-WM / driftwm/sim/flow.py
# cccat6's picture
# Update FlowMo-WM code and static flow protocol
# ccf9f1b verified
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import numpy as np
# Sampling constants read by ``sample_flow``; one group of keys per flow family.
# Keys ending in ``_max`` with no further qualifier (``vortex_max``,
# ``gradient_max``, ...) are passed as the ``max_speed`` clamp of the flow.
PAPER_FLOW = {
    # uniform: speed range of the constant vector.
    "uniform_min": 0.03,
    "uniform_max": 0.24,
    # vortex_center: base-drift speed cap, |gamma| range, speed clamp.
    "vortex_base_max": 0.05,
    "vortex_gamma_min": 0.12,
    "vortex_gamma_max": 0.24,
    "vortex_max": 0.36,
    # gradient: base-drift speed range, matrix norm range, speed clamp.
    "gradient_base_min": 0.0,
    "gradient_base_max": 0.08,
    "gradient_matrix_min": 0.018,
    "gradient_matrix_max": 0.04,
    "gradient_max": 0.34,
    # turbulent_patch: base-drift speed cap, std of patch vectors, speed clamp.
    # ``turbulent_base_max`` is also reused for the shear and random_fourier
    # base drift.
    "turbulent_base_max": 0.05,
    "turbulent_vector_std": 0.105,
    "turbulent_max": 0.34,
    # shear: |shear rate| range, speed clamp.
    "shear_min": 0.035,
    "shear_max_rate": 0.085,
    "shear_max": 0.38,
    # double_gyre: amplitude range, speed clamp.
    "double_gyre_amp_min": 0.2,
    "double_gyre_amp_max": 0.34,
    "double_gyre_max": 0.36,
    # source_sink / source_sink_pair: base-drift cap, |strength| range, clamp.
    "source_base_max": 0.03,
    "source_strength_min": 0.16,
    "source_strength_max": 0.3,
    "source_max": 0.36,
    # random_fourier: std of mode amplitudes, speed clamp.
    "random_fourier_amp_std": 0.045,
    "random_fourier_max": 0.38,
}
@dataclass
class Flow:
    """Base record for a flow field, identified by a name and a numeric id.

    Subclasses are expected to override :meth:`velocity`; the base version
    only raises.
    """

    # Reported as ``flow_type`` by :meth:`metadata`.
    name: str
    # Reported as ``flow_id`` by :meth:`metadata`.
    flow_id: int

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the flow velocity at ``pos`` (and time ``t``).

        Raises:
            NotImplementedError: always, on the base class.
        """
        raise NotImplementedError

    def metadata(self) -> dict[str, Any]:
        """Return ``{"flow_type": name, "flow_id": int(flow_id)}``."""
        summary: dict[str, Any] = {}
        summary["flow_type"] = self.name
        summary["flow_id"] = int(self.flow_id)
        return summary
@dataclass
class NoFlow(Flow):
    """Flow that is zero everywhere; its name is fixed to ``"noflow"``."""

    def __init__(self, flow_id: int = 0):
        """Create the still-water flow with the given id (0 by default)."""
        super().__init__(name="noflow", flow_id=flow_id)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return float32 zeros with the same shape as ``pos``."""
        query = np.asarray(pos, dtype=np.float32)
        return np.zeros_like(query)
@dataclass
class UniformFlow(Flow):
    """Spatially constant flow: every query position gets the same vector."""

    # The constant velocity, stored as float32.
    vector: np.ndarray

    def __init__(self, vector: np.ndarray, flow_id: int):
        """Store ``vector`` as float32 and register the flow as ``"uniform"``."""
        super().__init__(name="uniform", flow_id=flow_id)
        self.vector = np.asarray(vector, dtype=np.float32)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return ``vector`` broadcast to the shape of ``pos`` as float32."""
        query = np.asarray(pos, dtype=np.float32)
        tiled = np.broadcast_to(self.vector, query.shape)
        # astype returns a fresh array, so callers never get a read-only
        # broadcast view of the stored vector.
        return tiled.astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return the base metadata extended with ``vector`` as a plain list."""
        return {**super().metadata(), "vector": self.vector.astype(float).tolist()}
@dataclass
class VortexFlow(Flow):
    """Constant drift plus one softened point vortex, with a speed clamp.

    The swirl term is ``gamma * (-dy, dx) / (dx**2 + dy**2 + radius_eps**2)``
    where ``(dx, dy) = pos - center``.
    """

    # Constant drift added everywhere (float32).
    base: np.ndarray
    # Vortex center (float32).
    center: np.ndarray
    # Signed swirl strength; its sign selects the rotation direction.
    gamma: float
    # Softening length; its square is added to the squared distance.
    radius_eps: float
    # Upper bound applied to the returned speed.
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        center: np.ndarray,
        gamma: float,
        flow_id: int,
        radius_eps: float = 0.30,
        max_speed: float = 0.50,
        name: str = "vortex",
    ):
        """Coerce arrays to float32 and scalars to ``float``; ``name`` labels the flow."""
        super().__init__(name=name, flow_id=flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.center = np.asarray(center, dtype=np.float32)
        self.gamma = float(gamma)
        self.radius_eps = float(radius_eps)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the clamped float32 velocity at ``pos`` (``t`` is unused)."""
        query = np.asarray(pos, dtype=np.float32)
        offset = query - self.center
        # Softened squared distance: the radius_eps**2 term keeps the swirl
        # finite at the center whenever radius_eps is non-zero.
        soft_r2 = np.sum(offset * offset, axis=-1, keepdims=True) + self.radius_eps**2
        # Offset rotated by +90 degrees: (dx, dy) -> (-dy, dx).
        perp = np.concatenate([-offset[..., 1:2], offset[..., 0:1]], axis=-1)
        field = self.base + self.gamma * perp / soft_r2
        # Rescale only where the speed exceeds max_speed; the 1e-6 floor
        # avoids dividing by zero where the field vanishes.
        magnitude = np.linalg.norm(field, axis=-1, keepdims=True)
        limiter = np.minimum(1.0, self.max_speed / np.maximum(magnitude, 1e-6))
        return (field * limiter).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``center`` and ``gamma``.

        ``radius_eps`` and ``max_speed`` are not included.
        """
        return {
            **super().metadata(),
            "base": self.base.astype(float).tolist(),
            "center": self.center.astype(float).tolist(),
            "gamma": self.gamma,
        }
@dataclass
class GradientFlow(Flow):
    """Linear flow ``base + matrix @ (pos - center)``, with a speed clamp."""

    # Velocity at ``center`` before clamping (float32).
    base: np.ndarray
    # Point about which the linear term is taken (float32).
    center: np.ndarray
    # Velocity-gradient matrix applied to the offset (float32).
    matrix: np.ndarray
    # Upper bound applied to the returned speed.
    max_speed: float

    def __init__(self, base: np.ndarray, center: np.ndarray, matrix: np.ndarray, flow_id: int, max_speed: float = 0.55):
        """Coerce the array arguments to float32 and register as ``"gradient"``."""
        super().__init__(name="gradient", flow_id=flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.center = np.asarray(center, dtype=np.float32)
        self.matrix = np.asarray(matrix, dtype=np.float32)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the clamped float32 velocity at ``pos`` (``t`` is unused)."""
        query = np.asarray(pos, dtype=np.float32)
        offset = query - self.center
        # Row-vector form of matrix @ offset, so batches of positions work.
        field = self.base + np.matmul(offset, self.matrix.T)
        magnitude = np.linalg.norm(field, axis=-1, keepdims=True)
        limiter = np.minimum(1.0, self.max_speed / np.maximum(magnitude, 1e-6))
        return (field * limiter).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``center`` and ``matrix`` as lists."""
        return {
            **super().metadata(),
            "base": self.base.astype(float).tolist(),
            "center": self.center.astype(float).tolist(),
            "matrix": self.matrix.astype(float).tolist(),
        }
@dataclass
class TurbulentPatchFlow(Flow):
    """Constant drift plus Gaussian-weighted velocity patches, speed-clamped.

    Each patch ``i`` contributes
    ``vectors[i] * exp(-|pos - centers[i]|**2 / (2 * sigma**2))``.
    """

    # Constant drift added everywhere (float32).
    base: np.ndarray
    # Patch centers, one row per patch (float32).
    centers: np.ndarray
    # Patch velocity vectors, one row per patch (float32).
    vectors: np.ndarray
    # Width of the Gaussian weight shared by all patches.
    sigma: float
    # Upper bound applied to the returned speed.
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        centers: np.ndarray,
        vectors: np.ndarray,
        flow_id: int,
        sigma: float = 1.15,
        max_speed: float = 0.55,
    ):
        """Coerce arrays to float32 and register as ``"turbulent_patch"``."""
        super().__init__(name="turbulent_patch", flow_id=flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.centers = np.asarray(centers, dtype=np.float32)
        self.vectors = np.asarray(vectors, dtype=np.float32)
        self.sigma = float(sigma)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the clamped float32 velocity at ``pos`` (``t`` is unused)."""
        query = np.asarray(pos, dtype=np.float32)
        # Insert a patch axis so every position is paired with every center.
        offsets = query[..., None, :] - self.centers
        sq_dist = np.sum(offsets * offsets, axis=-1, keepdims=True)
        weights = np.exp(-sq_dist / (2.0 * self.sigma**2))
        # Sum the weighted patch vectors over the patch axis.
        field = self.base + np.sum(weights * self.vectors, axis=-2)
        magnitude = np.linalg.norm(field, axis=-1, keepdims=True)
        limiter = np.minimum(1.0, self.max_speed / np.maximum(magnitude, 1e-6))
        return (field * limiter).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base`` and ``centers``.

        The patch vectors and ``sigma`` are not included.
        """
        return {
            **super().metadata(),
            "base": self.base.astype(float).tolist(),
            "centers": self.centers.astype(float).tolist(),
        }
@dataclass
class ShearFlow(Flow):
    """Constant drift whose x component varies linearly with y, speed-clamped.

    Before clamping the velocity is
    ``(base_x + shear * (y - center_y), base_y)``.
    """

    # Velocity on the line ``y == center_y`` before clamping (float32).
    base: np.ndarray
    # y coordinate at which the shear term is zero.
    center_y: float
    # Rate of change of the x velocity per unit y.
    shear: float
    # Upper bound applied to the returned speed.
    max_speed: float

    def __init__(self, base: np.ndarray, center_y: float, shear: float, flow_id: int, max_speed: float = 0.60):
        """Coerce ``base`` to float32, scalars to ``float``; register as ``"shear"``."""
        super().__init__(name="shear", flow_id=flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.center_y = float(center_y)
        self.shear = float(shear)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the clamped float32 velocity at ``pos`` (``t`` is unused)."""
        query = np.asarray(pos, dtype=np.float32)
        # broadcast_to yields a read-only view; copy it so the x component
        # can be modified in place.
        field = np.broadcast_to(self.base, query.shape).copy()
        field[..., 0] += self.shear * (query[..., 1] - self.center_y)
        magnitude = np.linalg.norm(field, axis=-1, keepdims=True)
        limiter = np.minimum(1.0, self.max_speed / np.maximum(magnitude, 1e-6))
        return (field * limiter).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``center_y`` and ``shear``."""
        return {
            **super().metadata(),
            "base": self.base.astype(float).tolist(),
            "center_y": self.center_y,
            "shear": self.shear,
        }
@dataclass
class DoubleGyreFlow(Flow):
    """Steady sinusoidal gyre pattern over the workspace, speed-clamped.

    With ``x`` and ``y`` normalized by the workspace extents, the velocity
    before clamping is::

        u =        amp * sin(pi * x + phase) * cos(2 * pi * y)
        v = -0.5 * amp * cos(pi * x + phase) * sin(2 * pi * y)
    """

    # Amplitude of the x component (the y component uses half of it).
    amp: float
    # Phase offset added to the x argument of the sinusoids.
    phase: float
    # ``(xmin, xmax, ymin, ymax)`` used to normalize positions.
    workspace: tuple[float, float, float, float]
    # Upper bound applied to the returned speed.
    max_speed: float

    def __init__(
        self,
        amp: float,
        phase: float,
        flow_id: int,
        workspace: tuple[float, float, float, float],
        max_speed: float = 0.50,
    ):
        """Store the parameters (scalars as ``float``); register as ``"double_gyre"``."""
        super().__init__(name="double_gyre", flow_id=flow_id)
        self.amp = float(amp)
        self.phase = float(phase)
        self.workspace = workspace
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the clamped float32 velocity at ``pos`` (``t`` is unused)."""
        query = np.asarray(pos, dtype=np.float32)
        xmin, xmax, ymin, ymax = self.workspace
        # Normalize by the workspace extents; the 1e-6 floor guards against
        # a zero-width or zero-height workspace.
        x = (query[..., 0] - xmin) / max(xmax - xmin, 1e-6)
        y = (query[..., 1] - ymin) / max(ymax - ymin, 1e-6)
        theta_x = np.pi * x + self.phase
        theta_y = 2.0 * np.pi * y
        sx, cx = np.sin(theta_x), np.cos(theta_x)
        sy, cy = np.sin(theta_y), np.cos(theta_y)
        u = self.amp * sx * cy
        v = -0.5 * self.amp * cx * sy
        field = np.stack([u, v], axis=-1)
        magnitude = np.linalg.norm(field, axis=-1, keepdims=True)
        limiter = np.minimum(1.0, self.max_speed / np.maximum(magnitude, 1e-6))
        return (field * limiter).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``amp`` and ``phase``."""
        return {**super().metadata(), "amp": self.amp, "phase": self.phase}
@dataclass
class SourceSinkFlow(Flow):
    """Constant drift plus softened radial sources and sinks, speed-clamped.

    Each center ``i`` contributes
    ``strengths[i] * (pos - centers[i]) / (|pos - centers[i]|**2 + radius_eps**2)``,
    so a positive strength pushes outward and a negative one pulls inward.
    """

    # Constant drift added everywhere (float32).
    base: np.ndarray
    # Source/sink centers, one row per center (float32).
    centers: np.ndarray
    # Signed strength per center (float32).
    strengths: np.ndarray
    # Softening length; its square is added to the squared distance.
    radius_eps: float
    # Upper bound applied to the returned speed.
    max_speed: float

    def __init__(
        self,
        name: str,
        base: np.ndarray,
        centers: np.ndarray,
        strengths: np.ndarray,
        flow_id: int,
        radius_eps: float = 0.45,
        max_speed: float = 0.50,
    ):
        """Coerce arrays to float32 and scalars to ``float``; ``name`` labels the flow."""
        super().__init__(name=name, flow_id=flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.centers = np.asarray(centers, dtype=np.float32)
        self.strengths = np.asarray(strengths, dtype=np.float32)
        self.radius_eps = float(radius_eps)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the clamped float32 velocity at ``pos`` (``t`` is unused)."""
        query = np.asarray(pos, dtype=np.float32)
        # Insert a center axis so every position is paired with every center.
        offsets = query[..., None, :] - self.centers
        soft_r2 = np.sum(offsets * offsets, axis=-1, keepdims=True) + self.radius_eps**2
        # Shape the strengths as (1, ..., 1, n_centers, 1) so they line up
        # with the center axis of ``offsets``.
        leading_ones = (1,) * (offsets.ndim - 2)
        per_center = self.strengths.reshape(leading_ones + (self.strengths.shape[0], 1))
        radial = np.sum(per_center * offsets / soft_r2, axis=-2)
        field = self.base + radial
        magnitude = np.linalg.norm(field, axis=-1, keepdims=True)
        limiter = np.minimum(1.0, self.max_speed / np.maximum(magnitude, 1e-6))
        return (field * limiter).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base``, ``centers`` and ``strengths`` as lists."""
        return {
            **super().metadata(),
            "base": self.base.astype(float).tolist(),
            "centers": self.centers.astype(float).tolist(),
            "strengths": self.strengths.astype(float).tolist(),
        }
@dataclass
class RandomFourierFlow(Flow):
    """Constant drift plus a sum of Fourier modes, speed-clamped.

    The mode term is the curl of the stream function
    ``psi(pos) = sum_m amp[m] * sin(k[m] . pos + phase[m])``.
    """

    # Constant drift added everywhere (float32).
    base: np.ndarray
    # Wave vectors, one row ``(kx, ky)`` per mode (float32).
    k: np.ndarray
    # Amplitude per mode (float32).
    amp: np.ndarray
    # Phase offset per mode (float32).
    phase: np.ndarray
    # Upper bound applied to the returned speed.
    max_speed: float

    def __init__(
        self,
        base: np.ndarray,
        k: np.ndarray,
        amp: np.ndarray,
        phase: np.ndarray,
        flow_id: int,
        max_speed: float = 0.60,
    ):
        """Coerce the array arguments to float32 and register as ``"random_fourier"``."""
        super().__init__(name="random_fourier", flow_id=flow_id)
        self.base = np.asarray(base, dtype=np.float32)
        self.k = np.asarray(k, dtype=np.float32)
        self.amp = np.asarray(amp, dtype=np.float32)
        self.phase = np.asarray(phase, dtype=np.float32)
        self.max_speed = float(max_speed)

    def velocity(self, pos: np.ndarray, t: float = 0.0) -> np.ndarray:
        """Return the clamped float32 velocity at ``pos`` (``t`` is unused)."""
        query = np.asarray(pos, dtype=np.float32)
        # Flatten to rows of two coordinates; the result is reshaped back below.
        rows = query.reshape(-1, 2)
        angle = rows @ self.k.T + self.phase[None, :]
        # Mode term from the stream function psi: (dpsi/dy, -dpsi/dx), which
        # is divergence-free before the drift is added and the clamp applied.
        weighted_cos = self.amp[None, :] * np.cos(angle)
        u = np.sum(weighted_cos * self.k[None, :, 1], axis=1)
        v = -np.sum(weighted_cos * self.k[None, :, 0], axis=1)
        field = np.stack([u, v], axis=-1).reshape(query.shape) + self.base
        magnitude = np.linalg.norm(field, axis=-1, keepdims=True)
        limiter = np.minimum(1.0, self.max_speed / np.maximum(magnitude, 1e-6))
        return (field * limiter).astype(np.float32)

    def metadata(self) -> dict[str, Any]:
        """Return base metadata plus ``base`` and the mode count ``num_modes``.

        The wave vectors, amplitudes and phases are not included.
        """
        return {
            **super().metadata(),
            "base": self.base.astype(float).tolist(),
            "num_modes": int(self.k.shape[0]),
        }
def _sample_uniform_vector(rng: np.random.Generator, min_speed: float = 0.05, max_speed: float = 0.35) -> np.ndarray:
speed = rng.uniform(min_speed, max_speed)
direction = rng.uniform(0.0, 2.0 * np.pi)
return np.array([speed * np.cos(direction), speed * np.sin(direction)], dtype=np.float32)
def _sample_signed_uniform(rng: np.random.Generator, min_abs: float, max_abs: float) -> float:
sign = -1.0 if rng.random() < 0.5 else 1.0
return float(sign * rng.uniform(min_abs, max_abs))
def _route_anchors(workspace: tuple[float, float, float, float]) -> np.ndarray:
xmin, xmax, ymin, ymax = workspace
w = xmax - xmin
h = ymax - ymin
points = np.array(
[
[0.20, 0.20],
[0.35, 0.35],
[0.50, 0.50],
[0.65, 0.65],
[0.80, 0.80],
[0.20, 0.80],
[0.35, 0.65],
[0.50, 0.50],
[0.65, 0.35],
[0.80, 0.20],
[0.50, 0.25],
[0.50, 0.75],
[0.25, 0.50],
[0.75, 0.50],
],
dtype=np.float32,
)
points[:, 0] = xmin + points[:, 0] * w
points[:, 1] = ymin + points[:, 1] * h
return points
def _sample_route_center(
    rng: np.random.Generator,
    workspace: tuple[float, float, float, float],
    jitter: float = 0.65,
    margin: float = 1.2,
) -> np.ndarray:
    """Pick a random route anchor, jitter it, and keep it inside the workspace.

    Args:
        rng: Random generator. One integer draw (the anchor index) is
            followed by one 2-element normal draw (the jitter); this order
            determines seeded results.
        workspace: ``(xmin, xmax, ymin, ymax)``.
        jitter: Standard deviation of the Gaussian offset added to the
            anchor, in the same units as ``workspace``.
        margin: Distance the result is kept away from each workspace edge.
            It is capped at half the workspace extent per axis, so the clip
            bounds never invert; on an axis shorter than ``2 * margin`` the
            result is the midpoint of that axis.

    Returns:
        A float32 array of shape ``(2,)``.
    """
    xmin, xmax, ymin, ymax = workspace
    anchors = _route_anchors(workspace)
    picked = anchors[int(rng.integers(0, len(anchors)))]
    center = picked + rng.normal(0.0, jitter, size=2).astype(np.float32)
    # np.clip does not check lo <= hi and returns hi when they are inverted,
    # which would pin every sample to ``max - margin`` (possibly outside the
    # workspace). Capping the margin per axis keeps lo <= hi.
    margin_x = min(margin, 0.5 * (xmax - xmin))
    margin_y = min(margin, 0.5 * (ymax - ymin))
    clipped_x = np.clip(center[0], xmin + margin_x, xmax - margin_x)
    clipped_y = np.clip(center[1], ymin + margin_y, ymax - margin_y)
    return np.array([clipped_x, clipped_y], dtype=np.float32)
def _sample_route_centers(
    rng: np.random.Generator,
    workspace: tuple[float, float, float, float],
    count: int,
    jitter: float = 0.65,
) -> np.ndarray:
    """Draw ``count`` route centers, one after another, from the same ``rng``.

    Returns:
        A float32 array of shape ``(count, 2)``.

    Raises:
        ValueError: from ``np.stack`` when no centers are drawn.
    """
    drawn = []
    for _ in range(count):
        drawn.append(_sample_route_center(rng, workspace, jitter=jitter))
    stacked = np.stack(drawn, axis=0)
    return stacked.astype(np.float32)
def sample_flow(
    flow_type: str,
    rng: np.random.Generator,
    flow_id: int,
    workspace: tuple[float, float, float, float] = (0.0, 10.0, 0.0, 10.0),
) -> Flow:
    """Sample a random flow of the requested type using the ``PAPER_FLOW`` ranges.

    Args:
        flow_type: Case-insensitive name; one of ``noflow``, ``uniform``,
            ``vortex_center``, ``gradient``, ``turbulent_patch``, ``shear``,
            ``double_gyre``, ``source_sink``, ``source_sink_pair`` or
            ``random_fourier``.
        rng: Random generator. Each builder draws from it in a fixed order,
            which determines seeded results.
        flow_id: Identifier stored on the returned flow (not used for
            ``noflow``, which always gets id 0).
        workspace: ``(xmin, xmax, ymin, ymax)``.

    Returns:
        A newly constructed ``Flow`` subclass instance.

    Raises:
        ValueError: if ``flow_type`` is not one of the names above.
    """
    kind = flow_type.lower()
    cfg = PAPER_FLOW

    def build_noflow() -> Flow:
        # The caller's flow_id is deliberately not forwarded here.
        return NoFlow(flow_id=0)

    def build_uniform() -> Flow:
        vector = _sample_uniform_vector(rng, cfg["uniform_min"], cfg["uniform_max"])
        return UniformFlow(vector, flow_id=flow_id)

    def build_vortex_center() -> Flow:
        center = _sample_route_center(rng, workspace, jitter=0.45)
        drift = _sample_uniform_vector(rng, 0.0, cfg["vortex_base_max"])
        gamma = _sample_signed_uniform(rng, cfg["vortex_gamma_min"], cfg["vortex_gamma_max"])
        return VortexFlow(
            base=drift,
            center=center,
            gamma=gamma,
            flow_id=flow_id,
            max_speed=cfg["vortex_max"],
            name=kind,
        )

    def build_gradient() -> Flow:
        center = _sample_route_center(rng, workspace, jitter=0.35)
        drift = _sample_uniform_vector(rng, cfg["gradient_base_min"], cfg["gradient_base_max"])
        target_norm = rng.uniform(cfg["gradient_matrix_min"], cfg["gradient_matrix_max"])
        raw = rng.normal(0.0, 1.0, size=(2, 2)).astype(np.float32)
        # Rescale the random matrix so its Frobenius norm equals target_norm.
        frobenius = max(float(np.linalg.norm(raw)), 1e-6)
        matrix = raw / frobenius * target_norm
        return GradientFlow(
            base=drift,
            center=center,
            matrix=matrix,
            flow_id=flow_id,
            max_speed=cfg["gradient_max"],
        )

    def build_turbulent_patch() -> Flow:
        drift = _sample_uniform_vector(rng, 0.0, cfg["turbulent_base_max"])
        centers = _sample_route_centers(rng, workspace, count=5, jitter=0.85)
        vectors = rng.normal(0.0, cfg["turbulent_vector_std"], size=(5, 2)).astype(np.float32)
        return TurbulentPatchFlow(
            base=drift,
            centers=centers,
            vectors=vectors,
            flow_id=flow_id,
            max_speed=cfg["turbulent_max"],
        )

    def build_shear() -> Flow:
        _, _, ymin, ymax = workspace
        # The base drift reuses the turbulent_base_max cap.
        drift = _sample_uniform_vector(rng, 0.0, cfg["turbulent_base_max"])
        mid_y = 0.5 * (ymin + ymax)
        rate = _sample_signed_uniform(rng, cfg["shear_min"], cfg["shear_max_rate"])
        return ShearFlow(
            base=drift,
            center_y=mid_y,
            shear=rate,
            flow_id=flow_id,
            max_speed=cfg["shear_max"],
        )

    def build_double_gyre() -> Flow:
        amplitude = float(rng.uniform(cfg["double_gyre_amp_min"], cfg["double_gyre_amp_max"]))
        phase = float(rng.uniform(0.0, 2.0 * np.pi))
        return DoubleGyreFlow(
            amp=amplitude,
            phase=phase,
            flow_id=flow_id,
            workspace=workspace,
            max_speed=cfg["double_gyre_max"],
        )

    def build_source_sink() -> Flow:
        drift = _sample_uniform_vector(rng, 0.0, cfg["source_base_max"])
        centers = _sample_route_centers(rng, workspace, count=1, jitter=0.65)
        signed = _sample_signed_uniform(rng, cfg["source_strength_min"], cfg["source_strength_max"])
        strengths = np.array([signed], dtype=np.float32)
        return SourceSinkFlow(
            name="source_sink",
            base=drift,
            centers=centers,
            strengths=strengths,
            flow_id=flow_id,
            max_speed=cfg["source_max"],
        )

    def build_source_sink_pair() -> Flow:
        drift = _sample_uniform_vector(rng, 0.0, cfg["source_base_max"])
        centers = _sample_route_centers(rng, workspace, count=2, jitter=0.75)
        # Magnitude first, then the coin flip for which center is the source.
        strength = float(rng.uniform(cfg["source_strength_min"], cfg["source_strength_max"]))
        if rng.random() < 0.5:
            strength = -strength
        # Equal and opposite strengths: one source and one sink.
        strengths = np.array([strength, -strength], dtype=np.float32)
        return SourceSinkFlow(
            name="source_sink_pair",
            base=drift,
            centers=centers,
            strengths=strengths,
            flow_id=flow_id,
            max_speed=cfg["source_max"],
        )

    def build_random_fourier() -> Flow:
        # The base drift reuses the turbulent_base_max cap.
        drift = _sample_uniform_vector(rng, 0.0, cfg["turbulent_base_max"])
        modes = 8
        wave_numbers = rng.integers(1, 5, size=(modes, 2)).astype(np.float32)
        signs = rng.choice([-1.0, 1.0], size=(modes, 2)).astype(np.float32)
        # Integer wave numbers 1..4 with random signs, scaled by 2*pi/10.
        k = signs * wave_numbers * (2.0 * np.pi / 10.0)
        amp_std = cfg["random_fourier_amp_std"]
        amp = rng.normal(0.0, amp_std, size=(modes,)).astype(np.float32)
        phase = rng.uniform(0.0, 2.0 * np.pi, size=(modes,)).astype(np.float32)
        return RandomFourierFlow(
            base=drift,
            k=k,
            amp=amp,
            phase=phase,
            flow_id=flow_id,
            max_speed=cfg["random_fourier_max"],
        )

    builders = {
        "noflow": build_noflow,
        "uniform": build_uniform,
        "vortex_center": build_vortex_center,
        "gradient": build_gradient,
        "turbulent_patch": build_turbulent_patch,
        "shear": build_shear,
        "double_gyre": build_double_gyre,
        "source_sink": build_source_sink,
        "source_sink_pair": build_source_sink_pair,
        "random_fourier": build_random_fourier,
    }
    builder = builders.get(kind)
    if builder is None:
        raise ValueError(f"unknown flow type: {kind}")
    return builder()