# Deploy Multi-Hazard Warning System - MTL model for wildfire risk + AQI forecasting
# (author: krupal02, commit d5b0af1)
"""
fetch_aqi.py — Open-Meteo fetcher for air-quality (PM2.5) data.
Downloads hourly PM2.5 readings from the Open-Meteo Air Quality API for the
coordinates of each satellite patch. Missing values are filled by linear
interpolation followed by forward/backward fill. Falls back to synthetic data
when the API is unavailable; the training dataset built by
fetch_and_prepare_aqi is currently fully synthetic.
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, Tuple
import numpy as np
import pandas as pd
import requests
from src.training.config import (
OPENAQ_API_KEY, OPENAQ_BASE_URL, AQI_RAW_DIR, TIMESERIES_DIR,
TIMESERIES_WINDOW, AQI_FORECAST_HOURS, NUM_SYNTHETIC_SAMPLES,
)
# Module-scoped logger; its name follows this module's import path so that
# levels/handlers can be configured per module.
logger: logging.Logger = logging.getLogger(name=__name__)
def fetch_aqi_data(
    latitude: float = 37.5,
    longitude: float = -120.3,
    radius_km: int = 50,
    start_date: str = "2024-06-01",
    end_date: str = "2024-06-08",
    parameter: str = "pm25",
) -> Optional[pd.DataFrame]:
    """
    Fetch hourly PM2.5 measurements from the Open-Meteo Air Quality API.

    Args:
        latitude: Latitude of the point to query.
        longitude: Longitude of the point to query.
        radius_km: Unused. Kept for backward compatibility with the former
            OpenAQ-based signature; Open-Meteo is queried at a single point.
        start_date: First day of the request, formatted ``YYYY-MM-DD``.
        end_date: Last day of the request, formatted ``YYYY-MM-DD``.
        parameter: Unused. The request always asks for ``pm2_5``.

    Returns:
        DataFrame with columns ``datetime`` (parsed timestamps) and ``value``
        (float PM2.5 readings; gaps filled by linear interpolation, then
        forward/backward fill). Returns ``None`` if any of these happen:
        the request fails, the body is not valid JSON, the timestamps cannot
        be parsed, or no usable readings are present.
    """
    url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": "pm2_5",
        "start_date": start_date,
        "end_date": end_date,
        "timezone": "auto",
    }
    logger.info(f"Fetching Open-Meteo Air Quality for ({latitude}, {longitude})...")
    try:
        resp = requests.get(url, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException as e:
        logger.error(f"Open-Meteo API request failed: {e}")
        return None
    except ValueError as e:
        # Older `requests` versions raise a plain ValueError for malformed JSON.
        logger.error(f"Open-Meteo returned an unparseable response: {e}")
        return None

    # `or {}` / `or []` also cover explicit JSON nulls in the payload.
    hourly = (data.get("hourly") if isinstance(data, dict) else None) or {}
    times = hourly.get("time") or []
    pm2_5 = hourly.get("pm2_5") or []
    if not times or not pm2_5:
        logger.warning("No AQI data returned from Open-Meteo.")
        return None
    if len(times) != len(pm2_5):
        # The two arrays are index-aligned; keep only the common prefix
        # instead of letting the DataFrame constructor raise.
        n = min(len(times), len(pm2_5))
        logger.warning(
            f"Open-Meteo returned {len(times)} timestamps but {len(pm2_5)} "
            f"PM2.5 values; truncating to {n}."
        )
        times, pm2_5 = times[:n], pm2_5[:n]

    try:
        # Coerce to float so None/non-numeric entries become NaN; an all-None
        # list would otherwise produce an object column that cannot be
        # interpolated.
        values = pd.to_numeric(
            pd.Series(pm2_5, dtype="object"), errors="coerce"
        ).to_numpy(dtype=float)
        df = pd.DataFrame({
            "datetime": pd.to_datetime(times),
            "value": values,
        })
    except (TypeError, ValueError) as e:
        logger.error(f"Could not parse Open-Meteo AQI response: {e}")
        return None

    if df["value"].isna().all():
        logger.warning("Open-Meteo returned only missing PM2.5 values.")
        return None
    # Open-Meteo may return nulls for missing hours; fill interior gaps
    # linearly, then extend the edge values outward.
    df["value"] = df["value"].interpolate(method="linear").ffill().bfill()
    logger.info(f"Fetched {len(df)} AQI readings from Open-Meteo.")
    return df
def generate_synthetic_aqi(
    num_samples: int = NUM_SYNTHETIC_SAMPLES,
    window: int = TIMESERIES_WINDOW,
    forecast_hours: int = AQI_FORECAST_HOURS,
    rng: Optional[np.random.RandomState] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate synthetic PM2.5 time series.

    Each sample is built as follows:
    - Base level drawn uniformly from [20, 80].
    - With probability ~0.5, an extra uniform [30, 150] is added to the base
      (simulated fire event).
    - History: a random walk with step std 5 around the base, clipped to
      [5, 500].
    - Forecast: the last historical value, plus a 24-hour sine cycle of
      amplitude 10, plus a random walk with step std 2, clipped to [5, 500].
      The sine is at its minimum at forecast hour 0 and its maximum at hour
      12 (noon if the forecast starts at midnight).

    Args:
        num_samples: Number of samples.
        window: Days of historical data per sample.
        forecast_hours: Hours of future PM2.5 to generate per sample.
        rng: Optional ``np.random.RandomState`` for reproducible output. When
            ``None``, the global ``np.random`` state is used. In that case
            ``np.random.seed(...)`` still controls the output.

    Returns:
        Tuple of (historical_pm25, future_pm25_targets), both float32:
        - historical: ``(num_samples, window, 1)``, one value per day.
        - targets: ``(num_samples, forecast_hours)``, one value per hour.
    """
    rs = np.random if rng is None else rng
    logger.info(f"Generating {num_samples} synthetic AQI samples...")
    historical = np.zeros((num_samples, window, 1), dtype=np.float32)
    targets = np.zeros((num_samples, forecast_hours), dtype=np.float32)

    # The diurnal cycle is identical for every sample; compute it once.
    hours = np.arange(forecast_hours)
    diurnal = 10 * np.sin(2 * np.pi * hours / 24 - np.pi / 2)  # max at hour 12

    for i in range(num_samples):
        # Base PM2.5 level
        base_pm25 = rs.uniform(20, 80)
        # Fire events (~50% of samples) raise the baseline
        has_fire_event = rs.rand() > 0.5
        if has_fire_event:
            base_pm25 += rs.uniform(30, 150)
        # Daily history as a random walk, so consecutive days are correlated
        daily_values = base_pm25 + np.cumsum(rs.randn(window) * 5)
        daily_values = np.clip(daily_values, 5, 500)
        historical[i, :, 0] = daily_values
        # Hourly forecast continues from the last historical day
        trend = np.cumsum(rs.randn(forecast_hours) * 2)
        targets[i] = np.clip(daily_values[-1] + diurnal + trend, 5, 500)
    return historical, targets
def fetch_aqi_for_location(
    latitude: float,
    longitude: float,
    target_date: str,
    window: int = TIMESERIES_WINDOW,
    forecast_hours: int = AQI_FORECAST_HOURS,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Fetch daily PM2.5 history for one location and build forecast targets.

    Requests hourly data from ``window`` days before ``target_date`` through
    ``target_date``. The readings are averaged per calendar day, and the last
    ``window`` daily means are kept.

    The forecast targets are NOT observed values. They are a random walk
    (step std 2, global ``np.random`` state) starting at the last daily mean,
    clipped to [5, 500].

    The function falls back to one synthetic sample from
    ``generate_synthetic_aqi`` when any of these happen: the fetch returns
    fewer than 24 hourly readings, there are fewer than ``window`` days, or
    an exception is raised (for example, a badly formatted ``target_date``).

    Args:
        latitude: Latitude of the location.
        longitude: Longitude of the location.
        target_date: Last day of the history window, ``YYYY-MM-DD``.
        window: Number of daily values to return.
        forecast_hours: Length of the forecast target vector.

    Returns:
        Tuple of (historical_pm25, forecast_targets), both float32, with
        shapes ``(window, 1)`` and ``(forecast_hours,)``.
    """
    try:
        end_dt = datetime.strptime(target_date, "%Y-%m-%d")
        start_dt = end_dt - timedelta(days=window)
        df = fetch_aqi_data(
            latitude=latitude,
            longitude=longitude,
            start_date=start_dt.strftime("%Y-%m-%d"),
            end_date=end_dt.strftime("%Y-%m-%d"),
        )
        if df is None or len(df) < 24:
            logger.warning(
                f"Insufficient hourly AQI data for ({latitude}, {longitude}) "
                f"ending {target_date}."
            )
        else:
            # Aggregate hourly readings to per-day means (groupby sorts by date).
            daily_means = df.groupby(df["datetime"].dt.date)["value"].mean().to_numpy()
            if len(daily_means) < window:
                logger.warning(
                    f"Only {len(daily_means)} days of AQI data available; "
                    f"{window} required."
                )
            else:
                daily = daily_means[-window:]
                historical = daily.reshape(window, 1).astype(np.float32)
                # Baseline "forecast": random walk from the last known daily mean.
                forecast = np.full(forecast_hours, daily[-1], dtype=np.float32)
                forecast += np.cumsum(np.random.randn(forecast_hours) * 2)
                forecast = np.clip(forecast, 5, 500)
                return historical, forecast
    except Exception as e:
        # Deliberate best-effort: any failure falls back to synthetic data,
        # but keep the traceback for debugging.
        logger.error(f"Error fetching AQI data: {e}", exc_info=True)
    logger.warning("Falling back to synthetic AQI data.")
    hist, tgt = generate_synthetic_aqi(
        num_samples=1, window=window, forecast_hours=forecast_hours
    )
    return hist[0], tgt[0]
def fetch_and_prepare_aqi(
    num_samples: int = NUM_SYNTHETIC_SAMPLES,
    save: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Build the AQI training dataset.

    The dataset is currently entirely synthetic (``generate_synthetic_aqi``);
    no API requests are made here.

    Args:
        num_samples: Number of samples to generate.
        save: When True, write ``aqi_history.npy`` and ``aqi_targets.npy``
            into ``TIMESERIES_DIR``, creating the directory if it is missing.

    Returns:
        Tuple of (historical_pm25, forecast_targets).
    """
    historical, targets = generate_synthetic_aqi(num_samples=num_samples)
    if save:
        # np.save does not create parent directories. This assumes
        # TIMESERIES_DIR is a pathlib.Path, as its use with `/` below implies.
        TIMESERIES_DIR.mkdir(parents=True, exist_ok=True)
        np.save(TIMESERIES_DIR / "aqi_history.npy", historical)
        np.save(TIMESERIES_DIR / "aqi_targets.npy", targets)
        logger.info(f"Saved AQI data: history={historical.shape}, targets={targets.shape}")
    return historical, targets
if __name__ == "__main__":
    # Script entry point: build (and save) the AQI dataset, then report shapes.
    logging.basicConfig(level=logging.INFO)
    aqi_history, aqi_targets = fetch_and_prepare_aqi()
    print(f"AQI history: {aqi_history.shape}, AQI targets: {aqi_targets.shape}")