| import logging |
| from typing import Optional, Dict, Any, List |
|
|
| import requests |
|
|
| from app.utils import config |
|
|
# Configure the root logger when this module is imported: INFO level,
# with a timestamp and level name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
|
|
|
|
def fetch_forecast(
    q: str,
    days: int = 3,
    alerts: str = "yes",
    aqi: str = "yes",
    lang: Optional[str] = None,
) -> Dict[str, Any]:
    """Request a forecast from the WeatherAPI ``forecast.json`` endpoint.

    Args:
        q: Location query, forwarded unchanged as the ``q`` parameter.
        days: Number of forecast days; coerced with ``int()`` and clamped
            to the range 1..14 before being sent.
        alerts: Forwarded unchanged as the ``alerts`` parameter.
        aqi: Forwarded unchanged as the ``aqi`` parameter.
        lang: Optional language code; only sent when truthy.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If ``config.WEATHER_API_KEY`` is empty or unset.
        requests.HTTPError: If the server answers with an error status
            (raised by ``raise_for_status``). Other ``requests`` errors,
            such as timeouts, propagate unchanged.
    """
    if not config.WEATHER_API_KEY:
        raise RuntimeError("WEATHER_API_KEY is not configured")

    # Use HTTPS: the API key is part of the query string, so a plain-HTTP
    # request would expose it to anyone observing the traffic.
    url = "https://api.weatherapi.com/v1/forecast.json"
    params: Dict[str, Any] = {
        "key": config.WEATHER_API_KEY,
        "q": q,
        # Keep the requested day count inside the 1..14 window.
        "days": max(1, min(int(days), 14)),
        "alerts": alerts,
        "aqi": aqi,
    }
    if lang:
        params["lang"] = lang

    # A bounded timeout keeps a stalled connection from hanging the caller.
    resp = requests.get(url, params=params, timeout=10)
    resp.raise_for_status()
    return resp.json()
|
|
|
|
| def _summarize_alerts(alerts_obj: Any, limit: int = 3) -> str: |
| if not alerts_obj or not isinstance(alerts_obj, dict): |
| return "No alerts." |
| alerts = alerts_obj.get("alert") or [] |
| if not alerts: |
| return "No alerts." |
|
|
| lines: List[str] = ["Alerts:"] |
| for a in alerts[: max(1, limit)]: |
| headline = a.get("headline") or a.get("event") or "Alert" |
| severity = a.get("severity") or "Unknown" |
| urgency = a.get("urgency") or "Unknown" |
| effective = a.get("effective") or "" |
| expires = a.get("expires") or "" |
| time_part = "" |
| if effective or expires: |
| time_part = f" ({effective} → {expires})" |
| lines.append(f"- {headline} | severity={severity}, urgency={urgency}{time_part}") |
| if len(alerts) > limit: |
| lines.append(f"- (+{len(alerts) - limit} more)") |
| return "\n".join(lines) |
|
|
|
|
def summarize_forecast(
    data: Dict[str, Any],
    include_alerts: bool = True,
    max_days: int = 3,
) -> str:
    """Build a plain-text weather summary from a forecast payload.

    Missing sections or fields fall back to placeholders (``"this area"``,
    ``"Unknown"``, ``"?"``) rather than raising.

    Args:
        data: Forecast payload; the ``location``, ``current``, ``forecast``
            and ``alerts`` keys are read.
        include_alerts: When true, append the alert summary produced by
            ``_summarize_alerts``.
        max_days: Maximum number of forecast days to list. Defaults to 3;
            values below 1 suppress the forecast section.

    Returns:
        The summary as newline-separated lines.
    """
    loc = data.get("location") or {}
    cur = data.get("current") or {}
    forecast = (data.get("forecast") or {}).get("forecastday") or []
    alerts_obj = data.get("alerts") or {}

    name = loc.get("name") or loc.get("region") or "this area"
    country = loc.get("country") or ""
    # With an empty country the f-string leaves a trailing ", "; the two
    # strips remove it.
    where = f"{name}, {country}".strip().strip(",")

    cond = (cur.get("condition") or {}).get("text", "Unknown")
    temp_c = cur.get("temp_c", "?")
    humidity = cur.get("humidity", "?")
    wind_kph = cur.get("wind_kph", "?")

    lines: List[str] = [
        f"Weather for {where}:",
        f"- Now: {cond}, {temp_c}°C, humidity {humidity}%, wind {wind_kph} kph",
    ]

    # Clamp at zero so a negative max_days cannot turn into a
    # "drop the last N" slice.
    listed_days = forecast[: max(0, max_days)]
    if listed_days:
        lines.append("Forecast:")
        for d in listed_days:
            date = d.get("date", "")
            day = d.get("day") or {}
            day_cond = (day.get("condition") or {}).get("text", "Unknown")
            maxt = day.get("maxtemp_c", "?")
            mint = day.get("mintemp_c", "?")
            rain_chance = day.get("daily_chance_of_rain", "?")
            lines.append(
                f"- {date}: {day_cond}, {mint}–{maxt}°C, rain chance {rain_chance}%"
            )

    if include_alerts:
        lines.append(_summarize_alerts(alerts_obj))

    return "\n".join(lines).strip()
|
|
|
|
def forecast_summary_for_state(state: str) -> str:
    """Return a text forecast summary (with alerts) for a Nigerian state.

    The query sent to ``fetch_forecast`` is ``"<state>, Nigeria"``. The day
    count and the alerts/AQI flags are read from ``config`` and fall back to
    ``3``, ``"yes"`` and ``"yes"`` when the attributes are absent.
    """
    request_options = {
        "days": getattr(config, "WEATHER_FORECAST_DAYS", 3),
        "alerts": getattr(config, "WEATHER_ALERTS", "yes"),
        "aqi": getattr(config, "WEATHER_AQI", "yes"),
    }
    payload = fetch_forecast(q=f"{state}, Nigeria", **request_options)
    return summarize_forecast(payload, include_alerts=True)
|
|
|
|
def alerts_for_q(q: str) -> Dict[str, Any]:
    """Fetch the forecast for *q* and return only its location and alerts.

    The day count and the alerts/AQI flags are read from ``config`` and fall
    back to ``3``, ``"yes"`` and ``"yes"`` when the attributes are absent.

    Returns:
        A dict with ``"location"`` (as returned by the API, possibly
        ``None``) and ``"alerts"`` (an empty dict when missing or falsy).
    """
    request_options = {
        "days": getattr(config, "WEATHER_FORECAST_DAYS", 3),
        "alerts": getattr(config, "WEATHER_ALERTS", "yes"),
        "aqi": getattr(config, "WEATHER_AQI", "yes"),
    }
    payload = fetch_forecast(q=q, **request_options)
    location = payload.get("location")
    alerts_section = payload.get("alerts") or {}
    return {"location": location, "alerts": alerts_section}
|
|
|
|
def forecast_summary_for_gps(lat: float, lon: float) -> str:
    """Return a text forecast summary (with alerts) for a coordinate pair.

    The query sent to ``fetch_forecast`` is ``"<lat>,<lon>"``. The day count
    and the alerts/AQI flags are read from ``config`` and fall back to
    ``3``, ``"yes"`` and ``"yes"`` when the attributes are absent.
    """
    request_options = {
        "days": getattr(config, "WEATHER_FORECAST_DAYS", 3),
        "alerts": getattr(config, "WEATHER_ALERTS", "yes"),
        "aqi": getattr(config, "WEATHER_AQI", "yes"),
    }
    payload = fetch_forecast(q=f"{lat},{lon}", **request_options)
    return summarize_forecast(payload, include_alerts=True)
|
|
|
|
|
|