NovaAI / tools /weather_tool.py
veteroner's picture
CRANE AI ZeroGPU Space yüklendi
58c4fec verified
"""
CRANE AI - Weather Tool
Gerçek zamanlı hava durumu verisi için tool
"""
import asyncio
import json
from typing import Dict, Any, Optional
import logging
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
import requests
AIOHTTP_AVAILABLE = False
logger = logging.getLogger(__name__)
class WeatherTool:
"""Hava durumu sorguları için external tool"""
def __init__(self):
# Ücretsiz OpenWeatherMap API kullanabiliriz
self.base_url = "http://api.openweathermap.org/data/2.5/weather"
self.api_key = None # API key gerekirse eklenebilir
# Alternatif olarak wttr.in kullanabiliriz (ücretsiz)
self.wttr_url = "http://wttr.in/{city}?format=j1"
def can_handle(self, query: str) -> bool:
"""Bu tool'un handle edebileceği sorguları tespit eder"""
weather_keywords = [
"hava", "weather", "sıcaklık", "temperature", "yağmur", "rain",
"kar", "snow", "rüzgar", "wind", "nem", "humidity", "güneş", "sun",
"bulut", "cloud", "fırtına", "storm", "bugün", "today", "yarın", "tomorrow"
]
query_lower = query.lower()
return any(keyword in query_lower for keyword in weather_keywords)
async def get_weather(self, city: str = "Istanbul") -> Dict[str, Any]:
"""Belirtilen şehir için hava durumu bilgisi alır"""
try:
# wttr.in API kullan (ücretsiz ve API key gerektirmez)
url = f"http://wttr.in/{city}?format=j1"
if AIOHTTP_AVAILABLE:
# Async aiohttp kullan
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
data = await response.json()
else:
return {
"error": f"API response error: {response.status}",
"status": "error"
}
else:
# Fallback to sync requests
import requests
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
else:
return {
"error": f"API response error: {response.status_code}",
"status": "error"
}
# Veriyi parse et
current = data.get("current_condition", [{}])[0]
weather_desc = current.get("weatherDesc", [{}])[0].get("value", "Bilinmiyor")
temp_c = current.get("temp_C", "N/A")
humidity = current.get("humidity", "N/A")
wind_speed = current.get("windspeedKmph", "N/A")
return {
"city": city,
"temperature": f"{temp_c}°C",
"description": weather_desc,
"humidity": f"{humidity}%",
"wind_speed": f"{wind_speed} km/h",
"status": "success"
}
except Exception as e:
logger.error(f"Weather API error: {str(e)}")
return {
"error": str(e),
"status": "error"
}
def extract_city(self, query: str) -> str:
"""Sorgudan şehir adını çıkarır"""
# Türk şehirleri
turkish_cities = [
"istanbul", "ankara", "izmir", "bursa", "antalya", "adana",
"konya", "gaziantep", "mersin", "diyarbakır", "kayseri", "eskişehir"
]
query_lower = query.lower()
# Şehir adı geçiyorsa onu al
for city in turkish_cities:
if city in query_lower:
return city.title()
# Şehir belirtilmemişse İstanbul default
return "Istanbul"
async def process_weather_query(self, query: str) -> str:
"""Hava durumu sorgusunu işler ve yanıt oluşturur"""
try:
# Şehri tespit et
city = self.extract_city(query)
# Hava durumu bilgisini al
weather_data = await self.get_weather(city)
if weather_data.get("status") == "success":
return f"""🌤️ **{city} Hava Durumu:**
🌡️ **Sıcaklık:** {weather_data['temperature']}
☁️ **Durum:** {weather_data['description']}
💧 **Nem:** {weather_data['humidity']}
💨 **Rüzgar:** {weather_data['wind_speed']}
*Güncel bilgi - API üzerinden alındı*"""
else:
return f"❌ Hava durumu bilgisi alınamadı: {weather_data.get('error', 'Bilinmeyen hata')}"
except Exception as e:
logger.error(f"Weather query processing error: {str(e)}")
return f"❌ Hava durumu servisi şu anda kullanılamıyor: {str(e)}"