"""
Advanced reasoning engine - Main business logic
"""
import time
import hashlib
from typing import Generator, List, Dict, Optional, Any, Tuple
from src.api.groq_client import GroqClientManager
from src.core.prompt_engine import PromptEngine
from src.core.conversation import ConversationManager
from src.services.cache_service import ResponseCache
from src.services.rate_limiter import RateLimiter
from src.services.export_service import ConversationExporter
from src.services.analytics_service import AnalyticsService
from src.models.metrics import ConversationMetrics
from src.models.entry import ConversationEntry
from src.config.settings import AppConfig
from src.config.constants import ReasoningMode, ModelConfig
from src.utils.logger import logger
from src.utils.decorators import handle_groq_errors, with_rate_limit
from src.utils.validators import validate_input
from src.utils.helpers import generate_session_id


class AdvancedReasoner:
    """
    🧠 ADVANCED REASONING ENGINE
    Main orchestrator for AI reasoning with caching, metrics, and export
    """

    def __init__(self):
        # Core components
        self.client_manager = GroqClientManager()
        self.conversation_manager = ConversationManager()
        self.prompt_engine = PromptEngine()

        # Services
        self.cache = ResponseCache(AppConfig.CACHE_SIZE, AppConfig.CACHE_TTL)
        self.rate_limiter = RateLimiter(AppConfig.RATE_LIMIT_REQUESTS, AppConfig.RATE_LIMIT_WINDOW)
        self.exporter = ConversationExporter()
        self.analytics = AnalyticsService()

        # Metrics and state
        self.metrics = ConversationMetrics()
        self.session_id = generate_session_id()

        logger.info(f"✅ AdvancedReasoner initialized | Session: {self.session_id[:8]}...")
    def _generate_cache_key(self, query: str, model: str, mode: str,
                            temp: float, tokens: int) -> str:
        """
        🔑 GENERATE CACHE KEY
        Builds a deterministic SHA-256 key from the query and generation settings.
        """
        key_string = f"{query}|{model}|{mode}|{temp}|{tokens}"
        return hashlib.sha256(key_string.encode()).hexdigest()
    @handle_groq_errors(max_retries=AppConfig.MAX_RETRIES, retry_delay=AppConfig.RETRY_DELAY)
    def _call_groq_api(self, messages: List[Dict], model: str,
                       temperature: float, max_tokens: int) -> Generator[str, None, None]:
        """
        🔌 CALL GROQ API WITH STREAMING
        Yields response text chunks as they arrive from the model.
        """
        if AppConfig.ENABLE_RATE_LIMITING:
            self.rate_limiter.acquire()

        client = self.client_manager.client
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True
        )

        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    def generate_response(
        self,
        query: str,
        history: List[Dict],
        model: str,
        reasoning_mode: ReasoningMode,
        enable_critique: bool = True,
        temperature: float = 0.7,
        max_tokens: int = 4000,
        template: str = "Custom",
        use_cache: bool = True
    ) -> Generator[str, None, None]:
        """
        🧠 GENERATE RESPONSE WITH STREAMING
        Yields the progressively accumulated response text (suitable for streaming UIs).
        """
        # Validate input
        is_valid, error_msg = validate_input(query, AppConfig.MAX_INPUT_LENGTH)
        if not is_valid:
            yield f"❌ **Input Error:** {error_msg}"
            return
        start_time = time.time()

        # Check cache (record a miss only when the cache was actually consulted)
        cache_key = self._generate_cache_key(query, model, reasoning_mode.value, temperature, max_tokens)
        if use_cache and AppConfig.ENABLE_CACHE:
            cached = self.cache.get(cache_key)
            if cached:
                self.metrics.update_cache_stats(hit=True)
                logger.info("✅ Cache hit - returning cached response")
                yield cached
                return
            self.metrics.update_cache_stats(hit=False)
        # Build messages
        messages = self.prompt_engine.build_messages(query, reasoning_mode, template, history)

        # Stream response
        full_response = ""
        try:
            for chunk in self._call_groq_api(messages, model, temperature, max_tokens):
                full_response += chunk
                yield full_response  # yield the running total so the UI can re-render in place

            # Self-critique if enabled
            if enable_critique and AppConfig.ENABLE_SELF_CRITIQUE:
                critique_prompt = self.prompt_engine.get_self_critique_prompt(full_response)
                critique_messages = [
                    {"role": "system", "content": "You are a critical reviewer."},
                    {"role": "user", "content": critique_prompt}
                ]
                critique_response = ""
                for chunk in self._call_groq_api(critique_messages, model, temperature, max_tokens // 2):
                    critique_response += chunk

                full_response += f"\n\n---\n\n### 🔍 Self-Critique\n{critique_response}"
                yield full_response

            # Cache response
            if use_cache and AppConfig.ENABLE_CACHE:
                self.cache.set(cache_key, full_response)
            # Update metrics (token count is a rough word-count estimate)
            elapsed_time = time.time() - start_time
            tokens_estimate = len(full_response.split())
            self.metrics.update(
                tokens=tokens_estimate,
                time_taken=elapsed_time,
                depth=1,
                corrections=1 if enable_critique else 0,
                confidence=95.0
            )

            # Save conversation
            entry = ConversationEntry(
                user_message=query,
                assistant_response=full_response,
                model=model,
                reasoning_mode=reasoning_mode.value,
                temperature=temperature,
                max_tokens=max_tokens,
                tokens_used=tokens_estimate,
                inference_time=elapsed_time,
                critique_enabled=enable_critique,
                cache_hit=False
            )
            self.conversation_manager.add_conversation(entry)

            logger.info(f"✅ Response generated in {elapsed_time:.2f}s | Tokens: {tokens_estimate}")
        except Exception as e:
            self.metrics.increment_errors()
            error_msg = f"❌ **Error:** {str(e)}"
            logger.error(f"Response generation error: {e}", exc_info=True)
            yield error_msg
    # Convenience properties
    @property
    def conversation_history(self) -> List[ConversationEntry]:
        """Get conversation history"""
        return self.conversation_manager.get_history()

    @property
    def model_usage(self) -> Dict[str, int]:
        """Get model usage statistics"""
        return dict(self.conversation_manager.model_usage)

    @property
    def mode_usage(self) -> Dict[str, int]:
        """Get mode usage statistics"""
        return dict(self.conversation_manager.mode_usage)

    def clear_history(self) -> None:
        """Clear conversation history"""
        self.conversation_manager.clear_history()
    def export_conversation(self, format_type: str, include_metadata: bool = True) -> Tuple[str, Optional[str]]:
        """
        Export conversations
        Returns (content, filepath_string) for Gradio compatibility
        """
        return self.exporter.export(self.conversation_history, format_type, include_metadata)

    def export_current_chat_pdf(self) -> Optional[str]:
        """
        Export current chat as PDF
        Returns string path for Gradio compatibility
        """
        return self.exporter.export_to_pdf(self.conversation_history, include_metadata=True)

    def search_conversations(self, keyword: str) -> List[Tuple]:
        """Search conversations by keyword"""
        return self.analytics.search_conversations(self.conversation_history, keyword)

    def get_analytics(self) -> Dict[str, Any]:
        """Get session analytics"""
        return self.analytics.generate_analytics(
            self.conversation_history,
            self.metrics,
            self.session_id,
            self.model_usage,
            self.mode_usage,
            self.cache.get_stats()
        )
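

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the engine itself): how the reasoner might
# be driven from a script. Assumes GROQ_API_KEY is configured for
# GroqClientManager; the model string below is a placeholder, so substitute a
# model from your ModelConfig constants.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    reasoner = AdvancedReasoner()

    # Pick the first available reasoning mode rather than assuming a member name.
    demo_mode = next(iter(ReasoningMode))

    final_text = ""
    for partial in reasoner.generate_response(
        query="Explain the CAP theorem in two short paragraphs.",
        history=[],
        model="llama-3.3-70b-versatile",  # placeholder model name
        reasoning_mode=demo_mode,
        enable_critique=False,
    ):
        final_text = partial  # each yield is the accumulated response so far

    print(final_text)
    print(reasoner.get_analytics())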