# Provenance: uploaded by snesbitt to a Hugging Face Space.
# Commit 7c3bfa9 — "Mountain Waves — deploy to Hugging Face Space" (16.7 kB).
"""Pure-Python reference implementation of the mountain-wave solver.
This module is a direct transcription of the MATLAB routines ``tlwplot.m``
and ``stream.m`` written by Dr. Robert E. (Bob) Hart in 1995 as a Penn
State Meteo 574 seminar project (see
https://moe.met.fsu.edu/~rhart/mtnwave.html). The two-layer routine here
mirrors Hart's MATLAB code line-for-line (reformulated in NumPy); the
multi-layer routine is a natural generalization using the same Fourier +
transfer-matrix scheme.
It exists for two reasons:
1. It's the fallback when the Rust extension isn't built.
2. It's the reference used by ``validate.py`` to confirm the Rust core
produces bit-similar results.
The multi-layer solver implements the same transfer-matrix scheme as the
Rust version so they can be compared.
"""
from __future__ import annotations
import math
import numpy as np
# Gravitational acceleration (m/s^2); used to form the buoyancy frequency
# N^2 = (G / theta) * d(theta)/dz in ``scorer_from_profile``.
G = 9.80665
# ---------------------------------------------------------------------------
# Two-layer analytic solver (port of tlwplot.m)
# ---------------------------------------------------------------------------
def compute_two_layer(
    l_upper: float,
    l_lower: float,
    u: float,
    h: float,
    a: float,
    ho: float,
    xdom: float,
    zdom: float,
    mink: float,
    maxk: float,
    npts: int = 100,
):
    """Return ``(x, z, w, u_prime)`` for flow over a witch-of-Agnesi mountain.

    Parameters
    ----------
    l_upper, l_lower:
        Scorer parameters of the layer above / below the interface.
    u:
        Uniform mean wind speed.
    h:
        Height of the interface between the two layers.
    a, ho:
        Mountain half-width and mountain height; the terrain spectrum used
        is ``pi * a * ho * exp(-a * |k|)``.
    xdom, zdom:
        Horizontal and vertical extent of the render grid. ``x`` runs from
        ``-0.25 * xdom`` to ``0.75 * xdom`` and ``z`` from ``0`` to ``zdom``.
    mink, maxk:
        Wavenumber integration limits; the step is fixed at ``0.367 / a``.
    npts:
        Number of grid intervals in each direction (``npts + 1`` points).

    Returns
    -------
    ``x`` and ``z`` have shape ``(npts + 1,)``; ``w`` and ``u_prime`` have
    shape ``(z.size, x.size)`` indexed as ``[z_index, x_index]``.

    ``u_prime`` is the wave-induced horizontal wind perturbation, obtained
    from linearized continuity ``du'/dx + dw/dz = 0``. Per wavenumber,
    ``u'_k = -(i/k) * dw_k/dz``; the two-layer eigenfunctions are
    differentiated analytically and integrated with the same trapezoidal
    rule as ``w``.

    Numerical note
    --------------
    The eigenfunctions are ``A e^{-n z}`` above the interface and
    ``C e^{i m z} + D e^{-i m z}`` below it. Evaluating both on the whole
    grid and masking by multiplication lets ``exp`` overflow in the region
    where a branch is *not* used, and ``inf * 0`` then poisons the result
    with NaN. Here each branch is written with bounded exponents only
    (``Im(m) >= 0`` and ``Re(n) >= 0`` on the principal square-root branch)::

        above(z) = (1 + r) C e^{i m h} e^{-n (z - h)}          z >  h
        below(z) = C [ e^{i m z} + r e^{i m (2h - z)} ]        z <= h

    which is algebraically identical to the original formulation because
    ``A = (1 + r) e^{h n + i h m} / (1 + R)`` and ``D = r e^{2 i m h} C``.
    """
    dk = 0.367 / a
    nk = max(1, int((maxk - mink) // dk))

    minx = -0.25 * xdom
    maxx = 0.75 * xdom
    dx = (maxx - minx) / npts
    dz = zdom / npts
    x = minx + dx * np.arange(npts + 1)
    z = dz * np.arange(npts + 1)

    # The vertical structure depends on z only, so it is evaluated on the
    # 1-D height axis and combined with the x-phase through an outer product.
    in_upper = z > h
    # Clamp heights so each branch is only ever evaluated on its own side of
    # the interface (the other side is discarded by ``np.where`` below).
    z_hi = np.maximum(z, h)
    z_lo = np.minimum(z, h)

    grid_shape = (z.size, x.size)
    matrix1 = np.zeros(grid_shape, dtype=complex)
    matrix3 = np.zeros(grid_shape, dtype=complex)
    matrix1_u = np.zeros(grid_shape, dtype=complex)
    matrix3_u = np.zeros(grid_shape, dtype=complex)
    ht = 0.0

    for kloop in range(nk + 1):
        kk = mink + dk * kloop

        # Vertical wavenumber below (m) and decay rate above (n).
        m = np.sqrt(complex(l_lower ** 2 - kk ** 2))
        n = np.sqrt(complex(kk ** 2 - l_upper ** 2))

        # Reflection coefficient at the interface; the huge stand-in value
        # for a vanishing denominator is kept from the original routine.
        denom = m + 1j * n
        if abs(denom) < 1e-300:
            r = complex(9e99, 0.0)
        else:
            r = (m - 1j * n) / denom
        R = r * np.exp(2j * m * h)
        C = 1.0 / (1 + R)

        ksign = abs(kk)
        hs = np.pi * a * ho * np.exp(-a * ksign)
        if kloop > 0:
            ht += np.pi * dk * a * np.exp(-a * ksign)

        # Upper layer: decays (or radiates) away from the interface.
        upper = (1 + r) * C * np.exp(1j * h * m) * np.exp(-(z_hi - h) * n)
        # Lower layer: upward part plus the part reflected at the interface.
        wave_up = C * np.exp(1j * m * z_lo)
        wave_dn = C * r * np.exp(1j * m * (2.0 * h - z_lo))

        vert = np.where(in_upper, upper, wave_up + wave_dn)
        dvert = np.where(in_upper, -n * upper, 1j * m * (wave_up - wave_dn))

        xfac = np.exp(-1j * x * kk)
        matrix2 = np.outer((-1j * kk * hs * u) * vert, xfac)
        # u'_k = (-i/k) * dw_k/dz. Combined with the -ik factor in the w
        # amplitude this is -hs*u*d(vert)/dz, which has no 1/k singularity.
        matrix2_u = np.outer((-hs * u) * dvert, xfac)

        # Trapezoidal accumulation over wavenumber.
        if kloop > 0:
            matrix3 += 0.5 * (matrix1 + matrix2) * dk
            matrix3_u += 0.5 * (matrix1_u + matrix2_u) * dk
        matrix1 = matrix2
        matrix1_u = matrix2_u

    if ht == 0.0:
        ht = 1.0
    w = np.real(matrix3 / ht)
    u_prime = np.real(matrix3_u / ht)
    return x, z, w, u_prime
# ---------------------------------------------------------------------------
# Multi-layer profile solver
# ---------------------------------------------------------------------------
# Minimum |U| used in the Scorer-parameter denominator. In pure linear
# theory, U(z) = 0 is a critical level where l² = N²/U² − (U″/U) is
# singular; linear Scorer/Taylor-Goldstein cannot honestly solve across
# such a level. In a teaching tool we *want* students to be able to set
# up a wind-reversal profile and see what happens away from the critical
# level rather than have the whole solve NaN out. We clamp |U| to this
# floor (preserving sign) when evaluating the Scorer coefficients. Away
# from U≈0 this is a no-op; within ±0.5 m/s it caps l² at a large but
# finite value and the UI emits a "critical level detected" warning so
# nobody is misled into treating the capped zone as physical.
U_FLOOR_SCORER = 0.5  # m/s


def _u_clamped_for_scorer(uu: float) -> float:
    """Push ``uu`` away from zero so that ``|uu| >= U_FLOOR_SCORER``.

    The sign of ``uu`` is kept; a value that is not negative (including
    zero) is treated as positive.
    """
    if uu < 0.0:
        # Negative wind: never let it rise above -floor.
        return min(uu, -U_FLOOR_SCORER)
    # Zero or positive wind: never let it drop below +floor.
    return max(uu, U_FLOOR_SCORER)
def scorer_from_profile(z_profile, u_profile, theta_profile):
    """Compute the Scorer parameter ``l^2`` at every profile level.

    For each level the function evaluates ``N^2 / U^2 - U'' / U`` with
    ``N^2 = (G / theta) * d(theta)/dz``. Derivatives use three-point
    finite differences that allow unequal level spacing; the first and
    last levels use a one-sided difference for ``d(theta)/dz`` and borrow
    the curvature stencil of their neighbouring interior level (curvature
    is taken as zero when fewer than three levels are supplied).

    ``U`` is passed through ``_u_clamped_for_scorer`` before dividing, so a
    wind reversal (``U`` near zero) yields a large but finite value instead
    of a division by zero.

    Returns a float array with one ``l^2`` value per input level.
    """
    heights = np.asarray(z_profile, dtype=float)
    wind = np.asarray(u_profile, dtype=float)
    theta = np.asarray(theta_profile, dtype=float)

    count = heights.size
    last = count - 1
    has_stencil = count >= 3

    def wind_curvature(mid):
        # Three-point second derivative of the wind centred on level ``mid``.
        h1 = heights[mid] - heights[mid - 1]
        h2 = heights[mid + 1] - heights[mid]
        return (
            2.0
            * (wind[mid + 1] * h1 - wind[mid] * (h1 + h2) + wind[mid - 1] * h2)
            / (h1 * h2 * (h1 + h2))
        )

    def theta_slope(mid):
        # Three-point first derivative of theta centred on level ``mid``.
        h1 = heights[mid] - heights[mid - 1]
        h2 = heights[mid + 1] - heights[mid]
        return (
            theta[mid + 1] * h1 ** 2
            - theta[mid - 1] * h2 ** 2
            + theta[mid] * (h2 ** 2 - h1 ** 2)
        ) / (h1 * h2 * (h1 + h2))

    l2 = np.zeros(count)
    for level in range(count):
        if level == 0:
            # Bottom level: forward difference for theta.
            dthdz = (theta[1] - theta[0]) / (heights[1] - heights[0])
            d2u = wind_curvature(1) if has_stencil else 0.0
        elif level == last:
            # Top level: backward difference for theta.
            dthdz = (theta[-1] - theta[-2]) / (heights[-1] - heights[-2])
            d2u = wind_curvature(count - 2) if has_stencil else 0.0
        else:
            dthdz = theta_slope(level)
            d2u = wind_curvature(level)

        n2 = (G / theta[level]) * dthdz
        uu = _u_clamped_for_scorer(wind[level])
        l2[level] = n2 / uu ** 2 - d2u / uu
    return l2
def critical_levels(z_profile, u_profile):
    """Return heights (m) where ``u_profile`` crosses zero, linearly interpolated.

    A "critical level" for steady, 2-D, horizontally uniform linear mountain
    waves is a height where the mean flow vanishes (``U = 0``). Linear
    Scorer/Taylor-Goldstein theory is singular there — wave energy is
    absorbed rather than propagated (Booker & Bretherton 1967) — so any
    result the solver returns *near* a critical level should be read as
    "this is where the linear model breaks down," not as a prediction.

    Rules applied, in order of increasing height:

    * a sample whose wind is exactly ``0.0`` is reported at that sample's
      height — including the very first sample;
    * a strict sign change between two consecutive non-zero samples is
      reported at the linearly interpolated zero.

    Returns a list of floats (empty when there is no crossing or no data).
    """
    z = np.asarray(z_profile, dtype=float)
    u = np.asarray(u_profile, dtype=float)
    heights = []

    # The pairwise loop below only ever records a zero seen as the *upper*
    # sample of a pair, so a zero at the lowest sample must be handled here.
    if z.size and u.size and u[0] == 0.0:
        heights.append(float(z[0]))

    for i in range(1, z.size):
        u_prev, u_curr = u[i - 1], u[i]
        # Treat exact zeros as crossings at that sample.
        if u_curr == 0.0:
            heights.append(float(z[i]))
            continue
        if u_prev == 0.0:
            # Already recorded (by the previous iteration, or by the
            # first-sample check above).
            continue
        if (u_prev > 0.0 and u_curr < 0.0) or (u_prev < 0.0 and u_curr > 0.0):
            # Linear interp to the zero crossing.
            t = u_prev / (u_prev - u_curr)
            heights.append(float(z[i - 1] + t * (z[i] - z[i - 1])))
    return heights
def compute_from_profile(
    z_profile,
    u_profile,
    theta_profile,
    a: float,
    ho: float,
    xdom: float,
    zdom: float,
    mink: float,
    maxk: float,
    npts: int = 100,
):
    """Arbitrary u(z)/theta(z) mountain-wave solver using transfer matrices.

    Parameters
    ----------
    z_profile, u_profile, theta_profile:
        Heights, mean wind and potential temperature at the profile levels
        (converted to float arrays; passed to ``scorer_from_profile``).
    a, ho:
        Mountain half-width and height; the terrain spectrum used is
        ``pi * a * ho * exp(-a * |k|)``.
    xdom, zdom:
        Extent of the render grid. ``x`` runs from ``-0.25 * xdom`` to
        ``0.75 * xdom`` and ``z`` from ``0`` to ``zdom``.
    mink, maxk:
        Wavenumber integration limits; the step is fixed at ``0.367 / a``.
    npts:
        Number of grid intervals in each direction.

    Returns ``(x, z, w, u_prime)`` with ``w`` and ``u_prime`` shaped
    ``(z.size, x.size)``.

    The atmosphere is split into piecewise-constant L² layers whose
    boundaries are the midpoints between profile levels (the lowest layer
    starts at 0, the highest is unbounded). Inside each layer
    ``w_j(z) = a_j e^{-s_j dz} + b_j e^{+s_j dz}``; continuity of ``w`` and
    ``dw/dz`` at interfaces plus a radiation / decay condition aloft
    (``b = 0`` in the top layer) closes the system, and the overall
    amplitude is fixed by the surface value ``-i k U_surface h_s(k)``.

    ``u_prime`` is obtained in the same Fourier loop from linearized
    continuity, ``u'_k = -(i/k) * dw_k/dz``, for each ``k != 0``.

    Numerical safeguards
    --------------------
    * The downward sweep renormalises the coefficients when they grow
      very large. Only ratios matter (the amplitude is fixed afterwards),
      and without this a deep stack of evanescent layers overflows to
      ``inf``, after which ``inf * 0`` turns the whole mode into NaN.
    * The growing term ``b_j e^{+s_j dz}`` is skipped when ``b_j`` is
      exactly zero, so an overflowing exponential in the top layer cannot
      produce ``0 * inf = NaN``.
    """
    zp = np.asarray(z_profile, dtype=float)
    up = np.asarray(u_profile, dtype=float)
    tp = np.asarray(theta_profile, dtype=float)
    l2 = scorer_from_profile(zp, up, tp)
    u_surface = float(up[0])

    # Layer j spans [layer_bot[j], layer_top[j]); interfaces sit halfway
    # between neighbouring profile levels.
    nlayers = zp.size
    midpoints = 0.5 * (zp[:-1] + zp[1:])
    layer_bot = np.concatenate(([0.0], midpoints))
    layer_top = np.concatenate((midpoints, [np.inf]))

    dk = 0.367 / a
    nk = max(1, int((maxk - mink) // dk))
    nslab = nk + 1

    minx = -0.25 * xdom
    maxx = 0.75 * xdom
    dx = (maxx - minx) / npts
    dz = zdom / npts
    x = minx + dx * np.arange(npts + 1)
    z = dz * np.arange(npts + 1)

    # Layer index for each vertical grid point: the first layer whose top
    # lies above the point, falling back to the top layer.
    layer_of = np.zeros(z.size, dtype=int)
    for j, zj in enumerate(z):
        layer_of[j] = next(
            (lj for lj in range(nlayers) if zj < layer_top[lj]),
            nlayers - 1,
        )

    # Coefficient magnitude above which the sweep is renormalised.
    rescale_above = 1e100

    grid_shape = (z.size, x.size)
    matrix1 = np.zeros(grid_shape, dtype=complex)
    matrix3 = np.zeros(grid_shape, dtype=complex)
    matrix1_u = np.zeros(grid_shape, dtype=complex)
    matrix3_u = np.zeros(grid_shape, dtype=complex)
    ht = 0.0

    for kloop in range(nslab):
        kk = mink + dk * kloop
        ksign = abs(kk)
        hs = np.pi * a * ho * np.exp(-a * ksign)
        if kloop > 0:
            ht += np.pi * dk * a * np.exp(-a * ksign)

        if kk == 0.0:
            # DC mode has no wave contribution; u' also vanishes here.
            matrix2 = np.zeros_like(matrix1)
            matrix2_u = np.zeros_like(matrix1)
        else:
            # Principal-branch sigma: in each layer, the "a" coefficient
            # multiplies exp(-sigma*dz), which is always the outgoing /
            # decaying branch when Im(sigma) >= 0.
            sigma = np.sqrt((kk ** 2 - l2).astype(complex))
            sigma = np.where(sigma.imag < 0, -sigma, sigma)

            aj = np.zeros(nlayers, dtype=complex)
            bj = np.zeros(nlayers, dtype=complex)
            aj[-1] = 1.0
            bj[-1] = 0.0

            # Sweep downward, matching w and dw/dz at each interface.
            for j in range(nlayers - 2, -1, -1):
                dz_j = layer_top[j] - layer_bot[j]
                e_minus = np.exp(-sigma[j] * dz_j)
                e_plus = np.exp(sigma[j] * dz_j)
                alpha = aj[j + 1] + bj[j + 1]
                beta = -aj[j + 1] + bj[j + 1]
                ratio = sigma[j + 1] / sigma[j]
                aj[j] = 0.5 * (alpha - ratio * beta) * e_plus
                bj[j] = 0.5 * (alpha + ratio * beta) * e_minus

                # Renormalise everything computed so far; the common factor
                # cancels when the surface amplitude is applied below.
                mag = max(abs(aj[j]), abs(bj[j]))
                if rescale_above < mag < np.inf:
                    aj[j:] /= mag
                    bj[j:] /= mag

            # Scale so that w at the surface equals -i k U h_s(k).
            w_surface = aj[0] + bj[0]
            if abs(w_surface) < 1e-300:
                amp = 0.0 + 0.0j
            else:
                amp = -1j * kk * u_surface * hs / w_surface
            aj *= amp
            bj *= amp

            # Build w and dw/dz on the vertical grid:
            #   w_j       = a_j e^{-s dz} + b_j e^{+s dz}
            #   dw_j/dz   = s * (-a_j e^{-s dz} + b_j e^{+s dz})
            # and the continuity relation gives u'_k = -(i/k) * dw/dz.
            zfac = np.zeros(z.size, dtype=complex)
            zfac_u = np.zeros(z.size, dtype=complex)
            inv_ik = -1j / kk
            for j in range(z.size):
                lj = layer_of[j]
                dz_l = z[j] - layer_bot[lj]
                decaying = aj[lj] * np.exp(-sigma[lj] * dz_l)
                if bj[lj] != 0:
                    growing = bj[lj] * np.exp(sigma[lj] * dz_l)
                else:
                    # Avoid 0 * inf when exp(+s dz) overflows (top layer).
                    growing = 0.0 + 0.0j
                zfac[j] = decaying + growing
                dwdz = sigma[lj] * (-decaying + growing)
                zfac_u[j] = inv_ik * dwdz

            xfac = np.exp(-1j * x * kk)
            matrix2 = np.outer(zfac, xfac)
            matrix2_u = np.outer(zfac_u, xfac)

        # Trapezoidal accumulation over wavenumber.
        if kloop > 0:
            matrix3 += 0.5 * (matrix1 + matrix2) * dk
            matrix3_u += 0.5 * (matrix1_u + matrix2_u) * dk
        matrix1 = matrix2
        matrix1_u = matrix2_u

    if ht == 0.0:
        ht = 1.0
    w = np.real(matrix3 / ht)
    u_prime = np.real(matrix3_u / ht)
    return x, z, w, u_prime
# ---------------------------------------------------------------------------
# Streamline tracer (port of stream.m)
# ---------------------------------------------------------------------------
def streamlines(x, z, u, w, num: int = 10):
    """Trace ``num`` streamlines and return them as ``[(xs, ys), ...]``.

    ``x`` and ``z`` are the 1-D render-grid axes and ``w`` is indexed as
    ``w[z_index, x_index]``. ``u`` is either a scalar (one mean wind for
    every streamline) or a 1-D array with one wind value per grid height.
    An array of any other length is replaced by its mean so that a plot
    can still be produced.

    Each streamline starts at one grid row (rows are spread evenly over the
    grid, ``nz / num`` rows apart) and is integrated left to right::

        ys[i] = ys[i - 1] + (dx / U_row) * w[row, i]

    where ``dx`` is the spacing of the first two ``x`` samples and
    ``U_row`` is the wind at the streamline's starting row. Using the wind
    at that row, rather than one surface value for all rows, keeps upper
    streamlines from being over-amplified when the profile has shear.

    To avoid dividing by (nearly) zero, ``|U_row|`` is raised to at least
    0.1 m/s with its sign kept; an exactly zero wind becomes ``+0.1``.

    Returns an empty list when the grid has fewer than two points along
    either axis or when ``num`` is 0.
    """
    x_axis = np.asarray(x)
    z_axis = np.asarray(z)
    w_grid = np.asarray(w)
    ncols = x_axis.size
    nrows = z_axis.size
    if num == 0 or ncols < 2 or nrows < 2:
        return []

    x_start = float(x_axis[0])
    spacing = float(x_axis[1] - x_axis[0])

    # Normalise ``u`` to one wind value per grid row.
    wind = np.atleast_1d(np.asarray(u, dtype=float))
    if wind.size == 1:
        wind_by_row = np.full(nrows, float(wind[0]))
    elif wind.size == nrows:
        wind_by_row = wind
    else:
        # Wrong length: degrade to the mean instead of raising mid-draw.
        wind_by_row = np.full(nrows, float(np.mean(wind)))

    row_stride = nrows / num
    traced = []
    for k in range(num):
        # 1-based cell position (MATLAB convention), kept inside [1, nrows].
        cell = 1.0 + row_stride * k
        if cell < 1.0:
            cell = 1.0
        if cell > nrows:
            cell = nrows
        row = int(round(cell) - 1)
        row = min(nrows - 1, row)
        row = max(0, row)

        speed = float(wind_by_row[row])
        if not abs(speed) > 0.1:
            # 0.1 m/s floor prevents 1/u blowups at stagnant layers.
            if speed != 0:
                speed = math.copysign(0.1, speed)
            else:
                speed = 0.1
        tstep = spacing / speed

        xs = np.empty(ncols)
        ys = np.empty(ncols)
        xs[0] = x_start
        ys[0] = z_axis[row]
        for col in range(1, ncols):
            xs[col] = x_axis[col]
            ys[col] = ys[col - 1] + tstep * w_grid[row, col]
        traced.append((xs, ys))
    return traced