Spaces:
Running
Running
| """ | |
| Fetch real-time HRRR analysis data from NOAA AWS S3 via Herbie. | |
| Downloads 42 individual GRIB2 messages (one per input channel), | |
| extracts the New England subgrid (450x449), and stacks them into | |
| the model's expected input format. | |
| """ | |
| import logging | |
| from datetime import datetime, timedelta, timezone | |
| import numpy as np | |
| from var_mapping import HRRR_MAPPING, NE_Y_SLICE, NE_X_SLICE | |
| logger = logging.getLogger(__name__) | |
def find_latest_hrrr_cycle(max_lookback_hours: int = 6) -> datetime:
    """
    Find the most recent HRRR cycle that is available on AWS S3.

    Candidate cycles are probed newest-first, starting two hours before the
    current UTC time and going back to ``max_lookback_hours`` hours. HRRR data
    typically becomes available ~45-90 minutes after valid time; the two-hour
    starting offset presumably allows for that latency.

    Args:
        max_lookback_hours: Age in hours of the oldest cycle to probe. Values
            below 2 leave nothing to probe, so the function raises at once.

    Returns:
        A tz-naive datetime in UTC (Herbie requirement), truncated to the top
        of the hour.

    Raises:
        RuntimeError: If none of the probed cycles exposes an index file. The
            most recent probe failure, if any, is attached as the cause.
    """
    from herbie import Herbie

    now = datetime.now(timezone.utc).replace(tzinfo=None)  # tz-naive UTC
    last_error = None

    for hours_ago in range(2, max_lookback_hours + 1):
        cycle_time = (now - timedelta(hours=hours_ago)).replace(
            minute=0, second=0, microsecond=0
        )
        try:
            H = Herbie(
                cycle_time,
                model="hrrr",
                product="sfc",
                fxx=0,
                verbose=False,
            )
            # Check if the index file exists (fast check without downloading data)
            available = H.idx is not None
        except Exception as exc:
            # Best-effort probe: fall through to the next older cycle, but keep
            # a record of why this one failed instead of discarding it silently.
            logger.debug("HRRR cycle probe failed for %s: %s", cycle_time, exc)
            last_error = exc
            continue

        if available:
            logger.info(f"Found HRRR cycle: {cycle_time:%Y-%m-%d %H:%M UTC}")
            return cycle_time

    # Chaining the last failure makes the underlying cause visible in tracebacks.
    raise RuntimeError(
        f"No HRRR data available in the last {max_lookback_hours} hours. "
        "NOAA servers may be temporarily unavailable."
    ) from last_error
def _fetch_single_variable(cycle_time: datetime, mapping: dict) -> np.ndarray:
    """
    Fetch one variable from HRRR and extract the NE subgrid.

    Args:
        cycle_time: HRRR cycle to read from.
        mapping: Channel description. The keys ``"product"``, ``"fxx"``,
            ``"search"`` and ``"name"`` are read.

    Returns:
        np.ndarray of shape (450, 449), dtype float32.

    Raises:
        ValueError: If the search string matches more than one dataset, if the
            dataset holds no data variable, or if the extracted subgrid is not
            (450, 449).
    """
    from herbie import Herbie

    H = Herbie(
        cycle_time,
        model="hrrr",
        product=mapping["product"],
        fxx=mapping["fxx"],
        verbose=False,
    )
    ds = H.xarray(mapping["search"], remove_grib=True)

    # Herbie returns a list of datasets when the matched GRIB messages cannot be
    # merged into a single hypercube. Accept a one-element list, and reject an
    # ambiguous match with a clear message rather than an AttributeError below.
    if isinstance(ds, (list, tuple)):
        if len(ds) != 1:
            raise ValueError(
                f"Search {mapping['search']!r} for {mapping['name']} returned "
                f"{len(ds)} datasets, expected exactly one"
            )
        ds = ds[0]

    # xarray dataset may have different variable names depending on the GRIB message.
    # Get the first data variable (excluding coordinates).
    data_vars = [
        v for v in ds.data_vars
        if v not in ("latitude", "longitude", "gribfile_projection")
    ]
    if not data_vars:
        raise ValueError(f"No data variable found for {mapping['name']}")

    field = ds[data_vars[0]].values  # Full CONUS grid

    # Extract NE subgrid
    subgrid = field[NE_Y_SLICE, NE_X_SLICE]
    if subgrid.shape != (450, 449):
        raise ValueError(
            f"Unexpected shape {subgrid.shape} for {mapping['name']}, expected (450, 449)"
        )
    return subgrid.astype(np.float32)
def fetch_hrrr_input(
    cycle_time: datetime = None,
    progress_callback=None,
) -> tuple[np.ndarray, datetime]:
    """
    Fetch all HRRR channels listed in HRRR_MAPPING and stack them into model input format.

    A channel whose fetch fails is logged and replaced by an all-zero field so
    that a few missing variables do not abort the whole request. Once more than
    half of the channels have failed the fetch is abandoned immediately, since
    the result could no longer be accepted.

    Args:
        cycle_time: Specific HRRR cycle to fetch. If None, finds the latest.
        progress_callback: Optional callable(fraction, description) for progress updates.

    Returns:
        (input_array, cycle_time) where input_array is (450, 449, n_channels)
        float32, with channels in HRRR_MAPPING order.

    Raises:
        RuntimeError: If more than half of the channels fail to fetch. The
            failure that crossed the threshold is attached as the cause.
    """
    if cycle_time is None:
        if progress_callback:
            progress_callback(0.05, "Finding latest HRRR cycle...")
        cycle_time = find_latest_hrrr_cycle()

    n_channels = len(HRRR_MAPPING)
    max_failures = n_channels // 2
    channels = []
    failed = []

    for i, mapping in enumerate(HRRR_MAPPING):
        if progress_callback:
            frac = 0.1 + 0.85 * (i / n_channels)
            progress_callback(frac, f"Fetching {mapping['name']} ({i+1}/{n_channels})...")
        try:
            field = _fetch_single_variable(cycle_time, mapping)
        except Exception as e:
            logger.warning(f"Failed to fetch {mapping['name']}: {e}")
            failed.append(mapping["name"])
            # Fail fast: past this threshold the request is rejected regardless
            # of how the remaining channels fare, so do not keep fetching them.
            if len(failed) > max_failures:
                logger.warning(f"Failed channels ({len(failed)}/{n_channels}): {failed}")
                raise RuntimeError(
                    f"Too many channels failed ({len(failed)}/{n_channels}). "
                    "HRRR data may be unavailable."
                ) from e
            # Fill with zeros as fallback for individual missing channels
            field = np.zeros((450, 449), dtype=np.float32)
        channels.append(field)

    if failed:
        logger.warning(f"Failed channels ({len(failed)}/{n_channels}): {failed}")

    input_array = np.stack(channels, axis=-1)  # (450, 449, n_channels)

    if progress_callback:
        progress_callback(1.0, "Data fetch complete!")
    return input_array, cycle_time