Novaai / modules /fast_module.py
veteroner's picture
CRANE AI sistemi yüklendi
25e927f verified
"""
CRANE AI - Fast Response Modülü
"""
from typing import Dict, Any
from core.base_module import BaseMicroModule
import logging
import asyncio
# Weather tool import
try:
from tools.weather_tool import WeatherTool
WEATHER_TOOL_AVAILABLE = True
except ImportError:
WEATHER_TOOL_AVAILABLE = False
WeatherTool = None
logger = logging.getLogger(__name__)
class FastModule(BaseMicroModule):
"""Hızlı yanıt için özelleşmiş modül"""
def __init__(self, config: Dict[str, Any]):
super().__init__(
model_id="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
config=config
)
# Weather tool initialize
if WEATHER_TOOL_AVAILABLE:
self.weather_tool = WeatherTool()
else:
self.weather_tool = None
# Hızlı yanıt anahtar kelimeleri
self.quick_keywords = {
"hızlı", "kısa", "özet", "basit", "kolay",
"evet", "hayır", "doğru", "yanlış", "kim",
"ne", "nerede", "ne zaman", "kaç", "hangi",
"merhaba", "selam", "teşekkür", "tamam"
}
# Basit soru türleri
self.simple_patterns = {
"greeting": ["merhaba", "selam", "günaydın", "iyi akşam"],
"thanks": ["teşekkür", "sağol", "merci"],
"simple_question": ["kim", "ne", "nerede", "kaç"],
"yes_no": ["mı", "mi", "mu", "mü"],
"time": ["saat", "tarih", "zaman"],
"weather": ["hava", "sıcaklık", "yağmur"]
}
# Hazır yanıtlar
self.quick_responses = {
"greeting": [
"Merhaba! Size nasıl yardımcı olabilirim?",
"Selam! Bugün nasılsınız?",
"Günaydın! Ne yapmak istiyorsunuz?"
],
"thanks": [
"Rica ederim! Başka bir konuda yardımcı olabilir miyim?",
"Bir şey değil! Başka sorunuz var mı?",
"Memnun oldum! Size daha nasıl yardımcı olabilirim?"
],
"unknown": [
"Bu konuda daha detaylı bilgi verebilir miyim?",
"Sorunuzu biraz daha açabilir misiniz?",
"Bu konuda size yardımcı olmak istiyorum."
]
}
def can_handle(self, query: str, context: Dict[str, Any]) -> float:
"""Hızlı yanıt sorguları için uygunluk skoru"""
query_lower = query.lower()
# Kod anahtar kelimelerini kontrol et (negatif skor)
code_keywords = {
"function", "class", "def", "import", "from", "return",
"if", "else", "for", "while", "try", "except", "with",
"python", "javascript", "java", "c++", "html", "css",
"kod", "kodu", "script", "fonksiyon", "sınıf", "algoritma",
"program", "yazılım", "debug", "hata", "fix", "düzelt",
"yaz", "oluştur", "geliştir", "hesap", "makinesi"
}
# Kod soruları için düşük skor
for keyword in code_keywords:
if keyword in query_lower:
return 0.1 # Çok düşük skor
# Hızlı yanıt anahtar kelimelerini kontrol et
quick_score = 0
for keyword in self.quick_keywords:
if keyword in query_lower:
quick_score += 0.15
# Basit soru pattern'larını kontrol et
for pattern_type, patterns in self.simple_patterns.items():
for pattern in patterns:
if pattern in query_lower:
quick_score += 0.2
# Kısa sorular için yüksek skor
word_count = len(query.split())
if word_count <= 5:
quick_score += 0.3
elif word_count <= 10:
quick_score += 0.2
# Selamlama ve teşekkür ifadeleri
if any(word in query_lower for word in ["merhaba", "selam", "teşekkür", "sağol"]):
quick_score += 0.4
# Evet/hayır soruları
if any(ending in query_lower for ending in ["mı?", "mi?", "mu?", "mü?"]):
quick_score += 0.25
# Hava durumu soruları için yüksek skor
if self.weather_tool and self.weather_tool.can_handle(query):
quick_score += 0.6
# Maksimum 1.0 skor
return min(quick_score, 1.0)
async def process(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Hızlı yanıt işlemi"""
try:
# Weather tool kontrolü
if self.weather_tool and self.weather_tool.can_handle(query):
weather_response = await self.weather_tool.process_weather_query(query)
return {
"response": weather_response,
"module": "fast_module",
"confidence": self.can_handle(query, context),
"response_type": "weather_tool",
"cached": False,
"tool_used": "weather_api"
}
# Önce hazır yanıt kontrolü
quick_response = self._check_quick_response(query)
if quick_response:
return {
"response": quick_response,
"module": "fast_module",
"confidence": self.can_handle(query, context),
"response_type": "quick",
"cached": True
}
# Hızlı yanıt için özel prompt
prompt = self._build_fast_prompt(query, context)
# Yanıt üretimi (düşük token limit)
response = await self.generate_response(
prompt,
max_tokens=min(self.config.get("max_tokens", 512), 256), # Maksimum 256 token
temperature=0.8 # Hızlı yanıt için yüksek temperature
)
return {
"response": response,
"module": "fast_module",
"confidence": self.can_handle(query, context),
"response_type": "generated",
"cached": False
}
except Exception as e:
logger.error(f"Fast processing error: {str(e)}")
return {
"error": str(e),
"module": "fast_module"
}
def _check_quick_response(self, query: str) -> str:
"""Hazır yanıtları kontrol eder"""
query_lower = query.lower()
# Selamlama kontrolü
if any(word in query_lower for word in self.simple_patterns["greeting"]):
import random
return random.choice(self.quick_responses["greeting"])
# Teşekkür kontrolü
if any(word in query_lower for word in self.simple_patterns["thanks"]):
import random
return random.choice(self.quick_responses["thanks"])
# Basit evet/hayır soruları
if len(query.split()) <= 3:
if any(ending in query_lower for ending in ["mı?", "mi?", "mu?", "mü?"]):
return "Bu konuda kesin bir yanıt verebilmek için biraz daha detay gerekiyor."
return None
def _build_fast_prompt(self, query: str, context: Dict[str, Any]) -> str:
"""Hızlı yanıt için prompt hazırlar"""
# Sorgu türünü belirle
query_type = self._detect_query_type(query)
# Temel prompt
prompt = f"""Sen hızlı ve özlü yanıtlar veren bir asistansın. Kısa ve net cevaplar veriyorsun.
Kullanıcı sorusu: {query}
Lütfen:
1. Kısa ve net yanıt ver
2. Gereksiz detaylara girme
3. Doğrudan soruyu yanıtla
4. Maksimum 2-3 cümle kullan
Soru türü: {query_type}
"""
# Bağlam varsa kısaca ekle
if context.get("history"):
last_msg = context["history"][-1]
if len(last_msg) < 100: # Sadece kısa geçmişi ekle
prompt += f"\nÖnceki: {last_msg}\n"
return prompt
def _detect_query_type(self, query: str) -> str:
"""Sorgu türünü tespit eder"""
query_lower = query.lower()
# Selamlama
if any(word in query_lower for word in self.simple_patterns["greeting"]):
return "greeting"
# Teşekkür
if any(word in query_lower for word in self.simple_patterns["thanks"]):
return "thanks"
# Basit soru
if any(word in query_lower for word in self.simple_patterns["simple_question"]):
return "simple_question"
# Evet/hayır
if any(ending in query_lower for ending in ["mı?", "mi?", "mu?", "mü?"]):
return "yes_no"
# Zaman
if any(word in query_lower for word in self.simple_patterns["time"]):
return "time"
# Hava
if any(word in query_lower for word in self.simple_patterns["weather"]):
return "weather"
# Kısa soru
if len(query.split()) <= 5:
return "short_question"
# Varsayılan
return "general"