""" fetch_aqi.py — OpenAQ API fetcher for air quality (PM2.5) data. Downloads PM2.5 readings from OpenAQ for coordinates matching each satellite patch. Handles missing values with forward fill + linear interpolation. Falls back to synthetic data when API is unavailable. """ import logging from datetime import datetime, timedelta from typing import Optional, Tuple import numpy as np import pandas as pd import requests from src.training.config import ( OPENAQ_API_KEY, OPENAQ_BASE_URL, AQI_RAW_DIR, TIMESERIES_DIR, TIMESERIES_WINDOW, AQI_FORECAST_HOURS, NUM_SYNTHETIC_SAMPLES, ) logger = logging.getLogger(__name__) def fetch_aqi_data( latitude: float = 37.5, longitude: float = -120.3, radius_km: int = 50, start_date: str = "2024-06-01", end_date: str = "2024-06-08", parameter: str = "pm25", ) -> Optional[pd.DataFrame]: """ Fetch PM2.5 measurements from Open-Meteo Air Quality API. """ locations_url = "https://air-quality-api.open-meteo.com/v1/air-quality" params = { "latitude": latitude, "longitude": longitude, "hourly": "pm2_5", "start_date": start_date, "end_date": end_date, "timezone": "auto" } logger.info(f"Fetching Open-Meteo Air Quality for ({latitude}, {longitude})...") try: resp = requests.get(locations_url, params=params, timeout=30) resp.raise_for_status() data = resp.json() hourly = data.get("hourly", {}) times = hourly.get("time", []) pm2_5 = hourly.get("pm2_5", []) if not times or not pm2_5: logger.warning("No AQI data returned from Open-Meteo.") return None df = pd.DataFrame({ "datetime": pd.to_datetime(times), "value": pm2_5 }) # Open-Meteo might return NaN for missing hours, interpolate df["value"] = df["value"].interpolate(method="linear").ffill().bfill() logger.info(f"Fetched {len(df)} AQI readings from Open-Meteo.") return df except requests.RequestException as e: logger.error(f"Open-Meteo API request failed: {e}") return None def generate_synthetic_aqi( num_samples: int = NUM_SYNTHETIC_SAMPLES, window: int = TIMESERIES_WINDOW, forecast_hours: int = AQI_FORECAST_HOURS, ) -> Tuple[np.ndarray, np.ndarray]: """ Generate realistic synthetic PM2.5 time series. Creates PM2.5 sequences with realistic: - Diurnal variation (higher during day) - Temporal autocorrelation - Occasional pollution spikes (simulating fire events) Args: num_samples: Number of samples. window: Days of historical data per sample. forecast_hours: Hours of future AQI to predict. Returns: Tuple of (historical_pm25, future_pm25_targets): - historical: (num_samples, window, 1) — daily average PM2.5 - targets: (num_samples, forecast_hours) — hourly PM2.5 for 72 hrs """ logger.info(f"Generating {num_samples} synthetic AQI samples...") historical = np.zeros((num_samples, window, 1), dtype=np.float32) targets = np.zeros((num_samples, forecast_hours), dtype=np.float32) for i in range(num_samples): # Base PM2.5 level (AQI equivalent) base_aqi = np.random.uniform(20, 80) # Occasional fire events cause PM2.5 spikes has_fire_event = np.random.rand() > 0.5 if has_fire_event: base_aqi += np.random.uniform(30, 150) # Historical daily averages with autocorrelation daily_values = base_aqi + np.cumsum(np.random.randn(window) * 5) daily_values = np.clip(daily_values, 5, 500) historical[i, :, 0] = daily_values # Future hourly forecast with diurnal pattern hours = np.arange(forecast_hours) diurnal = 10 * np.sin(2 * np.pi * hours / 24 - np.pi / 2) # Peak at noon trend = np.cumsum(np.random.randn(forecast_hours) * 2) forecast = daily_values[-1] + diurnal + trend forecast = np.clip(forecast, 5, 500) targets[i] = forecast return historical, targets def fetch_aqi_for_location( latitude: float, longitude: float, target_date: str, window: int = TIMESERIES_WINDOW, ) -> Tuple[np.ndarray, np.ndarray]: """ Fetch AQI history for a single location and generate forecast targets. Returns: Tuple of (historical_pm25, forecast_targets). """ try: end_dt = datetime.strptime(target_date, "%Y-%m-%d") start_dt = end_dt - timedelta(days=window) df = fetch_aqi_data( latitude=latitude, longitude=longitude, start_date=start_dt.strftime("%Y-%m-%d"), end_date=end_dt.strftime("%Y-%m-%d"), ) if df is not None and len(df) >= 24: # Aggregate to daily df["date"] = df["datetime"].dt.date daily = df.groupby("date")["value"].mean().values[-window:] if len(daily) >= window: historical = daily.reshape(window, 1).astype(np.float32) # Use last known values to create simple forecast baseline last_val = daily[-1] forecast = np.full(AQI_FORECAST_HOURS, last_val, dtype=np.float32) forecast += np.cumsum(np.random.randn(AQI_FORECAST_HOURS) * 2) forecast = np.clip(forecast, 5, 500) return historical, forecast except Exception as e: logger.error(f"Error fetching AQI data: {e}") # Fallback to synthetic logger.warning("Falling back to synthetic AQI data.") hist, tgt = generate_synthetic_aqi(num_samples=1, window=window) return hist[0], tgt[0] def fetch_and_prepare_aqi( num_samples: int = NUM_SYNTHETIC_SAMPLES, save: bool = True, ) -> Tuple[np.ndarray, np.ndarray]: """ Generate or fetch complete AQI dataset for training. Returns: Tuple of (historical_pm25, forecast_targets). """ historical, targets = generate_synthetic_aqi(num_samples=num_samples) if save: np.save(TIMESERIES_DIR / "aqi_history.npy", historical) np.save(TIMESERIES_DIR / "aqi_targets.npy", targets) logger.info(f"Saved AQI data: history={historical.shape}, targets={targets.shape}") return historical, targets if __name__ == "__main__": logging.basicConfig(level=logging.INFO) hist, tgt = fetch_and_prepare_aqi() print(f"AQI history: {hist.shape}, AQI targets: {tgt.shape}")