Spaces:
Sleeping
Sleeping
| """ | |
| Conversation Manager Service | |
| Phase 1: Conversation Memory & Context Management | |
| This service handles: | |
| - Conversation history storage and retrieval | |
| - Context window management (last N messages) | |
| - Session management | |
| - Message persistence | |
| """ | |
| from typing import List, Dict, Optional, Tuple | |
| from datetime import datetime, timedelta | |
| import uuid | |
| from web_app import db | |
| from web_app.models import ChatMessage, ConversationSession, User, UserLearningPath | |
| class ConversationManager: | |
| """ | |
| Manages conversation state, history, and context for the chatbot. | |
| Key Features: | |
| - Store and retrieve conversation history | |
| - Manage conversation sessions | |
| - Build context windows for AI | |
| - Track conversation metrics | |
| """ | |
| def __init__(self, context_window_size: int = 10): | |
| """ | |
| Initialize the conversation manager. | |
| Args: | |
| context_window_size: Number of recent messages to include in context (default: 10) | |
| """ | |
| self.context_window_size = context_window_size | |
| def get_or_create_session( | |
| self, | |
| user_id: int, | |
| learning_path_id: Optional[str] = None | |
| ) -> ConversationSession: | |
| """ | |
| Get active session or create a new one. | |
| Sessions expire after 30 minutes of inactivity. | |
| Args: | |
| user_id: User ID | |
| learning_path_id: Optional learning path ID | |
| Returns: | |
| ConversationSession object | |
| """ | |
| # Check for active session in last 30 minutes | |
| cutoff_time = datetime.utcnow() - timedelta(minutes=30) | |
| active_session = ConversationSession.query.filter( | |
| ConversationSession.user_id == user_id, | |
| ConversationSession.is_active == True, | |
| ConversationSession.last_activity_at >= cutoff_time | |
| ).order_by(ConversationSession.last_activity_at.desc()).first() | |
| if active_session: | |
| # Update last activity | |
| active_session.last_activity_at = datetime.utcnow() | |
| db.session.commit() | |
| return active_session | |
| # Create new session | |
| new_session = ConversationSession( | |
| user_id=user_id, | |
| learning_path_id=learning_path_id, | |
| is_active=True | |
| ) | |
| db.session.add(new_session) | |
| db.session.commit() | |
| return new_session | |
| def add_message( | |
| self, | |
| user_id: int, | |
| message: str, | |
| role: str, | |
| learning_path_id: Optional[str] = None, | |
| intent: Optional[str] = None, | |
| entities: Optional[Dict] = None, | |
| tokens_used: int = 0, | |
| response_time_ms: Optional[int] = None | |
| ) -> ChatMessage: | |
| """ | |
| Add a message to conversation history. | |
| Args: | |
| user_id: User ID | |
| message: Message content | |
| role: 'user' or 'assistant' | |
| learning_path_id: Optional learning path ID | |
| intent: Classified intent (from Phase 2) | |
| entities: Extracted entities (from Phase 2) | |
| tokens_used: Number of tokens used for this message | |
| response_time_ms: Response time in milliseconds | |
| Returns: | |
| ChatMessage object | |
| """ | |
| # Get or create session | |
| session = self.get_or_create_session(user_id, learning_path_id) | |
| # Create message | |
| chat_message = ChatMessage( | |
| user_id=user_id, | |
| learning_path_id=learning_path_id, | |
| message=message, | |
| role=role, | |
| intent=intent, | |
| entities=entities, | |
| tokens_used=tokens_used, | |
| response_time_ms=response_time_ms, | |
| session_id=session.id | |
| ) | |
| db.session.add(chat_message) | |
| # Update session stats | |
| session.message_count += 1 | |
| session.total_tokens_used += tokens_used | |
| session.last_activity_at = datetime.utcnow() | |
| db.session.commit() | |
| return chat_message | |
| def get_conversation_history( | |
| self, | |
| user_id: int, | |
| learning_path_id: Optional[str] = None, | |
| limit: Optional[int] = None, | |
| session_id: Optional[str] = None | |
| ) -> List[ChatMessage]: | |
| """ | |
| Get conversation history for a user. | |
| Args: | |
| user_id: User ID | |
| learning_path_id: Optional filter by learning path | |
| limit: Maximum number of messages to return | |
| session_id: Optional filter by session | |
| Returns: | |
| List of ChatMessage objects (ordered by timestamp) | |
| """ | |
| query = ChatMessage.query.filter(ChatMessage.user_id == user_id) | |
| if learning_path_id: | |
| query = query.filter(ChatMessage.learning_path_id == learning_path_id) | |
| if session_id: | |
| query = query.filter(ChatMessage.session_id == session_id) | |
| query = query.order_by(ChatMessage.timestamp.asc()) | |
| if limit: | |
| # Get the most recent N messages | |
| total_count = query.count() | |
| if total_count > limit: | |
| query = query.offset(total_count - limit) | |
| return query.all() | |
| def get_context_window( | |
| self, | |
| user_id: int, | |
| learning_path_id: Optional[str] = None, | |
| window_size: Optional[int] = None | |
| ) -> List[Dict[str, str]]: | |
| """ | |
| Get recent conversation context for AI. | |
| Returns messages in OpenAI chat format: | |
| [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}] | |
| Args: | |
| user_id: User ID | |
| learning_path_id: Optional learning path ID | |
| window_size: Number of recent messages (default: self.context_window_size) | |
| Returns: | |
| List of message dictionaries in OpenAI format | |
| """ | |
| window_size = window_size or self.context_window_size | |
| # Get recent messages | |
| messages = self.get_conversation_history( | |
| user_id=user_id, | |
| learning_path_id=learning_path_id, | |
| limit=window_size | |
| ) | |
| # Convert to OpenAI format | |
| context = [] | |
| for msg in messages: | |
| context.append({ | |
| "role": msg.role, | |
| "content": msg.message | |
| }) | |
| return context | |
| def get_session_summary(self, session_id: str) -> Optional[str]: | |
| """ | |
| Get or generate session summary. | |
| Args: | |
| session_id: Session ID | |
| Returns: | |
| Session summary text or None | |
| """ | |
| session = ConversationSession.query.get(session_id) | |
| if not session: | |
| return None | |
| return session.summary | |
| def end_session(self, session_id: str, summary: Optional[str] = None): | |
| """ | |
| End a conversation session. | |
| Args: | |
| session_id: Session ID | |
| summary: Optional session summary | |
| """ | |
| session = ConversationSession.query.get(session_id) | |
| if session: | |
| session.is_active = False | |
| session.ended_at = datetime.utcnow() | |
| if summary: | |
| session.summary = summary | |
| db.session.commit() | |
| def get_conversation_stats(self, user_id: int) -> Dict: | |
| """ | |
| Get conversation statistics for a user. | |
| Args: | |
| user_id: User ID | |
| Returns: | |
| Dictionary with conversation stats | |
| """ | |
| total_messages = ChatMessage.query.filter( | |
| ChatMessage.user_id == user_id | |
| ).count() | |
| total_sessions = ConversationSession.query.filter( | |
| ConversationSession.user_id == user_id | |
| ).count() | |
| total_tokens = db.session.query( | |
| db.func.sum(ChatMessage.tokens_used) | |
| ).filter( | |
| ChatMessage.user_id == user_id | |
| ).scalar() or 0 | |
| # Get intent distribution | |
| intent_counts = db.session.query( | |
| ChatMessage.intent, | |
| db.func.count(ChatMessage.id) | |
| ).filter( | |
| ChatMessage.user_id == user_id, | |
| ChatMessage.intent.isnot(None) | |
| ).group_by(ChatMessage.intent).all() | |
| intent_distribution = {intent: count for intent, count in intent_counts} | |
| return { | |
| 'total_messages': total_messages, | |
| 'total_sessions': total_sessions, | |
| 'total_tokens_used': total_tokens, | |
| 'intent_distribution': intent_distribution | |
| } | |
| def clear_old_sessions(self, days: int = 30): | |
| """ | |
| Archive old inactive sessions. | |
| Args: | |
| days: Number of days after which to archive sessions | |
| """ | |
| cutoff_date = datetime.utcnow() - timedelta(days=days) | |
| old_sessions = ConversationSession.query.filter( | |
| ConversationSession.last_activity_at < cutoff_date, | |
| ConversationSession.is_active == True | |
| ).all() | |
| for session in old_sessions: | |
| session.is_active = False | |
| session.ended_at = datetime.utcnow() | |
| db.session.commit() | |
| return len(old_sessions) | |