Spaces:
Sleeping
Sleeping
| """Pure-Python reference implementation of the mountain-wave solver. | |
| This module is a direct transcription of the MATLAB routines ``tlwplot.m`` | |
| and ``stream.m`` written by Dr. Robert E. (Bob) Hart in 1995 as a Penn | |
| State Meteo 574 seminar project (see | |
| https://moe.met.fsu.edu/~rhart/mtnwave.html). The two-layer routine here | |
| mirrors Hart's MATLAB code line-for-line (reformulated in NumPy); the | |
| multi-layer routine is a natural generalization using the same Fourier + | |
| transfer-matrix scheme. | |
| It exists for two reasons: | |
| 1. It's the fallback when the Rust extension isn't built. | |
| 2. It's the reference used by ``validate.py`` to confirm the Rust core | |
| produces bit-similar results. | |
| The multi-layer solver implements the same transfer-matrix scheme as the | |
| Rust version so they can be compared. | |
| """ | |
| from __future__ import annotations | |
| import math | |
| import numpy as np | |
| G = 9.80665 | |
| # --------------------------------------------------------------------------- | |
| # Two-layer analytic solver (port of tlwplot.m) | |
| # --------------------------------------------------------------------------- | |
| def compute_two_layer( | |
| l_upper: float, | |
| l_lower: float, | |
| u: float, | |
| h: float, | |
| a: float, | |
| ho: float, | |
| xdom: float, | |
| zdom: float, | |
| mink: float, | |
| maxk: float, | |
| npts: int = 100, | |
| ): | |
| """Return ``(x, z, w, u_prime)`` for flow over a witch-of-Agnesi mountain. | |
| Arrays match the MATLAB conventions: ``x`` has shape ``(npts + 1,)``, | |
| ``z`` has shape ``(npts + 1,)``, and both ``w`` and ``u_prime`` have | |
| shape ``(z.size, x.size)`` indexed as ``[z_index, x_index]``. | |
| ``u_prime`` is the wave-induced horizontal wind perturbation, obtained | |
| from linearized continuity ``∂u'/∂x + ∂w/∂z = 0``. Per wavenumber, | |
| ``u'_k = −(i/k) · ∂ŵ_k/∂z``; we analytically differentiate the two-layer | |
| eigenfunctions (``A e^{−n z}`` above the interface, ``C e^{i m z} + | |
| D e^{−i m z}`` below) and accumulate the same trapezoidal k-integration | |
| used for ``w``. | |
| """ | |
| dk = 0.367 / a | |
| nk = max(1, int((maxk - mink) // dk)) | |
| minx = -0.25 * xdom | |
| maxx = 0.75 * xdom | |
| dx = (maxx - minx) / npts | |
| dz = zdom / npts | |
| x = minx + dx * np.arange(npts + 1) | |
| z = dz * np.arange(npts + 1) | |
| X, Z = np.meshgrid(x, z) # shape (npts+1, npts+1), Z[i, j] varies along i | |
| matrix1 = np.zeros_like(X, dtype=complex) | |
| matrix3 = np.zeros_like(X, dtype=complex) | |
| matrix1_u = np.zeros_like(X, dtype=complex) | |
| matrix3_u = np.zeros_like(X, dtype=complex) | |
| ht = 0.0 | |
| for kloop in range(nk + 1): | |
| kk = mink + dk * kloop | |
| m = np.sqrt(complex(l_lower ** 2 - kk ** 2)) | |
| n = np.sqrt(complex(kk ** 2 - l_upper ** 2)) | |
| denom = m + 1j * n | |
| if abs(denom) < 1e-300: | |
| r = complex(9e99, 0.0) | |
| else: | |
| r = (m - 1j * n) / denom | |
| R = r * np.exp(2j * m * h) | |
| A = (1 + r) * np.exp(h * n + 1j * h * m) / (1 + R) | |
| C = 1.0 / (1 + R) | |
| D = R * C | |
| ksign = abs(kk) | |
| hs = np.pi * a * ho * np.exp(-a * ksign) | |
| ht += np.pi * dk * a * np.exp(-a * ksign) if kloop > 0 else 0.0 | |
| above = A * np.exp(-Z * n) * (Z > h) | |
| below = (C * np.exp(1j * Z * m) + D * np.exp(-1j * Z * m)) * (Z <= h) | |
| matrix2 = (-1j * kk * hs * u * (above + below)) * np.exp(-1j * X * kk) | |
| # Analytic z-derivative of the eigenfunctions (the two branches are | |
| # continuous at z=h by construction, so the jump in the step factor | |
| # contributes nothing to the derivative inside each region). | |
| dabove = (-n) * A * np.exp(-Z * n) * (Z > h) | |
| dbelow = (1j * m) * (C * np.exp(1j * Z * m) - D * np.exp(-1j * Z * m)) * (Z <= h) | |
| # u'_k(x, z) = (-i/k) · ∂ŵ_k/∂z. Combining the −ik factor baked into | |
| # matrix2's ŵ formula with the −i/k in front yields −hs·U·∂(above+ | |
| # below)/∂z. This removes the apparent 1/k singularity at k=0 — the | |
| # result is analytic there — and avoids division edge cases. | |
| matrix2_u = (-hs * u) * (dabove + dbelow) * np.exp(-1j * X * kk) | |
| if kloop > 0: | |
| matrix3 += 0.5 * (matrix1 + matrix2) * dk | |
| matrix3_u += 0.5 * (matrix1_u + matrix2_u) * dk | |
| matrix1 = matrix2 | |
| matrix1_u = matrix2_u | |
| if ht == 0.0: | |
| ht = 1.0 | |
| w = np.real(matrix3 / ht) | |
| u_prime = np.real(matrix3_u / ht) | |
| return x, z, w, u_prime | |
| # --------------------------------------------------------------------------- | |
| # Multi-layer profile solver | |
| # --------------------------------------------------------------------------- | |
| # Minimum |U| used in the Scorer-parameter denominator. In pure linear | |
| # theory, U(z) = 0 is a critical level where l² = N²/U² − (U″/U) is | |
| # singular; linear Scorer/Taylor-Goldstein cannot honestly solve across | |
| # such a level. In a teaching tool we *want* students to be able to set | |
| # up a wind-reversal profile and see what happens away from the critical | |
| # level rather than have the whole solve NaN out. We clamp |U| to this | |
| # floor (preserving sign) when evaluating the Scorer coefficients. Away | |
| # from U≈0 this is a no-op; within ±0.5 m/s it caps l² at a large but | |
| # finite value and the UI emits a "critical level detected" warning so | |
| # nobody is misled into treating the capped zone as physical. | |
| U_FLOOR_SCORER = 0.5 # m/s | |
| def _u_clamped_for_scorer(uu: float) -> float: | |
| """Return ``uu`` with ``|uu|`` lifted to ``U_FLOOR_SCORER``; sign preserved.""" | |
| if uu >= 0.0: | |
| return max(uu, U_FLOOR_SCORER) | |
| return min(uu, -U_FLOOR_SCORER) | |
| def scorer_from_profile(z_profile, u_profile, theta_profile): | |
| """Return Scorer parameter L^2(z) computed from profile data. | |
| Handles wind reversals (sign changes in ``u_profile``) by clamping the | |
| magnitude of ``U`` at ``U_FLOOR_SCORER`` when it evaluates the | |
| ``N²/U² − U″/U`` combination. This keeps the solver numerically | |
| well-behaved across a critical level (``U = 0``) at the cost of a | |
| physically sharp feature there — see the ``critical_levels`` helper | |
| below for the companion diagnostic surfaced in the UI. | |
| """ | |
| z = np.asarray(z_profile, dtype=float) | |
| u = np.asarray(u_profile, dtype=float) | |
| theta = np.asarray(theta_profile, dtype=float) | |
| n = z.size | |
| l2 = np.zeros(n) | |
| for i in range(n): | |
| if i == 0: | |
| dthdz = (theta[1] - theta[0]) / (z[1] - z[0]) | |
| if n >= 3: | |
| h1 = z[1] - z[0] | |
| h2 = z[2] - z[1] | |
| d2u = 2.0 * (u[2] * h1 - u[1] * (h1 + h2) + u[0] * h2) / (h1 * h2 * (h1 + h2)) | |
| else: | |
| d2u = 0.0 | |
| elif i == n - 1: | |
| dthdz = (theta[-1] - theta[-2]) / (z[-1] - z[-2]) | |
| if n >= 3: | |
| h1 = z[-2] - z[-3] | |
| h2 = z[-1] - z[-2] | |
| d2u = 2.0 * (u[-1] * h1 - u[-2] * (h1 + h2) + u[-3] * h2) / (h1 * h2 * (h1 + h2)) | |
| else: | |
| d2u = 0.0 | |
| else: | |
| h1 = z[i] - z[i - 1] | |
| h2 = z[i + 1] - z[i] | |
| dthdz = ( | |
| theta[i + 1] * h1 ** 2 | |
| - theta[i - 1] * h2 ** 2 | |
| + theta[i] * (h2 ** 2 - h1 ** 2) | |
| ) / (h1 * h2 * (h1 + h2)) | |
| d2u = 2.0 * (u[i + 1] * h1 - u[i] * (h1 + h2) + u[i - 1] * h2) / (h1 * h2 * (h1 + h2)) | |
| n2 = (G / theta[i]) * dthdz | |
| uu = _u_clamped_for_scorer(u[i]) | |
| l2[i] = n2 / uu ** 2 - d2u / uu | |
| return l2 | |
| def critical_levels(z_profile, u_profile): | |
| """Return heights (m) where ``u_profile`` crosses zero, linearly interpolated. | |
| A "critical level" for steady, 2-D, horizontally uniform linear mountain | |
| waves is a height where the mean flow vanishes (``U = 0``). Linear | |
| Scorer/Taylor-Goldstein theory is singular there — wave energy is | |
| absorbed rather than propagated (Booker & Bretherton 1967) — so any | |
| result the solver returns *near* a critical level should be read as | |
| "this is where the linear model breaks down," not as a prediction. | |
| Caller (the Dash UI) surfaces the returned heights in a diagnostics | |
| badge so students can see where their profile is violating the | |
| assumptions of the model. | |
| """ | |
| z = np.asarray(z_profile, dtype=float) | |
| u = np.asarray(u_profile, dtype=float) | |
| heights = [] | |
| for i in range(1, z.size): | |
| u_prev, u_curr = u[i - 1], u[i] | |
| # Treat exact zeros as crossings at that sample. | |
| if u_curr == 0.0: | |
| heights.append(float(z[i])) | |
| continue | |
| if u_prev == 0.0: | |
| # Already recorded by the previous iteration's "u_curr == 0" branch. | |
| continue | |
| if (u_prev > 0.0 and u_curr < 0.0) or (u_prev < 0.0 and u_curr > 0.0): | |
| # Linear interp to the zero crossing. | |
| t = u_prev / (u_prev - u_curr) | |
| heights.append(float(z[i - 1] + t * (z[i] - z[i - 1]))) | |
| return heights | |
| def compute_from_profile( | |
| z_profile, | |
| u_profile, | |
| theta_profile, | |
| a: float, | |
| ho: float, | |
| xdom: float, | |
| zdom: float, | |
| mink: float, | |
| maxk: float, | |
| npts: int = 100, | |
| ): | |
| """Arbitrary u(z)/theta(z) mountain-wave solver using transfer matrices. | |
| Returns ``(x, z, w, u_prime)``. The atmosphere is split into | |
| piecewise-constant L² layers centered on the profile points. Inside | |
| each layer the wave-transform equation reduces to an exponential | |
| ansatz; continuity of ŵ and ŵ' at interfaces plus a radiation / decay | |
| condition aloft closes the system. | |
| The wave-induced horizontal wind perturbation ``u_prime`` is obtained | |
| in the same Fourier loop: for each wavenumber ``k ≠ 0`` we take the | |
| analytic z-derivative of the per-layer ŵ basis (``σ_j · (−a_j | |
| e^{−σ_j Δz} + b_j e^{+σ_j Δz})``) and multiply by ``−i/k`` from the | |
| linearized continuity relation ``u'_k = −(i/k) · ∂ŵ_k/∂z``. | |
| """ | |
| zp = np.asarray(z_profile, dtype=float) | |
| up = np.asarray(u_profile, dtype=float) | |
| tp = np.asarray(theta_profile, dtype=float) | |
| l2 = scorer_from_profile(zp, up, tp) | |
| u_surface = float(up[0]) | |
| nlayers = zp.size | |
| layer_bot = np.empty(nlayers) | |
| layer_top = np.empty(nlayers) | |
| for j in range(nlayers): | |
| layer_bot[j] = 0.0 if j == 0 else 0.5 * (zp[j - 1] + zp[j]) | |
| layer_top[j] = np.inf if j == nlayers - 1 else 0.5 * (zp[j] + zp[j + 1]) | |
| dk = 0.367 / a | |
| nk = max(1, int((maxk - mink) // dk)) | |
| nslab = nk + 1 | |
| minx = -0.25 * xdom | |
| maxx = 0.75 * xdom | |
| dx = (maxx - minx) / npts | |
| dz = zdom / npts | |
| x = minx + dx * np.arange(npts + 1) | |
| z = dz * np.arange(npts + 1) | |
| # Layer index for each vertical grid point | |
| layer_of = np.zeros(z.size, dtype=int) | |
| for j, zj in enumerate(z): | |
| idx = nlayers - 1 | |
| for lj in range(nlayers): | |
| if zj < layer_top[lj]: | |
| idx = lj | |
| break | |
| layer_of[j] = idx | |
| matrix1 = np.zeros((z.size, x.size), dtype=complex) | |
| matrix3 = np.zeros((z.size, x.size), dtype=complex) | |
| matrix1_u = np.zeros((z.size, x.size), dtype=complex) | |
| matrix3_u = np.zeros((z.size, x.size), dtype=complex) | |
| ht = 0.0 | |
| for kloop in range(nslab): | |
| kk = mink + dk * kloop | |
| ksign = abs(kk) | |
| hs = np.pi * a * ho * np.exp(-a * ksign) | |
| if kloop > 0: | |
| ht += np.pi * dk * a * np.exp(-a * ksign) | |
| if kk == 0.0: | |
| # DC mode has no wave contribution; u' also vanishes here. | |
| matrix2 = np.zeros_like(matrix1) | |
| matrix2_u = np.zeros_like(matrix1) | |
| else: | |
| # Principal-branch sigma: in each layer, the "a" coefficient | |
| # multiplies exp(-sigma*dz), which is always the outgoing / | |
| # decaying branch when Im(sigma) >= 0. | |
| sigma = np.empty(nlayers, dtype=complex) | |
| for j in range(nlayers): | |
| s = np.sqrt(complex(kk ** 2 - l2[j])) | |
| if s.imag < 0: | |
| s = -s | |
| sigma[j] = s | |
| aj = np.zeros(nlayers, dtype=complex) | |
| bj = np.zeros(nlayers, dtype=complex) | |
| aj[-1] = 1.0 | |
| bj[-1] = 0.0 | |
| for j in range(nlayers - 2, -1, -1): | |
| dz_j = layer_top[j] - layer_bot[j] | |
| e_minus = np.exp(-sigma[j] * dz_j) | |
| e_plus = np.exp(sigma[j] * dz_j) | |
| alpha = aj[j + 1] + bj[j + 1] | |
| beta = -aj[j + 1] + bj[j + 1] | |
| ratio = sigma[j + 1] / sigma[j] | |
| aj[j] = 0.5 * (alpha - ratio * beta) * e_plus | |
| bj[j] = 0.5 * (alpha + ratio * beta) * e_minus | |
| w_surface = aj[0] + bj[0] | |
| if abs(w_surface) < 1e-300: | |
| amp = 0.0 + 0.0j | |
| else: | |
| amp = -1j * kk * u_surface * hs / w_surface | |
| aj *= amp | |
| bj *= amp | |
| # Build ŵ and ∂ŵ/∂z on the vertical grid. The per-layer basis | |
| # ŵ_j(z) = a_j e^{−σ_j Δz} + b_j e^{+σ_j Δz} | |
| # differentiates cleanly to | |
| # ∂ŵ_j/∂z = σ_j · (−a_j e^{−σ_j Δz} + b_j e^{+σ_j Δz}) | |
| # and the continuity relation gives u'_k = −(i/k) · ∂ŵ/∂z. | |
| zfac = np.zeros(z.size, dtype=complex) | |
| zfac_u = np.zeros(z.size, dtype=complex) | |
| inv_ik = -1j / kk | |
| for j in range(z.size): | |
| lj = layer_of[j] | |
| dz_l = z[j] - layer_bot[lj] | |
| e_minus = np.exp(-sigma[lj] * dz_l) | |
| e_plus = np.exp(sigma[lj] * dz_l) | |
| zfac[j] = aj[lj] * e_minus + bj[lj] * e_plus | |
| dwdz = sigma[lj] * (-aj[lj] * e_minus + bj[lj] * e_plus) | |
| zfac_u[j] = inv_ik * dwdz | |
| xfac = np.exp(-1j * x * kk) | |
| matrix2 = np.outer(zfac, xfac) | |
| matrix2_u = np.outer(zfac_u, xfac) | |
| if kloop > 0: | |
| matrix3 += 0.5 * (matrix1 + matrix2) * dk | |
| matrix3_u += 0.5 * (matrix1_u + matrix2_u) * dk | |
| matrix1 = matrix2 | |
| matrix1_u = matrix2_u | |
| if ht == 0.0: | |
| ht = 1.0 | |
| w = np.real(matrix3 / ht) | |
| u_prime = np.real(matrix3_u / ht) | |
| return x, z, w, u_prime | |
| # --------------------------------------------------------------------------- | |
| # Streamline tracer (port of stream.m) | |
| # --------------------------------------------------------------------------- | |
| def streamlines(x, z, u, w, num: int = 10): | |
| """Return ``num`` streamlines as ``[(xs, ys), ...]`` polylines. | |
| ``u`` may be a scalar (uniform mean flow, used for the two-layer solver) | |
| or a 1-D array of length ``nz`` giving the mean wind at each render-grid | |
| height. In linear wave theory the parcel displacement at height ``z₀`` is | |
| ``η(x, z₀) = (1/U(z₀)) · ∫ w(x', z₀) dx'``, so the time step used to | |
| integrate along each streamline depends on the wind at that streamline's | |
| height — not on the surface wind. Using a single scalar ``U_surface`` for | |
| every streamline (as Hart's MATLAB ``stream.m`` did because the two-layer | |
| case assumed uniform ``U``) over-amplifies upper streamlines whenever the | |
| real profile has shear. | |
| We guard against near-zero ``U(z₀)`` (which would blow up the tracer) with | |
| a 0.1 m/s floor — a parcel literally at rest cannot trace a linear | |
| streamline in this framework, so we just freeze it there. | |
| """ | |
| x = np.asarray(x) | |
| z = np.asarray(z) | |
| w = np.asarray(w) | |
| nx = x.size | |
| nz = z.size | |
| if nx < 2 or nz < 2 or num == 0: | |
| return [] | |
| minx = float(x[0]) | |
| dx = float(x[1] - x[0]) | |
| u_arr = np.atleast_1d(np.asarray(u, dtype=float)) | |
| if u_arr.size == 1: | |
| u_by_row = np.full(nz, float(u_arr[0])) | |
| elif u_arr.size == nz: | |
| u_by_row = u_arr | |
| else: | |
| # Caller gave an array of the wrong length — fall back to the mean so | |
| # the plot still renders rather than raising mid-draw. | |
| u_by_row = np.full(nz, float(np.mean(u_arr))) | |
| dh = nz / num | |
| lines = [] | |
| for j in range(num): | |
| ycell = 1.0 + dh * j | |
| if ycell < 1.0: | |
| ycell = 1.0 | |
| if ycell > nz: | |
| ycell = nz | |
| yci = int(round(ycell) - 1) | |
| yci = max(0, min(nz - 1, yci)) | |
| u_local = float(u_by_row[yci]) | |
| # 0.1 m/s floor prevents 1/u blowups at stagnant layers. | |
| u_local = u_local if abs(u_local) > 0.1 else math.copysign(0.1, u_local) if u_local != 0 else 0.1 | |
| tstep = dx / u_local | |
| xs = np.empty(nx) | |
| ys = np.empty(nx) | |
| xs[0] = minx | |
| ys[0] = z[yci] | |
| for i in range(1, nx): | |
| xs[i] = x[i] | |
| ys[i] = ys[i - 1] + tstep * w[yci, i] | |
| lines.append((xs, ys)) | |
| return lines | |