import asyncio
import os
from datetime import datetime
from typing import Any, Dict

import aiohttp
import requests
import xarray as xr

# ============================================================================
# WEATHER SERVER (Open-Meteo)
# ============================================================================


class WeatherServer:
    """Open-Meteo Weather API Server.

    Fetches current conditions and a 7-day daily forecast for a coordinate
    pair and reshapes the response into a flat, status-tagged dictionary.
    """

    # Forecast endpoint queried by get_data().
    _FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
    # Total time budget (seconds) for one request, passed to ClientTimeout.
    _TIMEOUT_SECONDS = 10

    async def get_data(
        self, lat: float, lon: float, *, timezone: str = "Asia/Kolkata"
    ) -> Dict[str, Any]:
        """Fetch current weather and a 7-day forecast for a location.

        This method never raises: every failure is reported through the
        returned dictionary so callers can branch on ``result["status"]``.

        Args:
            lat: Latitude sent as the ``latitude`` query parameter.
            lon: Longitude sent as the ``longitude`` query parameter.
            timezone: Value sent as the ``timezone`` query parameter.
                Defaults to ``"Asia/Kolkata"``, the value that was
                previously hard-coded.

        Returns:
            On success, ``{"status": "success", "data": {...}}`` where
            ``data`` holds the current readings and a ``forecast_7day``
            mapping of daily series. On failure,
            ``{"status": "error", "error": <message>}``; the message is
            ``"HTTP <code>"`` for a non-200 response, otherwise a
            non-empty description of the exception.
        """
        params = {
            "latitude": lat,
            "longitude": lon,
            "current": "temperature_2m,precipitation,wind_speed_10m,relative_humidity_2m",
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,rain_sum",
            "timezone": timezone,
            "forecast_days": 7,
        }
        timeout = aiohttp.ClientTimeout(total=self._TIMEOUT_SECONDS)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self._FORECAST_URL, params=params, timeout=timeout
                ) as response:
                    if response.status != 200:
                        return {"status": "error", "error": f"HTTP {response.status}"}
                    data = await response.json()
            return {"status": "success", "data": self._build_payload(data)}
        except asyncio.TimeoutError as e:
            # A total-timeout expiry usually carries no message, so str(e)
            # alone would hand the caller an empty error string.
            return {"status": "error", "error": str(e) or "Request timed out"}
        except Exception as e:
            # Deliberate catch-all boundary: callers rely on an error dict
            # rather than an exception. Fall back to the exception class
            # name when the exception has no message of its own.
            return {"status": "error", "error": str(e) or type(e).__name__}

    @staticmethod
    def _build_payload(data: Dict[str, Any]) -> Dict[str, Any]:
        """Reshape a decoded forecast response into the public ``data`` dict.

        Raises ``KeyError`` if the ``current`` or ``daily`` sections, or any
        of the fields read from them, are missing from the response.
        """
        current = data["current"]
        daily = data["daily"]
        return {
            "current_temp_c": current["temperature_2m"],
            "current_precipitation_mm": current["precipitation"],
            "wind_speed_kmh": current["wind_speed_10m"],
            "humidity_percent": current["relative_humidity_2m"],
            "forecast_7day": {
                "max_temps": daily["temperature_2m_max"],
                "min_temps": daily["temperature_2m_min"],
                "precipitation_mm": daily["precipitation_sum"],
                "rain_mm": daily["rain_sum"],
            },
            "data_source": "Open-Meteo API",
        }