Spaces:
Sleeping
Sleeping
| """HRRR-on-AWS profile fetcher. | |
| Pulls a vertical profile of wind and temperature out of the HRRR analysis | |
| (0-h forecast) on the NODD public bucket ``noaa-hrrr-bdp-pds`` and returns | |
| it as (z, u_along, theta) arrays ready to drop into the profile editor. | |
| Design notes | |
| ------------ | |
| The full ``wrfprsf00.grib2`` is ~140 MB. We only need a handful of fields | |
| (HGT, TMP, UGRD, VGRD on pressure levels) at a single grid cell, so we: | |
| 1. Fetch the ``.idx`` sidecar (a few kB) to find byte offsets. | |
| 2. Issue ranged GETs for just the GRIB messages we need (few MB total). | |
| 3. Splice them together into a local scratch file. | |
| 4. Open with cfgrib, pick the nearest grid column, and compute θ and the | |
| user-specified flow-direction wind component. | |
| Dependencies: ``boto3``, ``xarray``, ``cfgrib``. cfgrib needs the eccodes | |
| C library; we pull it in from PyPI only (via ``eccodes`` + ``eccodeslib`` | |
| on macOS/Linux, or ``eccodes`` + ``ecmwflibs`` on Windows) rather than a | |
| system package, so ``uv sync`` is enough and no ``brew install eccodes`` | |
| step is required. See ``pyproject.toml`` for the platform markers. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import os | |
| import tempfile | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from typing import List, Tuple | |
| import numpy as np | |
| BUCKET = "noaa-hrrr-bdp-pds" | |
| R_OVER_CP = 0.2854 # R_d / c_p for dry air | |
| # Variables and level type we care about. | |
| _WANTED_VARS = ("HGT", "TMP", "UGRD", "VGRD") | |
| _LEVEL_SUFFIX = "mb" | |
| class _IdxRecord: | |
| num: int | |
| start: int | |
| var: str | |
| level_mb: float | |
| def _parse_yyyymmddhh(s: str) -> datetime: | |
| s = (s or "").strip() | |
| if len(s) != 10 or not s.isdigit(): | |
| raise ValueError( | |
| f"Expected YYYYMMDDHH (10 digits); got {s!r}. " | |
| f"Example: 2024060112" | |
| ) | |
| return datetime.strptime(s, "%Y%m%d%H") | |
| def _s3_key(dt: datetime) -> str: | |
| return f"hrrr.{dt:%Y%m%d}/conus/hrrr.t{dt:%H}z.wrfprsf00.grib2" | |
| def _unsigned_s3_client(): | |
| """boto3 client configured for anonymous (unsigned) access.""" | |
| import boto3 | |
| from botocore import UNSIGNED | |
| from botocore.config import Config | |
| return boto3.client("s3", config=Config(signature_version=UNSIGNED)) | |
| def _parse_idx(idx_text: str) -> Tuple[List[_IdxRecord], List[int]]: | |
| """Parse the HRRR .idx file into (wanted_records, all_start_bytes). | |
| Format of each line (colon-separated): | |
| record_num:start_byte:d=YYYYMMDDHH:VAR:LEVEL:FCST:anl | |
| """ | |
| wanted: List[_IdxRecord] = [] | |
| all_starts: List[int] = [] | |
| for line in idx_text.splitlines(): | |
| parts = line.split(":") | |
| if len(parts) < 6 or not parts[1].isdigit(): | |
| continue | |
| start = int(parts[1]) | |
| all_starts.append(start) | |
| var = parts[3] | |
| level = parts[4].strip() | |
| if var not in _WANTED_VARS or not level.endswith(_LEVEL_SUFFIX): | |
| continue | |
| try: | |
| level_mb = float(level.split()[0]) | |
| except ValueError: | |
| continue | |
| wanted.append(_IdxRecord(num=int(parts[0]), start=start, var=var, level_mb=level_mb)) | |
| return wanted, sorted(all_starts) | |
| def _byte_ranges(records: List[_IdxRecord], all_starts: List[int]) -> List[Tuple[int, int]]: | |
| """Convert each wanted record's start byte to a (start, end) range. | |
| End byte is the next record's start minus one, or open-ended for the | |
| final record. We collapse adjacent ranges into contiguous chunks to | |
| cut down on the number of HTTP calls. | |
| """ | |
| ranges: List[Tuple[int, int]] = [] | |
| for r in records: | |
| i = all_starts.index(r.start) | |
| if i + 1 < len(all_starts): | |
| end = all_starts[i + 1] - 1 | |
| else: | |
| end = -1 # open-ended | |
| ranges.append((r.start, end)) | |
| # Merge contiguous ranges to reduce request count. | |
| ranges.sort() | |
| merged: List[Tuple[int, int]] = [] | |
| for start, end in ranges: | |
| if merged and end != -1 and merged[-1][1] != -1 and start == merged[-1][1] + 1: | |
| merged[-1] = (merged[-1][0], end) | |
| else: | |
| merged.append((start, end)) | |
| return merged | |
| def _download_subset(client, key: str, ranges: List[Tuple[int, int]]) -> bytes: | |
| buf = io.BytesIO() | |
| for start, end in ranges: | |
| header = f"bytes={start}-" + ("" if end == -1 else str(end)) | |
| obj = client.get_object(Bucket=BUCKET, Key=key, Range=header) | |
| buf.write(obj["Body"].read()) | |
| return buf.getvalue() | |
| def _nearest_ij(lats: np.ndarray, lons: np.ndarray, lat0: float, lon0: float) -> Tuple[int, int]: | |
| """Nearest-neighbor grid cell to (lat0, lon0) on a 2-D HRRR grid.""" | |
| lon0 = ((lon0 + 180) % 360) - 180 | |
| # Convert HRRR lons to -180..180 as well. | |
| lons = ((lons + 180) % 360) - 180 | |
| d2 = (lats - lat0) ** 2 + (lons - lon0) ** 2 | |
| j, i = np.unravel_index(np.argmin(d2), d2.shape) | |
| return int(j), int(i) | |
| def along_flow_signed( | |
| u: np.ndarray, v: np.ndarray, flow_from_deg: float | |
| ) -> np.ndarray: | |
| """Signed along-flow wind component for the mountain-wave solver. | |
| ``flow_from_deg`` follows the standard meteorological convention — the | |
| azimuth the wind is blowing *from* (270° = westerly, 160° = from the SSE). | |
| The returned scalar is the signed component of the wind parallel to | |
| that "from" direction: **positive** when the wind is blowing *from* a | |
| direction within 90° of ``flow_from_deg``, and **negative** when the | |
| wind reverses relative to that reference direction. Callers that | |
| depended on the old zero-clipped behavior should take | |
| ``np.maximum(along_flow_signed(...), 0.0)`` explicitly; the solver | |
| itself now tolerates negative U via the Scorer-parameter critical-level | |
| clamp, so wind reversals aloft should pass through unmodified and be | |
| surfaced to the user as actual reversals. | |
| Derivation: a wind with components ``(u, v)`` (east- and north-positive) | |
| blowing *from* azimuth ``φ_act`` has magnitude ``s`` and | |
| ``(u, v) = -s · (sin φ_act, cos φ_act)``. Projecting onto the unit | |
| vector pointing in the direction the flow is going when it comes from | |
| ``φ_spec`` (i.e. ``φ_spec + 180``) gives ``s · cos(φ_act − φ_spec)``. | |
| That evaluates to ``-(u sin φ_spec + v cos φ_spec)``, positive when | |
| actual and specified "from" directions are parallel and negative when | |
| antiparallel. No clamping is applied. | |
| """ | |
| rad = np.deg2rad(flow_from_deg) | |
| return -(np.asarray(u) * np.sin(rad) + np.asarray(v) * np.cos(rad)) | |
| # Backward-compatibility alias — the old name is retained so external | |
| # imports keep working, but it now returns the *signed* along-flow | |
| # component (no zero-clip). Callers that genuinely need the clipped | |
| # variant must apply ``np.maximum(_, 0.0)`` themselves. | |
| along_flow_positive = along_flow_signed | |
| def _theta(T_K: np.ndarray, p_hpa: np.ndarray) -> np.ndarray: | |
| """Potential temperature (K) referenced to 1000 hPa.""" | |
| return T_K * (1000.0 / p_hpa) ** R_OVER_CP | |
| def fetch_profile( | |
| lat: float, | |
| lon: float, | |
| yyyymmddhh: str, | |
| z_target_m: np.ndarray | None = None, | |
| ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, dict]: | |
| """Return ``(z, u, v, theta, meta)`` for the HRRR column at (lat, lon). | |
| Raw east/north wind components are returned (not the along-flow | |
| projection) so the caller can re-project onto any user-chosen flow | |
| direction cheaply without re-downloading. Use :func:`along_flow_signed` | |
| to turn ``(u, v, flow_from_deg)`` into the mountain-wave input; wind | |
| reversals produce negative values, which the solver handles via the | |
| Scorer critical-level clamp rather than silently clipping to zero. | |
| Parameters | |
| ---------- | |
| lat, lon : float | |
| Point of interest in decimal degrees. ``lon`` may be ±180 or 0..360. | |
| yyyymmddhh : str | |
| UTC cycle time, e.g. ``"2024060112"``. | |
| z_target_m : np.ndarray, optional | |
| If given, the profile is linearly interpolated onto these heights | |
| (meters above ground level). If ``None``, the native HRRR | |
| pressure-level heights (AGL) are returned as ``z``. | |
| Returns | |
| ------- | |
| z : np.ndarray | |
| Heights in m AGL. | |
| u, v : np.ndarray | |
| East- and north-positive wind components in m/s. | |
| theta : np.ndarray | |
| Potential temperature in K. | |
| meta : dict | |
| Diagnostic info (nearest grid lat/lon, S3 key, bytes transferred). | |
| """ | |
| try: | |
| import xarray as xr # noqa: F401 (used via cfgrib backend) | |
| except ImportError as exc: # pragma: no cover | |
| raise RuntimeError( | |
| "xarray is required to read HRRR GRIB2. " | |
| "Install with: pip install xarray cfgrib" | |
| ) from exc | |
| dt = _parse_yyyymmddhh(yyyymmddhh) | |
| key = _s3_key(dt) | |
| client = _unsigned_s3_client() | |
| # 1. Fetch the .idx sidecar. | |
| try: | |
| idx_obj = client.get_object(Bucket=BUCKET, Key=key + ".idx") | |
| except Exception as exc: | |
| raise RuntimeError( | |
| f"HRRR .idx not found at s3://{BUCKET}/{key}.idx " | |
| f"(cycle may not exist yet). Error: {exc}" | |
| ) from exc | |
| idx_text = idx_obj["Body"].read().decode("utf-8", errors="replace") | |
| records, all_starts = _parse_idx(idx_text) | |
| if not records: | |
| raise RuntimeError(f"No HGT/TMP/UGRD/VGRD pressure-level records in idx for {key}") | |
| # 2. Byte-range fetch just the records we need. | |
| ranges = _byte_ranges(records, all_starts) | |
| blob = _download_subset(client, key, ranges) | |
| bytes_downloaded = len(blob) | |
| # 3. Splice into a scratch file and open with cfgrib. | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".grib2") as f: | |
| f.write(blob) | |
| grib_path = f.name | |
| try: | |
| import xarray as xr | |
| ds = xr.open_dataset( | |
| grib_path, | |
| engine="cfgrib", | |
| backend_kwargs={ | |
| "indexpath": "", # don't leave .idx files around | |
| "filter_by_keys": {"typeOfLevel": "isobaricInhPa"}, | |
| }, | |
| ) | |
| # cfgrib exposes variables as {'gh' or 'HGT', 't', 'u', 'v'} depending on | |
| # shortName/cfName. Find them robustly. | |
| def _pick(ds, candidates): | |
| for name in candidates: | |
| if name in ds.variables: | |
| return ds[name] | |
| raise KeyError(f"None of {candidates} found in dataset: {list(ds.variables)}") | |
| # cfgrib reads data lazily — it keeps the grib file open and re-reads | |
| # when .values is touched. We delete the scratch file below, so we | |
| # must materialize every array we care about *before* the unlink. | |
| hgt_a = _pick(ds, ["gh", "HGT", "h"]).values | |
| tmp_a = _pick(ds, ["t", "TMP"]).values | |
| ugrd_a = _pick(ds, ["u", "UGRD"]).values | |
| vgrd_a = _pick(ds, ["v", "VGRD"]).values | |
| p_dim = "isobaricInhPa" | |
| pressures_full = ds[p_dim].values.astype(float) # hPa | |
| lats_full = ds["latitude"].values | |
| lons_full = ds["longitude"].values | |
| ds.close() | |
| finally: | |
| try: | |
| os.unlink(grib_path) | |
| except OSError: | |
| pass | |
| # 4. Pick the nearest column. | |
| j, i = _nearest_ij(lats_full, lons_full, lat, lon) | |
| grid_lat = float(lats_full[j, i]) | |
| grid_lon = float(((lons_full[j, i] + 180) % 360) - 180) | |
| pressures = pressures_full | |
| h_col = hgt_a[:, j, i] # geopotential height, m (MSL) | |
| t_col = tmp_a[:, j, i] # K | |
| u_col = ugrd_a[:, j, i] | |
| v_col = vgrd_a[:, j, i] | |
| # Sort by pressure descending (so surface first, top last). | |
| order = np.argsort(-pressures) | |
| pressures = pressures[order] | |
| h_col = h_col[order] | |
| t_col = t_col[order] | |
| u_col = u_col[order] | |
| v_col = v_col[order] | |
| # Keep only levels at or above ground (HRRR pressure levels below the | |
| # surface are filled with extrapolated values — drop those by requiring | |
| # monotonic height increase from the surface up). | |
| sfc_h = float(np.min(h_col)) | |
| valid = h_col >= sfc_h - 1.0 | |
| h_col = h_col[valid] | |
| t_col = t_col[valid] | |
| u_col = u_col[valid] | |
| v_col = v_col[valid] | |
| pressures = pressures[valid] | |
| # Make strictly monotonic increasing in height (in case of ties). | |
| order = np.argsort(h_col) | |
| h_col = h_col[order] | |
| t_col = t_col[order] | |
| u_col = u_col[order] | |
| v_col = v_col[order] | |
| pressures = pressures[order] | |
| # Convert MSL heights to AGL by subtracting the lowest valid level. | |
| z_agl = h_col - h_col[0] | |
| theta = _theta(t_col, pressures) | |
| if z_target_m is not None: | |
| z_target = np.asarray(z_target_m, dtype=float) | |
| # Clip target range to what HRRR actually covers at this point. | |
| z_clipped = np.clip(z_target, float(z_agl[0]), float(z_agl[-1])) | |
| u_out = np.interp(z_clipped, z_agl, u_col) | |
| v_out = np.interp(z_clipped, z_agl, v_col) | |
| th_out = np.interp(z_clipped, z_agl, theta) | |
| z_out = z_target.copy() | |
| else: | |
| z_out = z_agl | |
| u_out = u_col | |
| v_out = v_col | |
| th_out = theta | |
| meta = { | |
| "s3_key": key, | |
| "grid_lat": grid_lat, | |
| "grid_lon": grid_lon, | |
| "bytes": bytes_downloaded, | |
| "n_levels": int(z_agl.size), | |
| "sfc_height_msl": float(h_col[0]), | |
| } | |
| return ( | |
| z_out.astype(float), | |
| u_out.astype(float), | |
| v_out.astype(float), | |
| th_out.astype(float), | |
| meta, | |
| ) | |