"""HRRR-on-AWS profile fetcher.

Pulls a vertical profile of wind and temperature out of the HRRR analysis
(0-h forecast) on the NODD public bucket ``noaa-hrrr-bdp-pds`` and returns
it as ``(z, u, v, theta)`` arrays plus a metadata dict, ready to drop into
the profile editor.
Design notes
------------
The full ``wrfprsf00.grib2`` is ~140 MB. We only need a handful of fields
(HGT, TMP, UGRD, VGRD on pressure levels) at a single grid cell, so we:
1. Fetch the ``.idx`` sidecar (a few kB) to find byte offsets.
2. Issue ranged GETs for just the GRIB messages we need (few MB total).
3. Splice them together into a local scratch file.
4. Open with cfgrib, pick the nearest grid column, and compute θ. The
user-specified flow-direction wind component is derived separately from
the returned ``(u, v)`` with ``along_flow_signed``.
Dependencies: ``boto3``, ``xarray``, ``cfgrib``. cfgrib needs the eccodes
C library; we pull it in from PyPI only (via ``eccodes`` + ``eccodeslib``
on macOS/Linux, or ``eccodes`` + ``ecmwflibs`` on Windows) rather than a
system package, so ``uv sync`` is enough and no ``brew install eccodes``
step is required. See ``pyproject.toml`` for the platform markers.
"""
from __future__ import annotations
import io
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import List, Tuple
import numpy as np
# S3 bucket holding the HRRR GRIB2 files and their ``.idx`` sidecars; it is
# read anonymously (unsigned requests).
BUCKET = "noaa-hrrr-bdp-pds"
# Exponent used in the potential-temperature formula.
R_OVER_CP = 0.2854 # R_d / c_p for dry air
# Variable names (4th field of an .idx line) that are kept when parsing.
_WANTED_VARS = ("HGT", "TMP", "UGRD", "VGRD")
# An .idx level field (5th field) must end with this suffix to be kept,
# e.g. "500 mb"; the leading number is parsed as the pressure level.
_LEVEL_SUFFIX = "mb"
@dataclass
class _IdxRecord:
num: int
start: int
var: str
level_mb: float
def _parse_yyyymmddhh(s: str) -> datetime:
s = (s or "").strip()
if len(s) != 10 or not s.isdigit():
raise ValueError(
f"Expected YYYYMMDDHH (10 digits); got {s!r}. "
f"Example: 2024060112"
)
return datetime.strptime(s, "%Y%m%d%H")
def _s3_key(dt: datetime) -> str:
return f"hrrr.{dt:%Y%m%d}/conus/hrrr.t{dt:%H}z.wrfprsf00.grib2"
def _unsigned_s3_client():
    """Create a boto3 S3 client that sends unsigned (anonymous) requests.

    boto3/botocore are imported here rather than at module import time, so
    importing this module does not require them.
    """
    import boto3
    from botocore import UNSIGNED
    from botocore.config import Config

    anonymous = Config(signature_version=UNSIGNED)
    return boto3.client("s3", config=anonymous)
def _parse_idx(idx_text: str) -> Tuple[List[_IdxRecord], List[int]]:
"""Parse the HRRR .idx file into (wanted_records, all_start_bytes).
Format of each line (colon-separated):
record_num:start_byte:d=YYYYMMDDHH:VAR:LEVEL:FCST:anl
"""
wanted: List[_IdxRecord] = []
all_starts: List[int] = []
for line in idx_text.splitlines():
parts = line.split(":")
if len(parts) < 6 or not parts[1].isdigit():
continue
start = int(parts[1])
all_starts.append(start)
var = parts[3]
level = parts[4].strip()
if var not in _WANTED_VARS or not level.endswith(_LEVEL_SUFFIX):
continue
try:
level_mb = float(level.split()[0])
except ValueError:
continue
wanted.append(_IdxRecord(num=int(parts[0]), start=start, var=var, level_mb=level_mb))
return wanted, sorted(all_starts)
def _byte_ranges(records: List[_IdxRecord], all_starts: List[int]) -> List[Tuple[int, int]]:
"""Convert each wanted record's start byte to a (start, end) range.
End byte is the next record's start minus one, or open-ended for the
final record. We collapse adjacent ranges into contiguous chunks to
cut down on the number of HTTP calls.
"""
ranges: List[Tuple[int, int]] = []
for r in records:
i = all_starts.index(r.start)
if i + 1 < len(all_starts):
end = all_starts[i + 1] - 1
else:
end = -1 # open-ended
ranges.append((r.start, end))
# Merge contiguous ranges to reduce request count.
ranges.sort()
merged: List[Tuple[int, int]] = []
for start, end in ranges:
if merged and end != -1 and merged[-1][1] != -1 and start == merged[-1][1] + 1:
merged[-1] = (merged[-1][0], end)
else:
merged.append((start, end))
return merged
def _download_subset(client, key: str, ranges: List[Tuple[int, int]]) -> bytes:
buf = io.BytesIO()
for start, end in ranges:
header = f"bytes={start}-" + ("" if end == -1 else str(end))
obj = client.get_object(Bucket=BUCKET, Key=key, Range=header)
buf.write(obj["Body"].read())
return buf.getvalue()
def _nearest_ij(lats: np.ndarray, lons: np.ndarray, lat0: float, lon0: float) -> Tuple[int, int]:
"""Nearest-neighbor grid cell to (lat0, lon0) on a 2-D HRRR grid."""
lon0 = ((lon0 + 180) % 360) - 180
# Convert HRRR lons to -180..180 as well.
lons = ((lons + 180) % 360) - 180
d2 = (lats - lat0) ** 2 + (lons - lon0) ** 2
j, i = np.unravel_index(np.argmin(d2), d2.shape)
return int(j), int(i)
def along_flow_signed(
    u: np.ndarray, v: np.ndarray, flow_from_deg: float
) -> np.ndarray:
    """Project the wind onto a reference flow direction, keeping the sign.

    Parameters
    ----------
    u, v
        East- and north-positive wind components (array-like or scalar).
    flow_from_deg
        Reference direction in the meteorological convention: the azimuth
        the flow comes *from* (270° = westerly, 160° = from the SSE).

    Returns
    -------
    np.ndarray
        ``-(u·sin φ + v·cos φ)`` with ``φ = flow_from_deg``. The value is
        positive while the wind comes from within 90° of the reference
        direction and negative when it opposes it. Nothing is clipped;
        callers wanting the former zero-clipped behavior should apply
        ``np.maximum(along_flow_signed(...), 0.0)`` themselves.

    Notes
    -----
    A wind of speed ``s`` blowing from azimuth ``φ_act`` has components
    ``(u, v) = -s · (sin φ_act, cos φ_act)``. The unit vector along which
    air travels when it comes from ``φ_spec`` is
    ``-(sin φ_spec, cos φ_spec)``, so the projection is
    ``s · cos(φ_act − φ_spec) = -(u sin φ_spec + v cos φ_spec)``.
    """
    phi = np.deg2rad(flow_from_deg)
    east = np.asarray(u)
    north = np.asarray(v)
    projection_on_from_axis = east * np.sin(phi) + north * np.cos(phi)
    return -projection_on_from_axis
# Backward-compatibility alias: the old name is kept so existing imports
# keep working, but it is now the very same function as
# ``along_flow_signed`` and therefore returns the *signed* along-flow
# component (no zero-clip). Callers that genuinely need the clipped
# variant must apply ``np.maximum(_, 0.0)`` themselves.
along_flow_positive = along_flow_signed
def _theta(T_K: np.ndarray, p_hpa: np.ndarray) -> np.ndarray:
    """Potential temperature (K) with a 1000 hPa reference pressure.

    Computes ``T · (1000 / p) ** R_OVER_CP`` element-wise, with ``T_K`` in
    kelvin and ``p_hpa`` in hPa.
    """
    pressure_factor = (1000.0 / p_hpa) ** R_OVER_CP
    return T_K * pressure_factor
def _find_surface_hgt_record(idx_text: str) -> _IdxRecord | None:
    """Return the ``HGT:surface`` (model terrain) idx record, if present."""
    for line in idx_text.splitlines():
        parts = line.split(":")
        # Same well-formedness rule as the main idx parser, so the start
        # byte found here is guaranteed to be among its offsets.
        if len(parts) < 6 or not parts[1].isdigit():
            continue
        if parts[3] == "HGT" and parts[4].strip() == "surface":
            num = int(parts[0]) if parts[0].isdigit() else -1
            return _IdxRecord(
                num=num, start=int(parts[1]), var="HGT", level_mb=float("nan")
            )
    return None


def _pick_var(ds, candidates):
    """Return the first variable of ``ds`` whose name is in ``candidates``."""
    for name in candidates:
        if name in ds.variables:
            return ds[name]
    raise KeyError(f"None of {candidates} found in dataset: {list(ds.variables)}")


def _column_values(da, j: int, i: int) -> np.ndarray:
    """Materialize ``da`` at horizontal cell (j, i) of its last two dims."""
    row_dim, col_dim = da.dims[-2], da.dims[-1]
    return np.asarray(da.isel({row_dim: j, col_dim: i}).values)


def _read_terrain_height(grib_path: str, j: int, i: int) -> float | None:
    """Read the surface geopotential height at (j, i), or ``None`` on failure."""
    import xarray as xr

    try:
        with xr.open_dataset(
            grib_path,
            engine="cfgrib",
            backend_kwargs={
                "indexpath": "",
                "filter_by_keys": {"typeOfLevel": "surface"},
            },
        ) as sfc:
            terrain = float(_column_values(_pick_var(sfc, ["orog", "gh", "HGT", "h"]), j, i))
    except Exception:
        # Deliberate best-effort: terrain only refines the ground reference,
        # so any decoding problem falls back to the caller's default.
        return None
    return terrain if np.isfinite(terrain) else None


def fetch_profile(
    lat: float,
    lon: float,
    yyyymmddhh: str,
    z_target_m: np.ndarray | None = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, dict]:
    """Return ``(z, u, v, theta, meta)`` for the HRRR column at (lat, lon).

    Raw east/north wind components are returned (not the along-flow
    projection) so the caller can re-project onto any user-chosen flow
    direction cheaply without re-downloading. Use :func:`along_flow_signed`
    to turn ``(u, v, flow_from_deg)`` into the mountain-wave input.

    Heights are referenced to the model terrain height (``HGT:surface``)
    at the selected grid cell, and pressure levels lying below that terrain
    are dropped. If the terrain record is missing from the ``.idx`` or
    cannot be decoded, the lowest pressure level is used as the ground
    reference instead and ``meta["sfc_height_source"]`` says so.

    Parameters
    ----------
    lat, lon : float
        Point of interest in decimal degrees. ``lon`` may be ±180 or 0..360.
    yyyymmddhh : str
        UTC cycle time, e.g. ``"2024060112"``.
    z_target_m : np.ndarray, optional
        If given, the profile is linearly interpolated onto these heights
        (meters above ground level); targets outside the covered range take
        the nearest end value. If ``None``, the native pressure-level
        heights (AGL) are returned as ``z``.

    Returns
    -------
    z : np.ndarray
        Heights in m AGL.
    u, v : np.ndarray
        East- and north-positive wind components in m/s.
    theta : np.ndarray
        Potential temperature in K.
    meta : dict
        Diagnostic info: ``s3_key``, ``grid_lat``, ``grid_lon``, ``bytes``,
        ``n_levels``, ``sfc_height_msl`` and ``sfc_height_source``
        (``"model_terrain"`` or ``"lowest_pressure_level"``).

    Raises
    ------
    ValueError
        If ``yyyymmddhh`` is not a valid 10-digit cycle string.
    RuntimeError
        If xarray is missing, the ``.idx`` cannot be fetched, it lists no
        usable pressure-level records, or no level lies above ground.
    """
    try:
        import xarray as xr
    except ImportError as exc:  # pragma: no cover
        raise RuntimeError(
            "xarray is required to read HRRR GRIB2. "
            "Install with: pip install xarray cfgrib"
        ) from exc

    dt = _parse_yyyymmddhh(yyyymmddhh)
    key = _s3_key(dt)
    client = _unsigned_s3_client()

    # 1. Fetch the .idx sidecar.
    try:
        idx_obj = client.get_object(Bucket=BUCKET, Key=key + ".idx")
    except Exception as exc:
        raise RuntimeError(
            f"HRRR .idx not found at s3://{BUCKET}/{key}.idx "
            f"(cycle may not exist yet). Error: {exc}"
        ) from exc
    idx_text = idx_obj["Body"].read().decode("utf-8", errors="replace")
    records, all_starts = _parse_idx(idx_text)
    if not records:
        raise RuntimeError(f"No HGT/TMP/UGRD/VGRD pressure-level records in idx for {key}")

    # The terrain field is one extra message; it lets us tell which
    # pressure levels are really above ground at the chosen cell.
    sfc_record = _find_surface_hgt_record(idx_text)
    wanted = list(records)
    if sfc_record is not None:
        wanted.append(sfc_record)

    # 2. Byte-range fetch just the records we need.
    ranges = _byte_ranges(wanted, all_starts)
    blob = _download_subset(client, key, ranges)
    bytes_downloaded = len(blob)

    # 3. Splice into a scratch file and open with cfgrib.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".grib2") as f:
        f.write(blob)
        grib_path = f.name
    try:
        # cfgrib reads lazily from the file, so everything needed must be
        # materialized before the scratch file is removed. The context
        # manager guarantees the dataset is closed even on error, which
        # also keeps the unlink below working on Windows.
        with xr.open_dataset(
            grib_path,
            engine="cfgrib",
            backend_kwargs={
                "indexpath": "",  # don't leave .idx files around
                "filter_by_keys": {"typeOfLevel": "isobaricInhPa"},
            },
        ) as ds:
            lats_full = ds["latitude"].values
            lons_full = ds["longitude"].values

            # 4. Pick the nearest column, then load only that column rather
            # than four full 3-D fields.
            j, i = _nearest_ij(lats_full, lons_full, lat, lon)
            grid_lat = float(lats_full[j, i])
            grid_lon = float(((lons_full[j, i] + 180) % 360) - 180)

            pressures = ds["isobaricInhPa"].values.astype(float)  # hPa
            h_col = _column_values(_pick_var(ds, ["gh", "HGT", "h"]), j, i)  # m MSL
            t_col = _column_values(_pick_var(ds, ["t", "TMP"]), j, i)  # K
            u_col = _column_values(_pick_var(ds, ["u", "UGRD"]), j, i)
            v_col = _column_values(_pick_var(ds, ["v", "VGRD"]), j, i)

        terrain_msl = (
            _read_terrain_height(grib_path, j, i) if sfc_record is not None else None
        )
    finally:
        try:
            os.unlink(grib_path)
        except OSError:
            pass  # best-effort cleanup of the scratch file

    # Sort by pressure descending (so surface first, top last).
    order = np.argsort(-pressures, kind="stable")
    pressures = pressures[order]
    h_col = h_col[order]
    t_col = t_col[order]
    u_col = u_col[order]
    v_col = v_col[order]

    if terrain_msl is None:
        # No terrain information: treat the lowest level as ground.
        terrain_msl = float(np.min(h_col))
        sfc_source = "lowest_pressure_level"
    else:
        sfc_source = "model_terrain"

    # Pressure levels below the terrain hold extrapolated values; keep only
    # levels at or above ground.
    above_ground = h_col >= terrain_msl
    if not np.any(above_ground):
        raise RuntimeError(
            f"No HRRR pressure level lies above ground at ({grid_lat}, {grid_lon}) for {key}"
        )
    pressures = pressures[above_ground]
    h_col = h_col[above_ground]
    t_col = t_col[above_ground]
    u_col = u_col[above_ground]
    v_col = v_col[above_ground]

    # Strictly increasing heights: np.unique sorts and drops ties, keeping
    # the first (highest-pressure) level of each tie.
    h_col, keep = np.unique(h_col, return_index=True)
    pressures = pressures[keep]
    t_col = t_col[keep]
    u_col = u_col[keep]
    v_col = v_col[keep]

    # Convert MSL heights to AGL.
    z_agl = h_col - terrain_msl
    theta = _theta(t_col, pressures)

    if z_target_m is not None:
        z_target = np.asarray(z_target_m, dtype=float)
        # Clip target range to what HRRR actually covers at this point.
        z_clipped = np.clip(z_target, float(z_agl[0]), float(z_agl[-1]))
        u_out = np.interp(z_clipped, z_agl, u_col)
        v_out = np.interp(z_clipped, z_agl, v_col)
        th_out = np.interp(z_clipped, z_agl, theta)
        z_out = z_target.copy()
    else:
        z_out = z_agl
        u_out = u_col
        v_out = v_col
        th_out = theta

    meta = {
        "s3_key": key,
        "grid_lat": grid_lat,
        "grid_lon": grid_lon,
        "bytes": bytes_downloaded,
        "n_levels": int(z_agl.size),
        "sfc_height_msl": float(terrain_msl),
        "sfc_height_source": sfc_source,
    }
    return (
        z_out.astype(float),
        u_out.astype(float),
        v_out.astype(float),
        th_out.astype(float),
        meta,
    )
|