WebashalarForML's picture
Upload 178 files
330b6e4 verified
"""
Core Chat Agent Service
This module provides the main ChatAgent class that orchestrates message processing,
language context management, chat history, and LLM interactions for the multi-language
chat agent system.
"""
import logging
from typing import Dict, Any, Optional, Generator, List
from datetime import datetime
from .groq_client import GroqClient, ChatMessage, LanguageContext
from .language_context import LanguageContextManager
from .session_manager import SessionManager, SessionNotFoundError, SessionExpiredError
from .chat_history import ChatHistoryManager, ChatHistoryError
from .programming_assistance import ProgrammingAssistanceService, AssistanceType
from ..models.message import Message
from ..models.chat_session import ChatSession
from ..utils.error_handler import ChatAgentError, ErrorCategory, ErrorSeverity, get_error_handler, error_handler_decorator
from ..utils.logging_config import get_logger, get_performance_logger
logger = get_logger('chat_agent')
performance_logger = get_performance_logger('chat_agent')
# Remove legacy ChatAgentError class - using the one from error_handler
class ChatAgent:
"""
Core chat agent service that orchestrates message processing workflow.
Handles language context, history retrieval, LLM calls, and response streaming
for the multi-language programming assistant chat system.
"""
def __init__(self, groq_client: GroqClient, language_context_manager: LanguageContextManager,
session_manager: SessionManager, chat_history_manager: ChatHistoryManager,
programming_assistance_service: ProgrammingAssistanceService = None):
"""
Initialize the chat agent with required service dependencies.
Args:
groq_client: Groq LangChain client for LLM interactions
language_context_manager: Manager for programming language contexts
session_manager: Manager for chat sessions
chat_history_manager: Manager for chat history storage and retrieval
programming_assistance_service: Service for specialized programming assistance
"""
self.groq_client = groq_client
self.language_context_manager = language_context_manager
self.session_manager = session_manager
self.chat_history_manager = chat_history_manager
self.programming_assistance_service = programming_assistance_service or ProgrammingAssistanceService()
# Initialize error handler
self.error_handler = get_error_handler()
logger.info("ChatAgent initialized successfully", extra={
'components': ['groq_client', 'language_context_manager', 'session_manager', 'chat_history_manager', 'programming_assistance_service'],
'error_handling': 'enabled'
})
@error_handler_decorator(get_error_handler(), return_fallback=False)
def process_message(self, session_id: str, message: str,
language: Optional[str] = None) -> Dict[str, Any]:
"""
Process a user message through the complete chat workflow.
This method handles:
1. Session validation and activity updates
2. Language context management
3. Chat history retrieval
4. LLM response generation
5. Message and response storage
Args:
session_id: Unique session identifier
message: User's input message
language: Optional language override for this message
Returns:
Dict containing response and metadata
Raises:
ChatAgentError: For various processing errors
"""
start_time = datetime.utcnow()
# 1. Validate session and update activity
session = self._validate_and_update_session(session_id)
# 2. Handle language context
current_language = self._handle_language_context(session_id, language, session)
# 3. Store user message
user_message = self._store_user_message(session_id, message, current_language)
# 4. Retrieve chat history for context
chat_history = self._get_chat_context(session_id)
# 5. Generate LLM response
response_content, response_metadata = self._generate_response(
message, chat_history, current_language
)
# 6. Store assistant response
assistant_message = self._store_assistant_message(
session_id, response_content, current_language, response_metadata
)
# 7. Update session message count
self.session_manager.increment_message_count(session_id)
# Log performance
processing_time = (datetime.utcnow() - start_time).total_seconds()
performance_logger.log_operation(
operation="process_message",
duration=processing_time,
context={
'session_id': session_id,
'language': current_language,
'message_length': len(message),
'history_size': len(chat_history)
}
)
return {
'response': response_content,
'language': current_language,
'session_id': session_id,
'message_id': assistant_message.id,
'metadata': response_metadata,
'processing_time': processing_time,
'timestamp': datetime.utcnow().isoformat()
}
def switch_language(self, session_id: str, language: str) -> Dict[str, Any]:
"""
Switch programming language context for a session while maintaining chat continuity.
Args:
session_id: Unique session identifier
language: New programming language to switch to
Returns:
Dict containing switch confirmation and context info
Raises:
ChatAgentError: If language switch fails
"""
try:
# 1. Validate session
session = self._validate_and_update_session(session_id)
# 2. Validate and set new language
if not self.language_context_manager.validate_language(language):
raise ChatAgentError(f"Unsupported language: {language}")
# Get previous language for context
previous_language = self.language_context_manager.get_language(session_id)
# 3. Update language context
success = self.language_context_manager.set_language(session_id, language)
if not success:
raise ChatAgentError(f"Failed to set language to {language}")
# 4. Update session language
self.session_manager.set_session_language(session_id, language)
# 5. Store language switch message for continuity
switch_message = f"Language context switched from {previous_language} to {language}. " \
f"I'm now ready to help you with {language} programming!"
self._store_assistant_message(
session_id, switch_message, language,
{'type': 'language_switch', 'previous_language': previous_language}
)
logger.info(f"Language switched from {previous_language} to {language} for session {session_id}")
return {
'success': True,
'previous_language': previous_language,
'new_language': language,
'session_id': session_id,
'message': switch_message,
'timestamp': datetime.utcnow().isoformat()
}
except (SessionNotFoundError, SessionExpiredError) as e:
logger.error(f"Session error switching language: {e}")
raise ChatAgentError(f"Session error: {e}")
except Exception as e:
logger.error(f"Unexpected error switching language: {e}")
raise ChatAgentError(f"Language switch failed: {e}")
def stream_response(self, session_id: str, message: str,
language: Optional[str] = None) -> Generator[Dict[str, Any], None, None]:
"""
Generate streaming response for real-time chat experience.
Args:
session_id: Unique session identifier
message: User's input message
language: Optional language override for this message
Yields:
Dict containing response chunks and metadata
Raises:
ChatAgentError: For various processing errors
"""
try:
# 1. Validate session and update activity
session = self._validate_and_update_session(session_id)
# 2. Handle language context
current_language = self._handle_language_context(session_id, language, session)
# 3. Store user message
user_message = self._store_user_message(session_id, message, current_language)
# 4. Retrieve chat history for context
chat_history = self._get_chat_context(session_id)
# 5. Create language context for streaming
language_context = LanguageContext(
language=current_language,
prompt_template=self.language_context_manager.get_language_prompt_template(current_language),
syntax_highlighting=current_language
)
# 6. Stream response from Groq
response_chunks = []
start_time = datetime.utcnow()
yield {
'type': 'start',
'session_id': session_id,
'language': current_language,
'timestamp': start_time.isoformat()
}
for chunk in self.groq_client.stream_response(message, chat_history, language_context):
response_chunks.append(chunk)
yield {
'type': 'chunk',
'content': chunk,
'session_id': session_id,
'timestamp': datetime.utcnow().isoformat()
}
# 7. Store complete response
complete_response = ''.join(response_chunks)
end_time = datetime.utcnow()
response_metadata = {
'streaming': True,
'chunks_count': len(response_chunks),
'processing_time': (end_time - start_time).total_seconds()
}
assistant_message = self._store_assistant_message(
session_id, complete_response, current_language, response_metadata
)
# 8. Update session message count
self.session_manager.increment_message_count(session_id)
yield {
'type': 'complete',
'session_id': session_id,
'message_id': assistant_message.id,
'total_chunks': len(response_chunks),
'processing_time': response_metadata['processing_time'],
'timestamp': end_time.isoformat()
}
except (SessionNotFoundError, SessionExpiredError) as e:
logger.error(f"Session error in streaming: {e}")
yield {
'type': 'error',
'error': f"Session error: {e}",
'session_id': session_id,
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Unexpected error in streaming: {e}")
yield {
'type': 'error',
'error': f"Processing failed: {e}",
'session_id': session_id,
'timestamp': datetime.utcnow().isoformat()
}
def get_chat_history(self, session_id: str, limit: int = 10) -> List[Dict[str, Any]]:
"""
Retrieve recent conversation history for a session.
Args:
session_id: Unique session identifier
limit: Maximum number of messages to retrieve
Returns:
List of message dictionaries with formatted data
Raises:
ChatAgentError: If history retrieval fails
"""
try:
# Validate session
self._validate_and_update_session(session_id)
# Get recent messages
messages = self.chat_history_manager.get_recent_history(session_id, limit)
# Format messages for response
formatted_messages = []
for message in messages:
formatted_messages.append({
'id': message.id,
'role': message.role,
'content': message.content,
'language': message.language,
'timestamp': message.timestamp.isoformat(),
'metadata': message.message_metadata
})
return formatted_messages
except (SessionNotFoundError, SessionExpiredError) as e:
logger.error(f"Session error getting history: {e}")
raise ChatAgentError(f"Session error: {e}")
except ChatHistoryError as e:
logger.error(f"Chat history error: {e}")
raise ChatAgentError(f"History error: {e}")
except Exception as e:
logger.error(f"Unexpected error getting history: {e}")
raise ChatAgentError(f"Failed to get history: {e}")
def get_session_info(self, session_id: str) -> Dict[str, Any]:
"""
Get comprehensive session information including context and statistics.
Args:
session_id: Unique session identifier
Returns:
Dict containing session info, language context, and statistics
Raises:
ChatAgentError: If session info retrieval fails
"""
try:
# Get session
session = self._validate_and_update_session(session_id)
# Get language context
language_context = self.language_context_manager.get_session_context_info(session_id)
# Get message count
message_count = self.chat_history_manager.get_message_count(session_id)
# Get cache stats
cache_stats = self.chat_history_manager.get_cache_stats(session_id)
return {
'session': {
'id': session.id,
'user_id': session.user_id,
'language': session.language,
'created_at': session.created_at.isoformat(),
'last_active': session.last_active.isoformat(),
'message_count': session.message_count,
'is_active': session.is_active,
'metadata': session.session_metadata
},
'language_context': language_context,
'statistics': {
'total_messages': message_count,
'session_message_count': session.message_count,
'cache_stats': cache_stats
},
'supported_languages': list(self.language_context_manager.get_supported_languages())
}
except (SessionNotFoundError, SessionExpiredError) as e:
logger.error(f"Session error getting info: {e}")
raise ChatAgentError(f"Session error: {e}")
except Exception as e:
logger.error(f"Unexpected error getting session info: {e}")
raise ChatAgentError(f"Failed to get session info: {e}")
def process_programming_assistance(self, session_id: str, message: str,
code: str = None, error_message: str = None,
assistance_type: AssistanceType = None) -> Dict[str, Any]:
"""
Process a programming assistance request with specialized handling.
Args:
session_id: Unique session identifier
message: User's message/question
code: Optional code to analyze
error_message: Optional error message to debug
assistance_type: Optional specific type of assistance needed
Returns:
Dict containing specialized assistance response
Raises:
ChatAgentError: For various processing errors
"""
try:
# 1. Validate session and update activity
session = self._validate_and_update_session(session_id)
current_language = self.language_context_manager.get_language(session_id)
# 2. Detect assistance type if not provided
if not assistance_type:
assistance_type = self.programming_assistance_service.detect_assistance_type(message, code)
# 3. Get specialized prompt template
context = {
'beginner_mode': 'beginner' in message.lower() or 'new to' in message.lower(),
'code_provided': bool(code),
'error_provided': bool(error_message)
}
specialized_prompt = self.programming_assistance_service.get_assistance_prompt_template(
assistance_type, current_language, context
)
# 4. Perform analysis based on assistance type
analysis_result = None
if assistance_type in [AssistanceType.CODE_EXPLANATION, AssistanceType.CODE_REVIEW] and code:
analysis_result = self.programming_assistance_service.analyze_code(code, current_language)
elif assistance_type in [AssistanceType.ERROR_ANALYSIS, AssistanceType.DEBUGGING] and error_message:
analysis_result = self.programming_assistance_service.analyze_error(
error_message, code, current_language
)
elif assistance_type == AssistanceType.BEGINNER_HELP:
# Extract topic from message for beginner explanations
topic = self._extract_topic_from_message(message)
analysis_result = self.programming_assistance_service.generate_beginner_explanation(
topic, current_language, code
)
# 5. Build enhanced message with analysis
enhanced_message = self._build_enhanced_message(
message, code, error_message, analysis_result, assistance_type
)
# 6. Store user message with assistance metadata
user_message = self._store_user_message(
session_id, message, current_language, {
'assistance_type': assistance_type.value,
'code_provided': bool(code),
'error_provided': bool(error_message)
}
)
# 7. Get chat history for context
chat_history = self._get_chat_context(session_id)
# 8. Create specialized language context
language_context = LanguageContext(
language=current_language,
prompt_template=specialized_prompt,
syntax_highlighting=current_language
)
# 9. Generate response with specialized context
response_content, response_metadata = self._generate_response(
enhanced_message, chat_history, current_language, language_context
)
# 10. Format response if analysis was performed
if analysis_result and assistance_type != AssistanceType.BEGINNER_HELP:
formatted_response = self.programming_assistance_service.format_assistance_response(
assistance_type, analysis_result, current_language
)
response_content = f"{formatted_response}\n\n---\n\n{response_content}"
# 11. Store assistant response
assistant_message = self._store_assistant_message(
session_id, response_content, current_language, {
**response_metadata,
'assistance_type': assistance_type.value,
'analysis_performed': bool(analysis_result)
}
)
# 12. Update session message count
self.session_manager.increment_message_count(session_id)
return {
'response': response_content,
'assistance_type': assistance_type.value,
'language': current_language,
'session_id': session_id,
'message_id': assistant_message.id,
'analysis_result': analysis_result,
'metadata': response_metadata,
'timestamp': datetime.utcnow().isoformat()
}
except (SessionNotFoundError, SessionExpiredError) as e:
logger.error(f"Session error in programming assistance: {e}")
raise ChatAgentError(f"Session error: {e}")
except Exception as e:
logger.error(f"Unexpected error in programming assistance: {e}")
raise ChatAgentError(f"Programming assistance failed: {e}")
def explain_code(self, session_id: str, code: str, question: str = None) -> Dict[str, Any]:
"""
Provide detailed code explanation.
Args:
session_id: Unique session identifier
code: Code to explain
question: Optional specific question about the code
Returns:
Dict containing code explanation response
"""
message = question or "Please explain this code:"
return self.process_programming_assistance(
session_id, message, code=code, assistance_type=AssistanceType.CODE_EXPLANATION
)
def debug_code(self, session_id: str, code: str, error_message: str,
description: str = None) -> Dict[str, Any]:
"""
Provide debugging assistance for code with errors.
Args:
session_id: Unique session identifier
code: Code that has errors
error_message: Error message received
description: Optional description of the problem
Returns:
Dict containing debugging assistance response
"""
message = description or "I'm getting an error with this code. Can you help me debug it?"
return self.process_programming_assistance(
session_id, message, code=code, error_message=error_message,
assistance_type=AssistanceType.DEBUGGING
)
def analyze_error(self, session_id: str, error_message: str,
context: str = None) -> Dict[str, Any]:
"""
Analyze and explain an error message.
Args:
session_id: Unique session identifier
error_message: Error message to analyze
context: Optional context about when the error occurred
Returns:
Dict containing error analysis response
"""
message = context or "I got this error and don't understand what it means:"
return self.process_programming_assistance(
session_id, message, error_message=error_message,
assistance_type=AssistanceType.ERROR_ANALYSIS
)
def review_code(self, session_id: str, code: str, focus_areas: List[str] = None) -> Dict[str, Any]:
"""
Provide code review and improvement suggestions.
Args:
session_id: Unique session identifier
code: Code to review
focus_areas: Optional list of specific areas to focus on
Returns:
Dict containing code review response
"""
focus_text = f" Please focus on: {', '.join(focus_areas)}" if focus_areas else ""
message = f"Please review this code and provide feedback.{focus_text}"
return self.process_programming_assistance(
session_id, message, code=code, assistance_type=AssistanceType.CODE_REVIEW
)
def get_beginner_help(self, session_id: str, topic: str,
specific_question: str = None) -> Dict[str, Any]:
"""
Provide beginner-friendly help on programming topics.
Args:
session_id: Unique session identifier
topic: Programming topic or concept
specific_question: Optional specific question about the topic
Returns:
Dict containing beginner-friendly explanation
"""
message = specific_question or f"I'm new to programming. Can you explain {topic} in simple terms?"
return self.process_programming_assistance(
session_id, message, assistance_type=AssistanceType.BEGINNER_HELP
)
# Private helper methods
def _validate_and_update_session(self, session_id: str) -> ChatSession:
"""Validate session exists and update activity."""
session = self.session_manager.get_session(session_id)
self.session_manager.update_session_activity(session_id)
return session
def _handle_language_context(self, session_id: str, language: Optional[str],
session: ChatSession) -> str:
"""Handle language context for the session."""
if language:
# Validate and set new language if provided
if not self.language_context_manager.validate_language(language):
logger.warning(f"Invalid language {language}, using session default")
return self.language_context_manager.get_language(session_id)
# Set language context
self.language_context_manager.set_language(session_id, language)
# Update session language if different
if session.language != language:
self.session_manager.set_session_language(session_id, language)
return language
else:
# Use existing session language
return self.language_context_manager.get_language(session_id)
def _store_assistant_message(self, session_id: str, content: str, language: str,
metadata: Optional[Dict[str, Any]] = None) -> Message:
"""Store assistant message in chat history."""
return self.chat_history_manager.store_message(
session_id=session_id,
role='assistant',
content=content,
language=language,
message_metadata=metadata
)
def _get_chat_context(self, session_id: str) -> List[ChatMessage]:
"""Get recent chat history formatted for LLM context."""
messages = self.chat_history_manager.get_recent_history(session_id)
chat_messages = []
for message in messages:
chat_messages.append(ChatMessage(
role=message.role,
content=message.content,
language=message.language,
timestamp=message.timestamp.isoformat()
))
return chat_messages
def _generate_response(self, message: str, chat_history: List[ChatMessage],
language: str, language_context: LanguageContext = None) -> tuple[str, Dict[str, Any]]:
"""Generate response using Groq LLM with context."""
start_time = datetime.utcnow()
# Create language context if not provided
if not language_context:
language_context = LanguageContext(
language=language,
prompt_template=self.language_context_manager.get_language_prompt_template(language),
syntax_highlighting=language
)
# Generate response
response = self.groq_client.generate_response(
prompt=message,
chat_history=chat_history,
language_context=language_context
)
end_time = datetime.utcnow()
# Create response metadata
metadata = {
'processing_time': (end_time - start_time).total_seconds(),
'language': language,
'context_messages': len(chat_history),
'model_info': self.groq_client.get_model_info()
}
return response, metadata
def _store_user_message(self, session_id: str, content: str, language: str,
metadata: Optional[Dict[str, Any]] = None) -> Message:
"""Store user message in chat history with optional metadata."""
return self.chat_history_manager.store_message(
session_id=session_id,
role='user',
content=content,
language=language,
message_metadata=metadata
)
def _extract_topic_from_message(self, message: str) -> str:
"""Extract programming topic from user message."""
# Simple keyword extraction - could be enhanced with NLP
common_topics = [
'variables', 'functions', 'loops', 'conditionals', 'classes', 'objects',
'arrays', 'lists', 'dictionaries', 'strings', 'integers', 'floats',
'inheritance', 'polymorphism', 'encapsulation', 'recursion', 'algorithms',
'data structures', 'debugging', 'testing', 'modules', 'packages'
]
message_lower = message.lower()
for topic in common_topics:
if topic in message_lower:
return topic
# If no specific topic found, extract potential topic from question words
words = message_lower.split()
for i, word in enumerate(words):
if word in ['what', 'how', 'explain', 'understand'] and i + 1 < len(words):
# Return the next few words as potential topic
return ' '.join(words[i+1:i+3])
return 'programming concepts'
def _build_enhanced_message(self, message: str, code: str = None,
error_message: str = None, analysis_result: Any = None,
assistance_type: AssistanceType = None) -> str:
"""Build enhanced message with code and analysis context."""
enhanced_parts = [message]
if code:
enhanced_parts.append(f"\n\nCode to analyze:\n```\n{code}\n```")
if error_message:
enhanced_parts.append(f"\n\nError message:\n```\n{error_message}\n```")
if analysis_result and assistance_type == AssistanceType.BEGINNER_HELP:
# For beginner help, the analysis_result is already the formatted explanation
return analysis_result
return '\n'.join(enhanced_parts)
def create_chat_agent(groq_client: GroqClient, language_context_manager: LanguageContextManager,
session_manager: SessionManager, chat_history_manager: ChatHistoryManager,
programming_assistance_service: ProgrammingAssistanceService = None) -> ChatAgent:
"""
Factory function to create a ChatAgent instance.
Args:
groq_client: Groq LangChain client for LLM interactions
language_context_manager: Manager for programming language contexts
session_manager: Manager for chat sessions
chat_history_manager: Manager for chat history storage and retrieval
programming_assistance_service: Service for specialized programming assistance
Returns:
ChatAgent: Configured chat agent instance
"""
return ChatAgent(groq_client, language_context_manager, session_manager, chat_history_manager, programming_assistance_service)