| """ |
| GHCN-Hourly Data Downloader |
| |
| Downloads and parses GHCN-Hourly (formerly ISD) data from NOAA NCEI. |
| https://www.ncei.noaa.gov/products/global-historical-climatology-network-hourly |
| |
| This dataset includes wind, temperature, pressure, humidity, clouds, and more |
| at hourly resolution from 20,000+ stations worldwide. |
| """ |
|
|
| import gzip |
| import json |
| from dataclasses import dataclass |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Generator, Optional, List, Union |
|
|
| import httpx |
| import pandas as pd |
| from loguru import logger |
| from tqdm import tqdm |
|
|
|
|
@dataclass
class HourlyStation:
    """Metadata describing one GHCN-Hourly (ISD) station.

    A station is identified by the pair of its USAF and WBAN codes; the
    remaining fields carry its name, location, and period of record.
    """

    # Identifier pair; joined with a dash by the ``id`` property.
    usaf: str
    wban: str

    # Descriptive metadata.
    station_name: str
    country: str
    state: Optional[str]

    # Location of the station.
    latitude: float
    longitude: float
    elevation: float

    # First and last dates of the station's period of record.
    begin_date: datetime
    end_date: datetime

    @property
    def id(self) -> str:
        """Return the combined station ID in ``USAF-WBAN`` form."""
        return "{}-{}".format(self.usaf, self.wban)
|
|
|
|
@dataclass
class HourlyObservation:
    """One hourly observation for a single station.

    Every measurement field is optional: ``None`` means the value was
    missing or is not provided by the data source being parsed.
    """

    # Identity, time, and location of the observation.
    station_id: str
    timestamp: datetime
    latitude: float
    longitude: float
    elevation: float

    # Wind.
    wind_direction: Optional[float]
    wind_speed: Optional[float]
    wind_gust: Optional[float]

    # Temperature.
    temperature: Optional[float]
    dew_point: Optional[float]

    # Pressure.
    sea_level_pressure: Optional[float]
    station_pressure: Optional[float]

    # Humidity.
    relative_humidity: Optional[float]

    # Visibility.
    visibility: Optional[float]

    # Precipitation accumulated over the last 1 and 6 hours.
    precipitation_1h: Optional[float]
    precipitation_6h: Optional[float]

    # Clouds; coverage is kept as the source's coverage code, as a string.
    cloud_ceiling: Optional[float]
    cloud_coverage: Optional[str]

    # Quality-control information for the record.
    quality_control: str
|
|
|
|
class GHCNHourlyDownloader:
    """
    Downloads and parses GHCN-Hourly (ISD-Lite) data.

    GHCN-Hourly provides sub-daily observations including wind, temperature,
    pressure, and humidity from global surface stations.

    We use the ISD-Lite format which is a simplified version containing the
    most essential variables.

    The instance can be used as a context manager; leaving the ``with`` block
    closes the underlying HTTP client.

    Example usage:
        downloader = GHCNHourlyDownloader(output_dir="data/raw/ghcn_hourly")
        downloader.download_station_list()

        # Download data for specific stations and years
        for station in downloader.get_stations(country="US", min_years=30):
            downloader.download_station_year(station.usaf, station.wban, 2023)
    """

    BASE_URL = "https://www.ncei.noaa.gov/pub/data/noaa/isd-lite/"
    STATION_LIST_URL = "https://www.ncei.noaa.gov/pub/data/noaa/isd-history.csv"

    # Sentinel used by ISD-Lite for a missing value in any data field.
    _MISSING = -9999
    # ISD-Lite codes "trace" precipitation (too small to measure) as -1.
    _TRACE = -1

    def __init__(
        self,
        output_dir: Union[str, Path] = "data/raw/ghcn_hourly",
        timeout: float = 60.0,
    ):
        """
        Create a downloader that stores files under ``output_dir``.

        Args:
            output_dir: Directory for downloaded files; created if missing.
            timeout: Timeout passed to the HTTP client.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.timeout = timeout
        self._client: Optional["httpx.Client"] = None

    @property
    def client(self) -> "httpx.Client":
        """Lazy-initialized HTTP client."""
        if self._client is None:
            self._client = httpx.Client(timeout=self.timeout, follow_redirects=True)
        return self._client

    def __enter__(self) -> "GHCNHourlyDownloader":
        return self

    def __exit__(self, *args) -> None:
        if self._client is not None:
            self._client.close()
            # Drop the closed client so a later use of ``client`` builds a
            # fresh one instead of handing back a closed connection pool.
            self._client = None

    # ------------------------------------------------------------------
    # Small parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _value_or(value, default: str) -> str:
        """Return ``str(value)``, or ``default`` when the cell is blank/NaN."""
        return default if pd.isna(value) else str(value)

    @staticmethod
    def _parse_field(raw: str, scale: float = 10.0) -> Optional[float]:
        """Convert one ISD-Lite integer field to a float, or None if missing.

        Raises:
            ValueError: If ``raw`` is not an integer literal.
        """
        value = int(raw)
        if value == GHCNHourlyDownloader._MISSING:
            return None
        return value / scale

    @staticmethod
    def _parse_precip(raw: str) -> Optional[float]:
        """Convert an ISD-Lite precipitation field to millimetres.

        Missing (-9999) becomes None. Trace precipitation (-1) is reported
        as 0.0 rather than being scaled into a negative depth.

        Raises:
            ValueError: If ``raw`` is not an integer literal.
        """
        value = int(raw)
        if value == GHCNHourlyDownloader._MISSING:
            return None
        if value == GHCNHourlyDownloader._TRACE:
            return 0.0
        return value / 10.0

    # ------------------------------------------------------------------
    # Station metadata
    # ------------------------------------------------------------------

    def download_station_list(self, force: bool = False) -> Path:
        """
        Download the station history/metadata file.

        Args:
            force: Re-download even if the file already exists locally.

        Returns:
            Path of the local ``isd-history.csv`` file.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        output_path = self.output_dir / "isd-history.csv"

        if output_path.exists() and not force:
            logger.info(f"Station list already exists: {output_path}")
            return output_path

        logger.info(f"Downloading station list from {self.STATION_LIST_URL}")
        response = self.client.get(self.STATION_LIST_URL)
        response.raise_for_status()

        # Fix the encoding explicitly: the platform default may be unable to
        # encode non-ASCII station names, and pandas reads the file as UTF-8.
        output_path.write_text(response.text, encoding="utf-8")
        logger.success(f"Downloaded station list to {output_path}")
        return output_path

    def parse_stations(self, path: Optional[Path] = None) -> List["HourlyStation"]:
        """
        Parse the station history CSV file.

        Rows without a latitude or longitude are skipped, as are rows whose
        fields cannot be converted (logged at debug level).

        Args:
            path: CSV file to read. Defaults to ``isd-history.csv`` inside the
                output directory, which is downloaded first if it is missing.

        Returns:
            List of parsed stations.

        Raises:
            FileNotFoundError: If an explicitly given ``path`` does not exist.
        """
        default_path = self.output_dir / "isd-history.csv"
        path = default_path if path is None else Path(path)

        # Only the default file can be produced by downloading; a missing
        # custom path is left for read_csv to report.
        if path == default_path and not path.exists():
            self.download_station_list()

        # Read everything as text so station IDs keep their leading zeros and
        # the YYYYMMDD date columns are never inferred as floats.
        df = pd.read_csv(path, dtype=str, low_memory=False)

        stations = []
        for _, row in df.iterrows():
            if pd.isna(row.get("LAT")) or pd.isna(row.get("LON")):
                continue

            try:
                state = row.get("STATE")
                elevation = row.get("ELEV(M)")
                station = HourlyStation(
                    usaf=str(row["USAF"]).zfill(6),
                    wban=str(row["WBAN"]).zfill(5),
                    station_name=self._value_or(row.get("STATION NAME"), ""),
                    country=self._value_or(row.get("CTRY"), ""),
                    state=str(state) if pd.notna(state) else None,
                    latitude=float(row["LAT"]),
                    longitude=float(row["LON"]),
                    elevation=float(elevation) if pd.notna(elevation) else 0.0,
                    begin_date=pd.to_datetime(
                        self._value_or(row.get("BEGIN"), "19000101"), format="%Y%m%d"
                    ),
                    end_date=pd.to_datetime(
                        self._value_or(row.get("END"), "20991231"), format="%Y%m%d"
                    ),
                )
            except (KeyError, TypeError, ValueError) as e:
                logger.debug(f"Skipping station: {e}")
                continue
            stations.append(station)

        logger.info(f"Parsed {len(stations)} stations")
        return stations

    def get_stations(
        self,
        country: Optional[str] = None,
        min_years: int = 0,
        bbox: Optional[tuple[float, float, float, float]] = None,
        active_only: bool = True,
    ) -> List["HourlyStation"]:
        """
        Get stations matching criteria.

        Args:
            country: 2-letter country code
            min_years: Minimum years of data required
            bbox: Bounding box (min_lon, min_lat, max_lon, max_lat)
            active_only: Only include stations with data through 2023+

        Returns:
            List of matching stations
        """
        stations = self.parse_stations()

        if country:
            stations = [s for s in stations if s.country == country]

        if bbox:
            min_lon, min_lat, max_lon, max_lat = bbox
            stations = [
                s
                for s in stations
                if min_lon <= s.longitude <= max_lon and min_lat <= s.latitude <= max_lat
            ]

        if min_years > 0:
            stations = [
                s
                for s in stations
                if (s.end_date - s.begin_date).days / 365 >= min_years
            ]

        if active_only:
            cutoff = datetime(2023, 1, 1)
            stations = [s for s in stations if s.end_date >= cutoff]

        logger.info(f"Found {len(stations)} matching stations")
        return stations

    # ------------------------------------------------------------------
    # Station-year data
    # ------------------------------------------------------------------

    def download_station_year(
        self,
        usaf: str,
        wban: str,
        year: int,
        force: bool = False,
    ) -> Optional[Path]:
        """
        Download ISD-Lite data for a station-year.

        ISD-Lite files are organized by year: {year}/{usaf}-{wban}-{year}.gz

        Args:
            usaf: USAF part of the station ID.
            wban: WBAN part of the station ID.
            year: Year to download.
            force: Re-download even if the file already exists locally.

        Returns:
            Path of the local file (newly downloaded or already present), or
            None when the server has no file for this station-year (HTTP 404).

        Raises:
            httpx.HTTPStatusError: For any error status other than 404.
        """
        filename = f"{usaf}-{wban}-{year}.gz"
        url = f"{self.BASE_URL}{year}/{filename}"
        output_path = self.output_dir / "data" / str(year) / filename

        if output_path.exists() and not force:
            logger.debug(f"Data already exists: {output_path}")
            return output_path

        output_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            response = self.client.get(url)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                logger.debug(f"No data for {usaf}-{wban} in {year}")
                return None
            raise

        # Write to a temporary sibling and rename, so an interrupted write
        # never leaves a truncated .gz that later runs would treat as cached.
        partial_path = output_path.with_name(output_path.name + ".part")
        partial_path.write_bytes(response.content)
        partial_path.replace(output_path)
        return output_path

    def parse_isd_lite(
        self,
        usaf: str,
        wban: str,
        year: int,
    ) -> Generator["HourlyObservation", None, None]:
        """
        Parse an ISD-Lite file and yield observations.

        The file is downloaded first if it is not present locally; nothing is
        yielded when no file exists for the station-year.

        ISD-Lite format (fixed-width, space-separated):
            Field 1: Year
            Field 2: Month
            Field 3: Day
            Field 4: Hour
            Field 5: Air Temperature (°C * 10)
            Field 6: Dew Point Temperature (°C * 10)
            Field 7: Sea Level Pressure (hPa * 10)
            Field 8: Wind Direction (degrees)
            Field 9: Wind Speed (m/s * 10)
            Field 10: Sky Condition Total Coverage Code
            Field 11: Liquid Precipitation Depth 1-Hour (mm * 10)
            Field 12: Liquid Precipitation Depth 6-Hour (mm * 10)

        Missing values are represented as -9999 and yielded as None. Trace
        precipitation is coded as -1 and yielded as 0.0.

        Lines with fewer than 12 fields, or with fields that are not valid
        integers/dates, are skipped. ISD-Lite records carry no station
        location, so latitude, longitude and elevation are set to 0.0; fields
        the format does not contain are set to None.
        """
        path = self.output_dir / "data" / str(year) / f"{usaf}-{wban}-{year}.gz"

        if not path.exists():
            if self.download_station_year(usaf, wban, year) is None:
                return

        station_id = f"{usaf}-{wban}"

        with gzip.open(path, "rt") as f:
            for line in f:
                parts = line.split()
                if len(parts) < 12:
                    continue

                try:
                    timestamp = datetime(
                        int(parts[0]), int(parts[1]), int(parts[2]), int(parts[3])
                    )
                    sky_code = int(parts[9])
                    observation = HourlyObservation(
                        station_id=station_id,
                        timestamp=timestamp,
                        latitude=0.0,
                        longitude=0.0,
                        elevation=0.0,
                        wind_direction=self._parse_field(parts[7], 1.0),
                        wind_speed=self._parse_field(parts[8], 10.0),
                        wind_gust=None,
                        temperature=self._parse_field(parts[4], 10.0),
                        dew_point=self._parse_field(parts[5], 10.0),
                        sea_level_pressure=self._parse_field(parts[6], 10.0),
                        station_pressure=None,
                        relative_humidity=None,
                        visibility=None,
                        precipitation_1h=self._parse_precip(parts[10]),
                        precipitation_6h=self._parse_precip(parts[11]),
                        cloud_ceiling=None,
                        cloud_coverage=str(sky_code) if sky_code != self._MISSING else None,
                        quality_control="",
                    )
                except (ValueError, IndexError) as e:
                    logger.debug(f"Parse error: {e}")
                    continue

                yield observation

    def station_year_to_dataframe(
        self,
        usaf: str,
        wban: str,
        year: int,
    ) -> pd.DataFrame:
        """
        Load station-year data as a pandas DataFrame.

        Returns:
            DataFrame indexed and sorted by ``timestamp`` with one column per
            remaining observation field, or an empty DataFrame when there are
            no observations.
        """
        observations = list(self.parse_isd_lite(usaf, wban, year))

        if not observations:
            return pd.DataFrame()

        df = pd.DataFrame([vars(o) for o in observations])
        return df.set_index("timestamp").sort_index()

    def download_station_range(
        self,
        usaf: str,
        wban: str,
        start_year: int,
        end_year: int,
    ) -> List[Path]:
        """
        Download multiple years of data for a station.

        Args:
            usaf: USAF part of the station ID.
            wban: WBAN part of the station ID.
            start_year: First year to download (inclusive).
            end_year: Last year to download (inclusive).

        Returns:
            Paths of the files that are available locally afterwards; years
            with no data on the server are omitted.
        """
        paths = []
        for year in range(start_year, end_year + 1):
            result = self.download_station_year(usaf, wban, year)
            if result:
                paths.append(result)
        return paths

    def download_all(
        self,
        stations: Optional[List["HourlyStation"]] = None,
        years: Optional[List[int]] = None,
        max_stations: Optional[int] = None,
        **filter_kwargs,
    ) -> int:
        """
        Download data for multiple stations and years.

        A failure for one station-year is logged as a warning and does not
        stop the remaining downloads.

        Args:
            stations: Stations to download. When None, they are selected with
                ``get_stations(**filter_kwargs)``.
            years: Years to download. Defaults to 2000 through 2023.
            max_stations: Keep only the first N stations. None means no limit;
                0 downloads nothing.
            **filter_kwargs: Passed to ``get_stations`` when ``stations`` is
                None.

        Returns:
            Count of station-year files available locally afterwards. Files
            that were already present are included in the count.
        """
        if stations is None:
            stations = self.get_stations(**filter_kwargs)

        # Compare against None so that an explicit limit of 0 is honoured.
        if max_stations is not None:
            stations = stations[:max_stations]

        if years is None:
            years = list(range(2000, 2024))

        count = 0
        for station in tqdm(stations, desc="Downloading stations"):
            for year in years:
                try:
                    result = self.download_station_year(station.usaf, station.wban, year)
                except Exception as e:
                    # Best effort: one failing station-year must not abort
                    # the whole batch.
                    logger.warning(f"Failed to download {station.id}/{year}: {e}")
                    continue
                if result:
                    count += 1

        logger.success(f"Downloaded {count} station-year files")
        return count
|
|
|
|
def main():
    """Command-line entry point: fetch the station list, then station data.

    Parses the command-line options, makes sure the station list is present,
    and downloads every year from ``--start-year`` through ``--end-year``
    (inclusive) for the stations selected by the country / minimum-years /
    maximum-stations options.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Download GHCN-Hourly (ISD-Lite) data")

    # (flag, keyword arguments) pairs, registered in this order.
    option_table = (
        (
            "--output-dir",
            {"default": "data/raw/ghcn_hourly", "help": "Output directory for downloaded data"},
        ),
        (
            "--country",
            {"default": None, "help": "Filter by country code (e.g., US, CA, GB)"},
        ),
        (
            "--min-years",
            {"type": int, "default": 20, "help": "Minimum years of data required"},
        ),
        (
            "--max-stations",
            {"type": int, "default": None, "help": "Maximum number of stations to download"},
        ),
        (
            "--start-year",
            {"type": int, "default": 2000, "help": "Start year for data download"},
        ),
        (
            "--end-year",
            {"type": int, "default": 2023, "help": "End year for data download"},
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    options = cli.parse_args()

    # Both bounds are inclusive.
    requested_years = list(range(options.start_year, options.end_year + 1))

    with GHCNHourlyDownloader(output_dir=options.output_dir) as downloader:
        downloader.download_station_list()
        downloader.download_all(
            country=options.country,
            min_years=options.min_years,
            max_stations=options.max_stations,
            years=requested_years,
        )
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script; importing the module has no side effects.
if __name__ == "__main__":
    main()
|
|