# Hosting-page residue (not part of the module): snesbitt ·
# "Mountain Waves — deploy to Hugging Face Space" · commit 7c3bfa9 · 13.3 kB
"""HRRR-on-AWS profile fetcher.
Pulls a vertical profile of wind and temperature out of the HRRR analysis
(0-h forecast) on the NODD public bucket ``noaa-hrrr-bdp-pds`` and returns
it as ``(z, u, v, theta, meta)``: height, east/north wind and potential
temperature arrays ready to drop into the profile editor, plus a small
diagnostics dict.
Design notes
------------
The full ``wrfprsf00.grib2`` is ~140 MB. We only need a handful of fields
(HGT, TMP, UGRD, VGRD on pressure levels) at a single grid cell, so we:
1. Fetch the ``.idx`` sidecar (a few kB) to find byte offsets.
2. Issue ranged GETs for just the GRIB messages we need (few MB total).
3. Splice them together into a local scratch file.
4. Open with cfgrib, pick the nearest grid column, and compute θ. The raw
(u, v) wind is returned; callers project it onto the user-specified flow
direction with ``along_flow_signed``.
Dependencies: ``boto3``, ``xarray``, ``cfgrib``. cfgrib needs the eccodes
C library; we pull it in from PyPI only (via ``eccodes`` + ``eccodeslib``
on macOS/Linux, or ``eccodes`` + ``ecmwflibs`` on Windows) rather than a
system package, so ``uv sync`` is enough and no ``brew install eccodes``
step is required. See ``pyproject.toml`` for the platform markers.
"""
from __future__ import annotations
import io
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import List, Tuple
import numpy as np
# Name of the public S3 bucket every request in this module is issued against.
BUCKET = "noaa-hrrr-bdp-pds"
# Exponent applied to (1000 / p) when ``_theta`` converts temperature to
# potential temperature.
R_OVER_CP = 0.2854 # R_d / c_p for dry air
# Variables and level type we care about.
# ``_parse_idx`` keeps an .idx line only when its variable field is one of
# these names ...
_WANTED_VARS = ("HGT", "TMP", "UGRD", "VGRD")
# ... and its level field ends with this suffix (e.g. "500 mb").
_LEVEL_SUFFIX = "mb"
@dataclass
class _IdxRecord:
num: int
start: int
var: str
level_mb: float
def _parse_yyyymmddhh(s: str) -> datetime:
s = (s or "").strip()
if len(s) != 10 or not s.isdigit():
raise ValueError(
f"Expected YYYYMMDDHH (10 digits); got {s!r}. "
f"Example: 2024060112"
)
return datetime.strptime(s, "%Y%m%d%H")
def _s3_key(dt: datetime) -> str:
return f"hrrr.{dt:%Y%m%d}/conus/hrrr.t{dt:%H}z.wrfprsf00.grib2"
def _unsigned_s3_client():
    """Create a boto3 S3 client that sends unsigned (anonymous) requests.

    boto3/botocore are imported lazily so that importing this module does
    not require them.
    """
    import boto3
    from botocore import UNSIGNED
    from botocore.config import Config

    anonymous = Config(signature_version=UNSIGNED)
    return boto3.client("s3", config=anonymous)
def _parse_idx(idx_text: str) -> Tuple[List[_IdxRecord], List[int]]:
    """Split an HRRR ``.idx`` listing into wanted records and all offsets.

    Each line is colon-separated; this parser reads field 0 (record
    number), field 1 (start byte), field 3 (variable) and field 4 (level
    description). Lines with fewer than six fields or a non-numeric start
    byte are ignored entirely.

    Returns
    -------
    (wanted, all_starts)
        ``wanted`` holds one ``_IdxRecord`` per line whose variable is in
        ``_WANTED_VARS`` and whose level ends with ``_LEVEL_SUFFIX`` and
        begins with a number. ``all_starts`` is the ascending list of the
        start bytes of *every* accepted line, wanted or not, so that the
        end of each message can be derived from the next offset.
    """
    offsets: List[int] = []
    selected: List[_IdxRecord] = []
    for raw_line in idx_text.splitlines():
        fields = raw_line.split(":")
        if not (len(fields) >= 6 and fields[1].isdigit()):
            continue
        offset = int(fields[1])
        offsets.append(offset)

        name = fields[3]
        level_desc = fields[4].strip()
        if name in _WANTED_VARS and level_desc.endswith(_LEVEL_SUFFIX):
            try:
                hpa = float(level_desc.split()[0])
            except ValueError:
                # Level text ends in the suffix but has no leading number.
                continue
            selected.append(
                _IdxRecord(num=int(fields[0]), start=offset, var=name, level_mb=hpa)
            )
    offsets.sort()
    return selected, offsets
def _byte_ranges(records: List[_IdxRecord], all_starts: List[int]) -> List[Tuple[int, int]]:
"""Convert each wanted record's start byte to a (start, end) range.
End byte is the next record's start minus one, or open-ended for the
final record. We collapse adjacent ranges into contiguous chunks to
cut down on the number of HTTP calls.
"""
ranges: List[Tuple[int, int]] = []
for r in records:
i = all_starts.index(r.start)
if i + 1 < len(all_starts):
end = all_starts[i + 1] - 1
else:
end = -1 # open-ended
ranges.append((r.start, end))
# Merge contiguous ranges to reduce request count.
ranges.sort()
merged: List[Tuple[int, int]] = []
for start, end in ranges:
if merged and end != -1 and merged[-1][1] != -1 and start == merged[-1][1] + 1:
merged[-1] = (merged[-1][0], end)
else:
merged.append((start, end))
return merged
def _download_subset(client, key: str, ranges: List[Tuple[int, int]]) -> bytes:
    """Fetch the given byte ranges of ``key`` and return them concatenated.

    One ranged ``get_object`` call is issued per ``(start, end)`` pair, in
    the order given; ``end == -1`` requests everything from ``start`` to
    the end of the object.
    """
    pieces = []
    for first, last in ranges:
        upper = "" if last == -1 else str(last)
        response = client.get_object(
            Bucket=BUCKET, Key=key, Range=f"bytes={first}-" + upper
        )
        pieces.append(response["Body"].read())
    return b"".join(pieces)
def _nearest_ij(lats: np.ndarray, lons: np.ndarray, lat0: float, lon0: float) -> Tuple[int, int]:
"""Nearest-neighbor grid cell to (lat0, lon0) on a 2-D HRRR grid."""
lon0 = ((lon0 + 180) % 360) - 180
# Convert HRRR lons to -180..180 as well.
lons = ((lons + 180) % 360) - 180
d2 = (lats - lat0) ** 2 + (lons - lon0) ** 2
j, i = np.unravel_index(np.argmin(d2), d2.shape)
return int(j), int(i)
def along_flow_signed(
    u: np.ndarray, v: np.ndarray, flow_from_deg: float
) -> np.ndarray:
    """Project ``(u, v)`` onto a reference flow direction, keeping the sign.

    Parameters
    ----------
    u, v
        East- and north-positive wind components (array-like).
    flow_from_deg
        Reference direction in the meteorological convention: the azimuth
        the flow comes *from* (270° = westerly, 160° = from the SSE).

    Returns
    -------
    np.ndarray
        ``-(u sin φ + v cos φ)`` with ``φ = flow_from_deg``. The value is
        positive where the wind blows from within 90° of the reference
        direction and negative where it is reversed relative to it.

    Notes
    -----
    A wind of speed ``s`` blowing from azimuth ``φ_act`` has components
    ``(u, v) = -s · (sin φ_act, cos φ_act)``. Its projection onto the unit
    vector pointing downstream of the reference direction (azimuth
    ``φ + 180``) is ``s · cos(φ_act − φ)``, which expands to the
    expression above.

    No clamping is applied. Callers that want the zero-clipped variant
    must take ``np.maximum(along_flow_signed(...), 0.0)`` themselves.
    """
    azimuth = np.deg2rad(flow_from_deg)
    east = np.asarray(u)
    north = np.asarray(v)
    return -(east * np.sin(azimuth) + north * np.cos(azimuth))


# Backward-compatibility alias: the historical name stays importable, but
# it is the very same function and therefore returns the *signed*
# component (no zero-clip). Apply ``np.maximum(_, 0.0)`` at the call site
# if the clipped value is required.
along_flow_positive = along_flow_signed
def _theta(T_K: np.ndarray, p_hpa: np.ndarray) -> np.ndarray:
    """Potential temperature (K) for temperature ``T_K`` at pressure ``p_hpa``.

    Uses a 1000 hPa reference pressure and the module-level ``R_OVER_CP``
    exponent.
    """
    pressure_factor = (1000.0 / p_hpa) ** R_OVER_CP
    return T_K * pressure_factor
def fetch_profile(
    lat: float,
    lon: float,
    yyyymmddhh: str,
    z_target_m: np.ndarray | None = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, dict]:
    """Return ``(z, u, v, theta, meta)`` for the HRRR column at (lat, lon).

    Raw east/north wind components are returned (not an along-flow
    projection) so the caller can re-project onto any flow direction
    without re-downloading; use :func:`along_flow_signed` for that.

    Parameters
    ----------
    lat, lon : float
        Point of interest in decimal degrees. ``lon`` may be ±180 or 0..360.
    yyyymmddhh : str
        UTC cycle time, e.g. ``"2024060112"``.
    z_target_m : np.ndarray, optional
        If given, the profile is linearly interpolated onto these heights
        (same reference as ``z`` below). Targets outside the covered range
        take the nearest end value. If ``None``, the native pressure-level
        heights are returned as ``z``.

    Returns
    -------
    z : np.ndarray
        Heights in m, measured from the lowest retained pressure level,
        which this function uses as its ground reference.
    u, v : np.ndarray
        East- and north-positive wind components in m/s.
    theta : np.ndarray
        Potential temperature in K.
    meta : dict
        Keys ``s3_key``, ``grid_lat``, ``grid_lon``, ``bytes``,
        ``n_levels`` and ``sfc_height_msl``.

    Raises
    ------
    ValueError
        If ``yyyymmddhh`` is malformed.
    RuntimeError
        If xarray is missing, the ``.idx`` sidecar cannot be fetched, the
        index lists no wanted pressure-level records, or no usable level
        remains in the selected column.
    """
    # Fail early, before any network traffic, if the reader is unavailable.
    try:
        import xarray  # noqa: F401 (used via cfgrib backend)
    except ImportError as exc:  # pragma: no cover
        raise RuntimeError(
            "xarray is required to read HRRR GRIB2. "
            "Install with: pip install xarray cfgrib"
        ) from exc

    dt = _parse_yyyymmddhh(yyyymmddhh)
    key = _s3_key(dt)
    client = _unsigned_s3_client()

    # 1. Fetch the .idx sidecar.
    try:
        idx_obj = client.get_object(Bucket=BUCKET, Key=key + ".idx")
    except Exception as exc:
        raise RuntimeError(
            f"HRRR .idx not found at s3://{BUCKET}/{key}.idx "
            f"(cycle may not exist yet). Error: {exc}"
        ) from exc
    idx_text = idx_obj["Body"].read().decode("utf-8", errors="replace")
    records, all_starts = _parse_idx(idx_text)
    if not records:
        raise RuntimeError(f"No HGT/TMP/UGRD/VGRD pressure-level records in idx for {key}")

    # 2. Byte-range fetch just the records we need.
    ranges = _byte_ranges(records, all_starts)
    blob = _download_subset(client, key, ranges)
    bytes_downloaded = len(blob)

    # 3. Splice into a scratch file and read it with cfgrib. The write sits
    #    inside the try so the scratch file is removed even if it fails.
    grib_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".grib2") as f:
            grib_path = f.name
            f.write(blob)
        fields = _read_isobaric_fields(grib_path)
    finally:
        if grib_path is not None:
            try:
                os.unlink(grib_path)
            except OSError:
                # Best effort: a leftover scratch file must not mask the
                # real outcome of the read.
                pass

    lats_full = fields["lat"]
    lons_full = fields["lon"]

    # 4. Pick the nearest column.
    j, i = _nearest_ij(lats_full, lons_full, lat, lon)
    grid_lat = float(lats_full[j, i])
    grid_lon = float(((lons_full[j, i] + 180) % 360) - 180)

    # Column order: pressure (hPa), geopotential height (m MSL), T (K), u, v.
    columns = [
        fields["pressure"],
        fields["hgt"][:, j, i],
        fields["tmp"][:, j, i],
        fields["ugrd"][:, j, i],
        fields["vgrd"][:, j, i],
    ]

    # Sort by pressure descending (so surface first, top last).
    by_pressure = np.argsort(-columns[0])
    columns = [c[by_pressure] for c in columns]

    # NOTE(review): this mask was meant to drop below-ground pressure
    # levels, but every height is >= the column minimum, so for finite data
    # it keeps all levels. Removing below-ground levels for real would need
    # a surface field, which this module does not download.
    heights = columns[1]
    keep = heights >= float(np.min(heights)) - 1.0
    columns = [c[keep] for c in columns]

    # Order by increasing height (equal heights are not removed).
    by_height = np.argsort(columns[1])
    pressures, h_col, t_col, u_col, v_col = (c[by_height] for c in columns)

    if h_col.size == 0:
        # Happens e.g. when the column heights contain NaN.
        raise RuntimeError(
            f"No usable pressure levels at grid point ({grid_lat}, {grid_lon}) in {key}"
        )

    # Heights relative to the lowest retained level.
    z_agl = h_col - h_col[0]
    theta = _theta(t_col, pressures)

    if z_target_m is not None:
        z_target = np.asarray(z_target_m, dtype=float)
        # Clip target range to what HRRR actually covers at this point.
        z_clipped = np.clip(z_target, float(z_agl[0]), float(z_agl[-1]))
        u_out = np.interp(z_clipped, z_agl, u_col)
        v_out = np.interp(z_clipped, z_agl, v_col)
        th_out = np.interp(z_clipped, z_agl, theta)
        # Report the heights the caller asked for, not the clipped ones.
        z_out = z_target.copy()
    else:
        z_out, u_out, v_out, th_out = z_agl, u_col, v_col, theta

    meta = {
        "s3_key": key,
        "grid_lat": grid_lat,
        "grid_lon": grid_lon,
        "bytes": bytes_downloaded,
        "n_levels": int(z_agl.size),
        "sfc_height_msl": float(h_col[0]),
    }
    return (
        z_out.astype(float),
        u_out.astype(float),
        v_out.astype(float),
        th_out.astype(float),
        meta,
    )


def _read_isobaric_fields(grib_path: str) -> dict:
    """Load the pressure-level fields of a GRIB2 file fully into memory.

    Returns a dict with keys ``hgt``, ``tmp``, ``ugrd``, ``vgrd`` (data
    arrays), ``pressure`` (level values as float), ``lat`` and ``lon``.
    Every array is materialized before the dataset is closed, so the file
    at ``grib_path`` may be deleted as soon as this function returns.

    Raises ``KeyError`` if one of the four variables is missing under all
    of its known names.
    """
    import xarray as xr

    def _pick(ds, candidates):
        # The variable name depends on how cfgrib maps the GRIB message
        # (e.g. 'gh' vs 'HGT'), so try each known spelling in turn.
        for name in candidates:
            if name in ds.variables:
                return ds[name]
        raise KeyError(f"None of {candidates} found in dataset: {list(ds.variables)}")

    # The context manager closes the dataset on error paths as well, so the
    # caller can always unlink the scratch file afterwards.
    with xr.open_dataset(
        grib_path,
        engine="cfgrib",
        backend_kwargs={
            "indexpath": "",  # don't leave .idx files around
            "filter_by_keys": {"typeOfLevel": "isobaricInhPa"},
        },
    ) as ds:
        return {
            "hgt": _pick(ds, ["gh", "HGT", "h"]).values,
            "tmp": _pick(ds, ["t", "TMP"]).values,
            "ugrd": _pick(ds, ["u", "UGRD"]).values,
            "vgrd": _pick(ds, ["v", "VGRD"]).values,
            "pressure": ds["isobaricInhPa"].values.astype(float),  # hPa
            "lat": ds["latitude"].values,
            "lon": ds["longitude"].values,
        }