""" CRANE AI - Weather Tool Gerçek zamanlı hava durumu verisi için tool """ import asyncio import json from typing import Dict, Any, Optional import logging try: import aiohttp AIOHTTP_AVAILABLE = True except ImportError: import requests AIOHTTP_AVAILABLE = False logger = logging.getLogger(__name__) class WeatherTool: """Hava durumu sorguları için external tool""" def __init__(self): # Ücretsiz OpenWeatherMap API kullanabiliriz self.base_url = "http://api.openweathermap.org/data/2.5/weather" self.api_key = None # API key gerekirse eklenebilir # Alternatif olarak wttr.in kullanabiliriz (ücretsiz) self.wttr_url = "http://wttr.in/{city}?format=j1" def can_handle(self, query: str) -> bool: """Bu tool'un handle edebileceği sorguları tespit eder""" weather_keywords = [ "hava", "weather", "sıcaklık", "temperature", "yağmur", "rain", "kar", "snow", "rüzgar", "wind", "nem", "humidity", "güneş", "sun", "bulut", "cloud", "fırtına", "storm", "bugün", "today", "yarın", "tomorrow" ] query_lower = query.lower() return any(keyword in query_lower for keyword in weather_keywords) async def get_weather(self, city: str = "Istanbul") -> Dict[str, Any]: """Belirtilen şehir için hava durumu bilgisi alır""" try: # wttr.in API kullan (ücretsiz ve API key gerektirmez) url = f"http://wttr.in/{city}?format=j1" if AIOHTTP_AVAILABLE: # Async aiohttp kullan async with aiohttp.ClientSession() as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response: if response.status == 200: data = await response.json() else: return { "error": f"API response error: {response.status}", "status": "error" } else: # Fallback to sync requests import requests response = requests.get(url, timeout=10) if response.status_code == 200: data = response.json() else: return { "error": f"API response error: {response.status_code}", "status": "error" } # Veriyi parse et current = data.get("current_condition", [{}])[0] weather_desc = current.get("weatherDesc", [{}])[0].get("value", "Bilinmiyor") temp_c = current.get("temp_C", "N/A") humidity = current.get("humidity", "N/A") wind_speed = current.get("windspeedKmph", "N/A") return { "city": city, "temperature": f"{temp_c}°C", "description": weather_desc, "humidity": f"{humidity}%", "wind_speed": f"{wind_speed} km/h", "status": "success" } except Exception as e: logger.error(f"Weather API error: {str(e)}") return { "error": str(e), "status": "error" } def extract_city(self, query: str) -> str: """Sorgudan şehir adını çıkarır""" # Türk şehirleri turkish_cities = [ "istanbul", "ankara", "izmir", "bursa", "antalya", "adana", "konya", "gaziantep", "mersin", "diyarbakır", "kayseri", "eskişehir" ] query_lower = query.lower() # Şehir adı geçiyorsa onu al for city in turkish_cities: if city in query_lower: return city.title() # Şehir belirtilmemişse İstanbul default return "Istanbul" async def process_weather_query(self, query: str) -> str: """Hava durumu sorgusunu işler ve yanıt oluşturur""" try: # Şehri tespit et city = self.extract_city(query) # Hava durumu bilgisini al weather_data = await self.get_weather(city) if weather_data.get("status") == "success": return f"""🌤️ **{city} Hava Durumu:** 🌡️ **Sıcaklık:** {weather_data['temperature']} ☁️ **Durum:** {weather_data['description']} 💧 **Nem:** {weather_data['humidity']} 💨 **Rüzgar:** {weather_data['wind_speed']} *Güncel bilgi - API üzerinden alındı*""" else: return f"❌ Hava durumu bilgisi alınamadı: {weather_data.get('error', 'Bilinmeyen hata')}" except Exception as e: logger.error(f"Weather query processing error: {str(e)}") return f"❌ Hava durumu servisi şu anda kullanılamıyor: {str(e)}"