import logging from typing import Optional, Dict, Any, List import requests from app.utils import config logging.basicConfig( format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO, ) def fetch_forecast( q: str, days: int = 3, alerts: str = "yes", aqi: str = "yes", lang: Optional[str] = None, ) -> Dict[str, Any]: if not config.WEATHER_API_KEY: raise RuntimeError("WEATHER_API_KEY is not configured") url = "http://api.weatherapi.com/v1/forecast.json" params: Dict[str, Any] = { "key": config.WEATHER_API_KEY, "q": q, "days": max(1, min(int(days), 14)), "alerts": alerts, "aqi": aqi, } if lang: params["lang"] = lang resp = requests.get(url, params=params, timeout=10) resp.raise_for_status() return resp.json() def _summarize_alerts(alerts_obj: Any, limit: int = 3) -> str: if not alerts_obj or not isinstance(alerts_obj, dict): return "No alerts." alerts = alerts_obj.get("alert") or [] if not alerts: return "No alerts." lines: List[str] = ["Alerts:"] for a in alerts[: max(1, limit)]: headline = a.get("headline") or a.get("event") or "Alert" severity = a.get("severity") or "Unknown" urgency = a.get("urgency") or "Unknown" effective = a.get("effective") or "" expires = a.get("expires") or "" time_part = "" if effective or expires: time_part = f" ({effective} → {expires})" lines.append(f"- {headline} | severity={severity}, urgency={urgency}{time_part}") if len(alerts) > limit: lines.append(f"- (+{len(alerts) - limit} more)") return "\n".join(lines) def summarize_forecast(data: Dict[str, Any], include_alerts: bool = True) -> str: loc = data.get("location") or {} cur = data.get("current") or {} forecast = (data.get("forecast") or {}).get("forecastday") or [] alerts_obj = data.get("alerts") or {} name = loc.get("name") or loc.get("region") or "this area" country = loc.get("country") or "" where = f"{name}, {country}".strip().strip(",") cond = (cur.get("condition") or {}).get("text", "Unknown") temp_c = cur.get("temp_c", "?") humidity = cur.get("humidity", "?") wind_kph = cur.get("wind_kph", "?") lines: List[str] = [ f"Weather for {where}:", f"- Now: {cond}, {temp_c}°C, humidity {humidity}%, wind {wind_kph} kph", ] if forecast: lines.append("Forecast:") for d in forecast[:3]: date = d.get("date", "") day = d.get("day") or {} day_cond = (day.get("condition") or {}).get("text", "Unknown") maxt = day.get("maxtemp_c", "?") mint = day.get("mintemp_c", "?") rain_chance = day.get("daily_chance_of_rain", "?") lines.append(f"- {date}: {day_cond}, {mint}–{maxt}°C, rain chance {rain_chance}%") if include_alerts: lines.append(_summarize_alerts(alerts_obj)) return "\n".join(lines).strip() def forecast_summary_for_state(state: str) -> str: q = f"{state}, Nigeria" data = fetch_forecast( q=q, days=getattr(config, "WEATHER_FORECAST_DAYS", 3), alerts=getattr(config, "WEATHER_ALERTS", "yes"), aqi=getattr(config, "WEATHER_AQI", "yes"), ) return summarize_forecast(data, include_alerts=True) def alerts_for_q(q: str) -> Dict[str, Any]: data = fetch_forecast( q=q, days=getattr(config, "WEATHER_FORECAST_DAYS", 3), alerts=getattr(config, "WEATHER_ALERTS", "yes"), aqi=getattr(config, "WEATHER_AQI", "yes"), ) return { "location": data.get("location"), "alerts": data.get("alerts") or {}, } def forecast_summary_for_gps(lat: float, lon: float) -> str: q = f"{lat},{lon}" data = fetch_forecast( q=q, days=getattr(config, "WEATHER_FORECAST_DAYS", 3), alerts=getattr(config, "WEATHER_ALERTS", "yes"), aqi=getattr(config, "WEATHER_AQI", "yes"), ) return summarize_forecast(data, include_alerts=True)