akashub
feat(Initial project setup): Added code for initial setup
b20698b
raw
history blame
2.4 kB
import aiohttp
import asyncio
import os
import xarray as xr
import requests
from datetime import datetime
from typing import Dict, Any
# ============================================================================
# WEATHER SERVER (Open-Meteo)
# ============================================================================
class WeatherServer:
    """Thin async client for the Open-Meteo forecast endpoint.

    Every call opens its own ``aiohttp.ClientSession`` and reports the
    outcome as a plain dictionary with a ``"status"`` key instead of
    raising, so callers can branch on ``result["status"]``.
    """

    async def get_data(self, lat: float, lon: float) -> Dict[str, Any]:
        """Fetch current conditions and the 7-day daily forecast for a point.

        Args:
            lat: Value sent as the ``latitude`` query parameter.
            lon: Value sent as the ``longitude`` query parameter.

        Returns:
            On HTTP 200 with the expected payload::

                {"status": "success", "data": {...}}

            where ``data`` holds the current readings, a ``forecast_7day``
            mapping of daily series, and a ``data_source`` label.

            On any failure (non-200 status, network error, timeout,
            malformed payload)::

                {"status": "error", "error": "<non-empty message>"}

        Note:
            The ``_c`` / ``_mm`` / ``_kmh`` suffixes in the result keys
            presume the API's default units, since no unit parameters are
            sent -- TODO confirm against the Open-Meteo documentation.
        """
        url = "https://api.open-meteo.com/v1/forecast"
        params = {
            "latitude": lat,
            "longitude": lon,
            "current": "temperature_2m,precipitation,wind_speed_10m,relative_humidity_2m",
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,rain_sum",
            "timezone": "Asia/Kolkata",
            "forecast_days": 7,
        }

        try:
            # Cap the whole request (connect + read) at 10 seconds.
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params, timeout=timeout) as response:
                    if response.status != 200:
                        return {"status": "error", "error": f"HTTP {response.status}"}
                    data = await response.json()

            current = data["current"]
            daily = data["daily"]
            return {
                "status": "success",
                "data": {
                    "current_temp_c": current["temperature_2m"],
                    "current_precipitation_mm": current["precipitation"],
                    "wind_speed_kmh": current["wind_speed_10m"],
                    "humidity_percent": current["relative_humidity_2m"],
                    "forecast_7day": {
                        "max_temps": daily["temperature_2m_max"],
                        "min_temps": daily["temperature_2m_min"],
                        "precipitation_mm": daily["precipitation_sum"],
                        "rain_mm": daily["rain_sum"],
                    },
                    "data_source": "Open-Meteo API",
                },
            }
        except KeyError as e:
            # A 200 response lacking an expected field: say so explicitly
            # rather than returning only the quoted key name.
            return {
                "status": "error",
                "error": f"Unexpected response format: missing {e}",
            }
        except Exception as e:
            # Deliberate catch-all: this method reports failures in-band.
            # Some exceptions (e.g. asyncio.TimeoutError) have an empty
            # str(), so fall back to the class name to keep the message
            # informative.
            return {"status": "error", "error": str(e) or type(e).__name__}