Spaces:
Sleeping
Sleeping
| import aiohttp | |
| import asyncio | |
| import os | |
| import xarray as xr | |
| import requests | |
| from datetime import datetime | |
| from typing import Dict, Any | |
| # ============================================================================ | |
| # WEATHER SERVER (Open-Meteo) | |
| # ============================================================================ | |
| class WeatherServer: | |
| """Open-Meteo Weather API Server""" | |
| async def get_data(self, lat: float, lon: float) -> Dict[str, Any]: | |
| try: | |
| url = "https://api.open-meteo.com/v1/forecast" | |
| params = { | |
| "latitude": lat, | |
| "longitude": lon, | |
| "current": "temperature_2m,precipitation,wind_speed_10m,relative_humidity_2m", | |
| "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,rain_sum", | |
| "timezone": "Asia/Kolkata", | |
| "forecast_days": 7 | |
| } | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=10)) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| return { | |
| "status": "success", | |
| "data": { | |
| "current_temp_c": data["current"]["temperature_2m"], | |
| "current_precipitation_mm": data["current"]["precipitation"], | |
| "wind_speed_kmh": data["current"]["wind_speed_10m"], | |
| "humidity_percent": data["current"]["relative_humidity_2m"], | |
| "forecast_7day": { | |
| "max_temps": data["daily"]["temperature_2m_max"], | |
| "min_temps": data["daily"]["temperature_2m_min"], | |
| "precipitation_mm": data["daily"]["precipitation_sum"], | |
| "rain_mm": data["daily"]["rain_sum"] | |
| }, | |
| "data_source": "Open-Meteo API" | |
| } | |
| } | |
| else: | |
| return {"status": "error", "error": f"HTTP {response.status}"} | |
| except Exception as e: | |
| return {"status": "error", "error": str(e)} |