| """ |
| fetch_aqi.py — OpenAQ API fetcher for air quality (PM2.5) data. |
| |
| Downloads PM2.5 readings from OpenAQ for coordinates matching each |
| satellite patch. Handles missing values with forward fill + linear |
| interpolation. Falls back to synthetic data when API is unavailable. |
| """ |
|
|
| import logging |
| from datetime import datetime, timedelta |
| from typing import Optional, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| import requests |
|
|
| from src.training.config import ( |
| OPENAQ_API_KEY, OPENAQ_BASE_URL, AQI_RAW_DIR, TIMESERIES_DIR, |
| TIMESERIES_WINDOW, AQI_FORECAST_HOURS, NUM_SYNTHETIC_SAMPLES, |
| ) |
|
|
# Module-level logger; this module configures logging only in its
# __main__ block (logging.basicConfig), otherwise the caller does.
logger = logging.getLogger(__name__)
|
|
|
|
def _hourly_payload_to_frame(hourly: dict, variable: str) -> Optional[pd.DataFrame]:
    """Convert an Open-Meteo ``hourly`` payload into a gap-filled DataFrame.

    Args:
        hourly: The ``hourly`` mapping from the API response.
        variable: Key of the value array inside ``hourly`` (e.g. ``"pm2_5"``).

    Returns:
        DataFrame with ``datetime`` and ``value`` columns, or ``None`` when the
        payload has no timestamps, no values, mismatched array lengths, or
        only missing values.
    """
    times = hourly.get("time") or []
    values = hourly.get(variable) or []

    if not times or not values:
        logger.warning("No AQI data returned from Open-Meteo.")
        return None

    # Mismatched arrays would make the DataFrame constructor raise ValueError.
    if len(times) != len(values):
        logger.warning(
            "Open-Meteo returned %d timestamps but %d values; discarding response.",
            len(times),
            len(values),
        )
        return None

    # Missing hours arrive as null; coerce to float so they become NaN
    # (an all-null list would otherwise give an object-dtype column).
    series = pd.to_numeric(pd.Series(values), errors="coerce").astype(float)
    if series.isna().all():
        logger.warning("Open-Meteo returned only missing AQI values.")
        return None

    # Fill interior gaps linearly, then extend the first/last valid values
    # over any leading or trailing gaps.
    series = series.interpolate(method="linear").ffill().bfill()

    return pd.DataFrame({
        "datetime": pd.to_datetime(times),
        "value": series.to_numpy(),
    })


def fetch_aqi_data(
    latitude: float = 37.5,
    longitude: float = -120.3,
    radius_km: int = 50,
    start_date: str = "2024-06-01",
    end_date: str = "2024-06-08",
    parameter: str = "pm25",
) -> Optional[pd.DataFrame]:
    """
    Fetch hourly PM2.5 measurements from the Open-Meteo Air Quality API.

    Args:
        latitude: Latitude of the query point in decimal degrees.
        longitude: Longitude of the query point in decimal degrees.
        radius_km: Not sent to the API; kept for backward compatibility.
        start_date: ``YYYY-MM-DD`` string passed as the API's ``start_date``.
        end_date: ``YYYY-MM-DD`` string passed as the API's ``end_date``.
        parameter: Not used; the request always asks for ``pm2_5``.

    Returns:
        DataFrame with ``datetime`` and ``value`` columns, where missing values
        are filled by linear interpolation followed by forward/backward fill.
        Returns ``None`` when the request fails, the body is not valid JSON,
        or the response holds no usable data.
    """
    url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": "pm2_5",
        "start_date": start_date,
        "end_date": end_date,
        # NOTE(review): "auto" presumably makes the API return naive local
        # timestamps for the location — confirm if UTC is required downstream.
        "timezone": "auto",
    }

    logger.info(f"Fetching Open-Meteo Air Quality for ({latitude}, {longitude})...")

    try:
        resp = requests.get(url, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException as e:
        logger.error(f"Open-Meteo API request failed: {e}")
        return None
    except ValueError as e:
        # Older ``requests`` versions raise a plain ValueError for invalid JSON.
        logger.error(f"Open-Meteo returned invalid JSON: {e}")
        return None

    hourly = data.get("hourly") if isinstance(data, dict) else None
    df = _hourly_payload_to_frame(hourly or {}, "pm2_5")
    if df is not None:
        logger.info(f"Fetched {len(df)} AQI readings from Open-Meteo.")
    return df
|
|
|
|
def generate_synthetic_aqi(
    num_samples: int = NUM_SYNTHETIC_SAMPLES,
    window: int = TIMESERIES_WINDOW,
    forecast_hours: int = AQI_FORECAST_HOURS,
    seed: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate synthetic PM2.5 time series.

    Each sample is built as follows:
      - a base level drawn uniformly from [20, 80);
      - with probability 0.5, an extra offset drawn from [30, 150) is added
        to the base level (intended to mimic a smoke/fire episode);
      - ``window`` daily values follow a Gaussian random walk (step std 5)
        from that level, which gives temporal autocorrelation;
      - the hourly forecast starts at the last daily value and adds a 24-hour
        sinusoid (amplitude 10, minimum at hour 0, maximum at hour 12) plus
        an hourly Gaussian random walk (step std 2).
    All values are clipped to [5, 500].

    Args:
        num_samples: Number of samples.
        window: Days of historical data per sample. Must be >= 1 when
            ``num_samples > 0``, because the forecast is anchored on the last
            day.
        forecast_hours: Hours of future PM2.5 to generate per sample.
        seed: Optional seed for a private ``np.random.RandomState``. When
            ``None`` (the default), the global NumPy random state is used, so
            ``np.random.seed`` still controls the output as before.

    Returns:
        Tuple of (historical_pm25, future_pm25_targets), both float32:
          - historical: (num_samples, window, 1) — daily PM2.5 values
          - targets: (num_samples, forecast_hours) — hourly PM2.5 values

    Raises:
        ValueError: If ``num_samples > 0`` and ``window < 1``.
    """
    logger.info(f"Generating {num_samples} synthetic AQI samples...")

    if num_samples > 0 and window < 1:
        raise ValueError(f"window must be >= 1 to anchor the forecast, got {window}")

    # The np.random module and RandomState expose the same uniform/rand/randn
    # API, so the unseeded path draws exactly what it always did.
    rs = np.random if seed is None else np.random.RandomState(seed)

    historical = np.zeros((num_samples, window, 1), dtype=np.float32)
    targets = np.zeros((num_samples, forecast_hours), dtype=np.float32)

    # The diurnal cycle is identical for every sample, so compute it once.
    hours = np.arange(forecast_hours)
    diurnal = 10 * np.sin(2 * np.pi * hours / 24 - np.pi / 2)

    for i in range(num_samples):
        base_aqi = rs.uniform(20, 80)

        has_fire_event = rs.rand() > 0.5
        if has_fire_event:
            base_aqi += rs.uniform(30, 150)

        daily_values = base_aqi + np.cumsum(rs.randn(window) * 5)
        daily_values = np.clip(daily_values, 5, 500)
        historical[i, :, 0] = daily_values

        trend = np.cumsum(rs.randn(forecast_hours) * 2)
        targets[i] = np.clip(daily_values[-1] + diurnal + trend, 5, 500)

    return historical, targets
|
|
|
|
def fetch_aqi_for_location(
    latitude: float,
    longitude: float,
    target_date: str,
    window: int = TIMESERIES_WINDOW,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Build a PM2.5 history for one location, plus forecast targets.

    Requests PM2.5 from ``target_date - window`` days through ``target_date``
    via ``fetch_aqi_data``. The readings are averaged per calendar day and the
    last ``window`` days are kept.

    The forecast targets are NOT observed data. They are a Gaussian random
    walk (step std 2) starting at the last daily mean, clipped to [5, 500].

    A single synthetic sample from ``generate_synthetic_aqi`` is returned
    instead when any of these hold:
      - the fetch fails or raises;
      - it yields fewer than 24 rows;
      - it yields fewer than ``window`` days;
      - any daily mean is non-finite.

    Args:
        latitude: Latitude in decimal degrees.
        longitude: Longitude in decimal degrees.
        target_date: Last day of the history window, ``YYYY-MM-DD``.
        window: Number of daily values in the history; must be >= 1.

    Returns:
        Tuple of (historical_pm25, forecast_targets): float32 arrays of shape
        (window, 1) and (AQI_FORECAST_HOURS,).

    Raises:
        ValueError: If ``window`` < 1.
    """
    if window < 1:
        # With window == 0, ``daily[-0:]`` would keep every day, and the
        # synthetic fallback cannot anchor a forecast either.
        raise ValueError(f"window must be >= 1, got {window}")

    try:
        end_dt = datetime.strptime(target_date, "%Y-%m-%d")
        start_dt = end_dt - timedelta(days=window)

        df = fetch_aqi_data(
            latitude=latitude,
            longitude=longitude,
            start_date=start_dt.strftime("%Y-%m-%d"),
            end_date=end_dt.strftime("%Y-%m-%d"),
        )

        if df is not None and len(df) >= 24:
            df["date"] = df["datetime"].dt.date
            daily_means = df.groupby("date")["value"].mean()
            daily = daily_means.to_numpy(dtype=np.float64)[-window:]

            # NaN daily means would otherwise flow straight into the targets
            # (np.clip leaves NaN untouched).
            if len(daily) >= window and np.isfinite(daily).all():
                historical = daily.reshape(window, 1).astype(np.float32)

                # Placeholder targets: a random walk from the last observed
                # day, not a real forecast.
                forecast = np.full(AQI_FORECAST_HOURS, daily[-1], dtype=np.float32)
                forecast += np.cumsum(np.random.randn(AQI_FORECAST_HOURS) * 2)
                forecast = np.clip(forecast, 5, 500)
                return historical, forecast
    except Exception as e:
        # Deliberate best-effort: any failure (bad date string, unexpected
        # payload, ...) falls through to the synthetic fallback below.
        logger.error(f"Error fetching AQI data: {e}")

    logger.warning("Falling back to synthetic AQI data.")
    hist, tgt = generate_synthetic_aqi(num_samples=1, window=window)
    return hist[0], tgt[0]
|
|
|
|
def fetch_and_prepare_aqi(
    num_samples: int = NUM_SYNTHETIC_SAMPLES,
    save: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Build the AQI training dataset.

    The dataset is currently entirely synthetic (``generate_synthetic_aqi``);
    no API data is fetched here.

    Args:
        num_samples: Number of samples to generate.
        save: When True, write ``aqi_history.npy`` and ``aqi_targets.npy``
            into ``TIMESERIES_DIR``, creating the directory if it is missing.

    Returns:
        Tuple of (historical_pm25, forecast_targets).
    """
    from pathlib import Path

    historical, targets = generate_synthetic_aqi(num_samples=num_samples)

    if save:
        out_dir = Path(TIMESERIES_DIR)
        # np.save raises FileNotFoundError when the target directory is absent.
        out_dir.mkdir(parents=True, exist_ok=True)
        np.save(out_dir / "aqi_history.npy", historical)
        np.save(out_dir / "aqi_targets.npy", targets)
        logger.info(f"Saved AQI data: history={historical.shape}, targets={targets.shape}")

    return historical, targets
|
|
|
|
if __name__ == "__main__":
    # Running the module directly builds (and saves) the AQI dataset,
    # then reports the resulting array shapes.
    logging.basicConfig(level=logging.INFO)
    dataset = fetch_and_prepare_aqi()
    aqi_history, aqi_targets = dataset
    print(f"AQI history: {aqi_history.shape}, AQI targets: {aqi_targets.shape}")
|
|