Spaces:
Sleeping
Sleeping
| """Updraft/downdraft field prescription from a 1-D parcel model.""" | |
| from __future__ import annotations | |
| import numpy as np | |
| from scipy.interpolate import CubicSpline | |
| from .sounding import G, lift_parcel | |
def diagnose_w_profile(
    z: np.ndarray,
    T_env: np.ndarray,
    qv_env: np.ndarray,
    p_hPa: np.ndarray,
    delta_T_K: float = 2.0,
) -> dict:
    """Diagnose the updraft vertical velocity profile from a 1-D parcel model.

    The parcel is lifted by ``lift_parcel`` with a temperature excess of
    ``delta_T_K`` K. Vertical velocity follows from the cumulative integral
    of the parcel buoyancy ``B``:

        w²(z) = 2 · ∫₀ᶻ B(z') dz'    (negative w² is clipped to zero)

    Above the equilibrium level (EL) the integration continues; the first
    level at or above the EL where w² is no longer positive is reported as
    the overshoot top ``z_top_m`` and ``w`` is zeroed from there upward. If
    w² stays positive up to the last level, ``z_top_m`` is ``z[-1]``.

    Parameters
    ----------
    z : (Nz,) heights, ascending (required by ``np.searchsorted``). Integer
        and floating dtypes are both accepted.
    T_env, qv_env, p_hPa : environmental profiles forwarded unchanged to
        ``lift_parcel``.
    delta_T_K : parcel temperature excess forwarded to ``lift_parcel``.

    Returns
    -------
    dict with keys ``w_z`` and ``z_top_m`` computed here, ``B_z`` (the
    buoyancy profile from ``lift_parcel``), and ``LCL_m``, ``LFC_m``,
    ``EL_m``, ``CAPE``, ``CIN``, ``T_parcel`` passed through from
    ``lift_parcel``.
    """
    # Function-scope import, as in the original module layout.
    from scipy.integrate import cumulative_trapezoid as cumtrapz

    parcel = lift_parcel(z, T_env, qv_env, p_hPa, delta_T_K)
    B = parcel["B"]
    EL_m = parcel["EL_m"]  # assumes a finite height in the units of z — TODO confirm

    # Cumulative KE: w² = 2 · ∫₀ᶻ B dz. ``initial=0.0`` yields one value per
    # level in floating point; allocating with ``np.empty_like(z)`` would
    # inherit an integer dtype from an integer height grid and truncate w².
    ke2 = np.maximum(2.0 * cumtrapz(B, z, initial=0.0), 0.0)
    w_z = np.sqrt(ke2)

    # Above EL: continue the integration with negative B until w → 0.
    el_idx = int(np.searchsorted(z, EL_m))
    w_el_sq = ke2[el_idx] if el_idx < len(z) else 0.0
    z_top_m = EL_m
    if el_idx < len(z) - 1:
        # Restart from the (clipped) kinetic energy at the EL.
        w_sq_above = w_el_sq + 2.0 * cumtrapz(B[el_idx:], z[el_idx:], initial=0.0)
        w_sq_above = np.maximum(w_sq_above, 0.0)
        zero_cross = np.flatnonzero(w_sq_above <= 0.0)
        if zero_cross.size > 0:
            # First level where the parcel has lost all its kinetic energy.
            first_zero = el_idx + zero_cross[0]
            z_top_m = float(z[first_zero])
            w_z[first_zero:] = 0.0
        else:
            # Overshoot extends beyond domain top; continue deceleration profile.
            w_z[el_idx:] = np.sqrt(w_sq_above)
            z_top_m = float(z[-1])

    return {
        "w_z": w_z,
        "B_z": B,
        "LCL_m": parcel["LCL_m"],
        "LFC_m": parcel["LFC_m"],
        "EL_m": EL_m,
        "CAPE": parcel["CAPE"],
        "CIN": parcel["CIN"],
        "z_top_m": z_top_m,
        "T_parcel": parcel["T_parcel"],
    }
def tophat_profile(r: np.ndarray, r0: float, rolloff_frac: float = 0.10) -> np.ndarray:
    """Radial shape: uniform core, cosine taper in outer ``rolloff_frac`` fraction.

    Parameters
    ----------
    r : radial distances (any shape; converted to a float array).
    r0 : outer radius; the profile is 0 for ``r > r0``.
    rolloff_frac : fraction of ``r0`` occupied by the taper. The profile is 1
        for ``r <= r0 * (1 - rolloff_frac)`` and falls to 0 at ``r0`` along a
        half cosine. With ``rolloff_frac = 0`` (or ``r0 = 0``) the taper has
        zero width and the result is a sharp-edged top hat.

    Returns
    -------
    Float array with the same shape as ``r`` and values in [0, 1].
    """
    r = np.asarray(r, dtype=float)
    r_inner = r0 * (1.0 - rolloff_frac)
    taper_width = r0 - r_inner

    out = np.where(r <= r_inner, 1.0, 0.0)
    if taper_width > 0.0:
        taper_mask = (r > r_inner) & (r <= r0)
        # np.where evaluates the taper for every element, not just the masked
        # ones. Clipping the phase to [0, 1] keeps the cosine argument finite
        # outside the band and leaves values inside the band unchanged.
        phase = np.clip((r - r_inner) / taper_width, 0.0, 1.0)
        out = np.where(taper_mask, 0.5 * (1.0 + np.cos(np.pi * phase)), out)
    # A non-positive width means the taper band is empty; evaluating it
    # anyway would divide by zero, so it is skipped.
    return out
def cosine_profile(r: np.ndarray, r0: float) -> np.ndarray:
    """Radial shape: cosine bell w(r) = cos(π r / 2r₀) for r < r₀, else 0.

    Parameters
    ----------
    r : radial distances (any shape; converted to a float array).
    r0 : bell radius. A radius of exactly zero yields an all-zero profile.

    Returns
    -------
    Float array with the same shape as ``r``.
    """
    r = np.asarray(r, dtype=float)
    if r0 == 0:
        # No radial distance lies inside a zero-radius bell. Returning early
        # avoids the r / r0 division, which np.where would otherwise evaluate
        # for every element and which emits divide-by-zero warnings.
        return np.zeros_like(r)
    return np.where(r < r0, np.cos(0.5 * np.pi * r / r0), 0.0)
def build_updraft_fields(
    X: np.ndarray,
    Y: np.ndarray,
    z: np.ndarray,
    r0: float,
    shape: str,
    w_z: np.ndarray,
    zeta_cpts: np.ndarray,
    zeta_z_km: np.ndarray,
    env_u: np.ndarray,
    env_v: np.ndarray,
    theta_env: np.ndarray,
    theta_parcel: np.ndarray,
) -> dict:
    """Construct 3-D updraft fields on the (Nx, Ny, Nz) grid.

    The updraft is centred on the mean of ``X`` and ``Y``. Each field is a
    radial shape function multiplied by a per-level profile; horizontal wind
    is the environmental wind plus a tapered solid-body rotation.

    Parameters
    ----------
    X, Y : (Nx, Ny) horizontal coordinate arrays (meters)
    z : (Nz,) height array (meters)
    r0 : updraft radius (m)
    shape : 'tophat' selects ``tophat_profile``; any other value falls back
        to ``cosine_profile``
    w_z : (Nz,) diagnosed vertical velocity profile (m/s)
    zeta_cpts : (Ncpts,) prescribed vorticity values (s⁻¹) representing
        accumulated rotation from tilting/stretching in a mature storm
    zeta_z_km : (Ncpts,) heights of control points (km); passed to
        ``CubicSpline``, so they must be strictly increasing
    env_u, env_v : (Nz,) environmental wind components (m/s)
    theta_env, theta_parcel : (Nz,) potential temperature arrays (K)

    Returns dict of 3-D arrays: w3d, u3d, v3d, zeta3d, theta_prime3d.

    Notes
    -----
    With fewer than two control points the vorticity profile is zero. With
    two or more, levels outside the control-point range use the spline's
    cubic extrapolation (``extrapolate=True``).
    """
    Nx, Ny = X.shape
    Nz = len(z)

    # Radial distance from domain center
    xc = X.mean()
    yc = Y.mean()
    R = np.sqrt((X - xc) ** 2 + (Y - yc) ** 2)  # (Nx, Ny)

    # Radial shape function
    if shape == "tophat":
        shape_fn = tophat_profile(R, r0)
    else:
        shape_fn = cosine_profile(R, r0)

    # Interpolate prescribed vorticity control points to full z grid
    if len(zeta_cpts) >= 2:
        z_cpts_m = np.asarray(zeta_z_km) * 1000.0  # km -> m
        cs = CubicSpline(z_cpts_m, np.asarray(zeta_cpts), extrapolate=True)
        zeta_z = cs(z)
    else:
        zeta_z = np.zeros(Nz)

    # Azimuthal angle from center. Its sine and cosine do not depend on
    # height, so they are evaluated once rather than at every level.
    phi = np.arctan2(Y - yc, X - xc)  # (Nx, Ny)
    sin_phi = np.sin(phi)
    cos_phi = np.cos(phi)

    # Build 3-D fields
    w3d = np.empty((Nx, Ny, Nz))
    u3d = np.empty((Nx, Ny, Nz))
    v3d = np.empty((Nx, Ny, Nz))
    zeta3d = np.zeros((Nx, Ny, Nz))
    theta_prime3d = np.zeros((Nx, Ny, Nz))

    for k in range(Nz):
        w3d[:, :, k] = shape_fn * w_z[k]

        # Solid-body rotation: v_θ = ζ(z) * r / 2
        v_theta = zeta_z[k] * R / 2.0
        u_core = -v_theta * sin_phi
        v_core = v_theta * cos_phi

        # Apply radial taper to the rotational wind too
        u3d[:, :, k] = env_u[k] + shape_fn * u_core
        v3d[:, :, k] = env_v[k] + shape_fn * v_core

        # Vertical vorticity within core: ζ_z ≈ shape_fn * zeta_z[k]
        zeta3d[:, :, k] = shape_fn * zeta_z[k]

        # Potential temperature perturbation within core
        theta_prime3d[:, :, k] = shape_fn * (theta_parcel[k] - theta_env[k])

    return {
        "w3d": w3d,
        "u3d": u3d,
        "v3d": v3d,
        "zeta3d": zeta3d,
        "theta_prime3d": theta_prime3d,
    }