File size: 8,552 Bytes
5e0ae28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
"""
Advanced reasoning engine - Main business logic
"""
import time
import hashlib
from typing import Generator, List, Dict, Optional, Any, Tuple
from src.api.groq_client import GroqClientManager
from src.core.prompt_engine import PromptEngine
from src.core.conversation import ConversationManager
from src.services.cache_service import ResponseCache
from src.services.rate_limiter import RateLimiter
from src.services.export_service import ConversationExporter
from src.services.analytics_service import AnalyticsService
from src.models.metrics import ConversationMetrics
from src.models.entry import ConversationEntry
from src.config.settings import AppConfig
from src.config.constants import ReasoningMode, ModelConfig
from src.utils.logger import logger
from src.utils.decorators import handle_groq_errors, with_rate_limit
from src.utils.validators import validate_input
from src.utils.helpers import generate_session_id
class AdvancedReasoner:
"""
π§ ADVANCED REASONING ENGINE
Main orchestrator for AI reasoning with caching, metrics, and export
"""
def __init__(self):
# Core components
self.client_manager = GroqClientManager()
self.conversation_manager = ConversationManager()
self.prompt_engine = PromptEngine()
# Services
self.cache = ResponseCache(AppConfig.CACHE_SIZE, AppConfig.CACHE_TTL)
self.rate_limiter = RateLimiter(AppConfig.RATE_LIMIT_REQUESTS, AppConfig.RATE_LIMIT_WINDOW)
self.exporter = ConversationExporter()
self.analytics = AnalyticsService()
# Metrics and state
self.metrics = ConversationMetrics()
self.session_id = generate_session_id()
logger.info(f"β
AdvancedReasoner initialized | Session: {self.session_id[:8]}...")
def _generate_cache_key(self, query: str, model: str, mode: str,
temp: float, tokens: int) -> str:
"""
π GENERATE CACHE KEY
"""
key_string = f"{query}|{model}|{mode}|{temp}|{tokens}"
return hashlib.sha256(key_string.encode()).hexdigest()
@handle_groq_errors(max_retries=AppConfig.MAX_RETRIES, retry_delay=AppConfig.RETRY_DELAY)
def _call_groq_api(self, messages: List[Dict], model: str,
temperature: float, max_tokens: int) -> Generator[str, None, None]:
"""
π CALL GROQ API WITH STREAMING
"""
if AppConfig.ENABLE_RATE_LIMITING:
self.rate_limiter.acquire()
client = self.client_manager.client
stream = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def generate_response(
self,
query: str,
history: List[Dict],
model: str,
reasoning_mode: ReasoningMode,
enable_critique: bool = True,
temperature: float = 0.7,
max_tokens: int = 4000,
template: str = "Custom",
use_cache: bool = True
) -> Generator[str, None, None]:
"""
π§ GENERATE RESPONSE WITH STREAMING
"""
# Validate input
is_valid, error_msg = validate_input(query, AppConfig.MAX_INPUT_LENGTH)
if not is_valid:
yield f"β **Input Error:** {error_msg}"
return
start_time = time.time()
# Check cache
cache_key = self._generate_cache_key(query, model, reasoning_mode.value, temperature, max_tokens)
if use_cache and AppConfig.ENABLE_CACHE:
cached = self.cache.get(cache_key)
if cached:
self.metrics.update_cache_stats(hit=True)
logger.info("β
Cache hit - returning cached response")
yield cached
return
self.metrics.update_cache_stats(hit=False)
# Build messages
messages = self.prompt_engine.build_messages(query, reasoning_mode, template, history)
# Stream response
full_response = ""
try:
for chunk in self._call_groq_api(messages, model, temperature, max_tokens):
full_response += chunk
yield full_response
# Self-critique if enabled
if enable_critique and AppConfig.ENABLE_SELF_CRITIQUE:
critique_prompt = self.prompt_engine.get_self_critique_prompt(full_response)
critique_messages = [
{"role": "system", "content": "You are a critical reviewer."},
{"role": "user", "content": critique_prompt}
]
critique_response = ""
for chunk in self._call_groq_api(critique_messages, model, temperature, max_tokens // 2):
critique_response += chunk
full_response += f"\n\n---\n\n### π Self-Critique\n{critique_response}"
yield full_response
# Cache response
if use_cache and AppConfig.ENABLE_CACHE:
self.cache.set(cache_key, full_response)
# Update metrics
elapsed_time = time.time() - start_time
tokens_estimate = len(full_response.split())
self.metrics.update(
tokens=tokens_estimate,
time_taken=elapsed_time,
depth=1,
corrections=1 if enable_critique else 0,
confidence=95.0
)
# Save conversation
entry = ConversationEntry(
user_message=query,
assistant_response=full_response,
model=model,
reasoning_mode=reasoning_mode.value,
temperature=temperature,
max_tokens=max_tokens,
tokens_used=tokens_estimate,
inference_time=elapsed_time,
critique_enabled=enable_critique,
cache_hit=False
)
self.conversation_manager.add_conversation(entry)
logger.info(f"β
Response generated in {elapsed_time:.2f}s | Tokens: {tokens_estimate}")
except Exception as e:
self.metrics.increment_errors()
error_msg = f"β **Error:** {str(e)}"
logger.error(f"Response generation error: {e}", exc_info=True)
yield error_msg
# Convenience properties
@property
def conversation_history(self) -> List[ConversationEntry]:
"""Get conversation history"""
return self.conversation_manager.get_history()
@property
def model_usage(self) -> Dict[str, int]:
"""Get model usage statistics"""
return dict(self.conversation_manager.model_usage)
@property
def mode_usage(self) -> Dict[str, int]:
"""Get mode usage statistics"""
return dict(self.conversation_manager.mode_usage)
def clear_history(self) -> None:
"""Clear conversation history"""
self.conversation_manager.clear_history()
def export_conversation(self, format_type: str, include_metadata: bool = True) -> Tuple[str, Optional[str]]:
"""
Export conversations
Returns (content, filepath_string) for Gradio compatibility
"""
return self.exporter.export(self.conversation_history, format_type, include_metadata)
def export_current_chat_pdf(self) -> Optional[str]:
"""
Export current chat as PDF
Returns string path for Gradio compatibility
"""
return self.exporter.export_to_pdf(self.conversation_history, include_metadata=True)
def search_conversations(self, keyword: str) -> List[tuple]:
"""Search conversations"""
return self.analytics.search_conversations(self.conversation_history, keyword)
def get_analytics(self) -> Dict[str, Any]:
"""Get analytics"""
return self.analytics.generate_analytics(
self.conversation_history,
self.metrics,
self.session_id,
self.model_usage,
self.mode_usage,
self.cache.get_stats()
)
|