# ai-learning-path-generator/src/services/conversation_manager.py
# shubhamdhamal
# Deploy Flask app with Docker
# 7644eac
"""
Conversation Manager Service
Phase 1: Conversation Memory & Context Management
This service handles:
- Conversation history storage and retrieval
- Context window management (last N messages)
- Session management
- Message persistence
"""
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
import uuid
from web_app import db
from web_app.models import ChatMessage, ConversationSession, User, UserLearningPath
class ConversationManager:
    """
    Manage conversation state, history, and context for the chatbot.

    Key Features:
    - Store and retrieve conversation history
    - Manage conversation sessions
    - Build context windows for AI
    - Track conversation metrics

    Every timestamp written by this class comes from ``datetime.utcnow()``
    (naive UTC).
    NOTE(review): ``datetime.utcnow()`` is deprecated since Python 3.12. It is
    kept because the timestamp columns presumably store naive UTC values --
    confirm against the models before switching to timezone-aware datetimes.
    """

    # Idle time after which an active session is no longer reused.
    SESSION_TIMEOUT = timedelta(minutes=30)

    def __init__(self, context_window_size: int = 10):
        """
        Initialize the conversation manager.

        Args:
            context_window_size: Number of recent messages to include in
                context (default: 10)
        """
        self.context_window_size = context_window_size

    def get_or_create_session(
        self,
        user_id: int,
        learning_path_id: Optional[str] = None
    ) -> "ConversationSession":
        """
        Get the user's active session or create a new one.

        A session is reused when it is flagged active and its last activity
        is no older than ``SESSION_TIMEOUT`` (30 minutes). Reusing a session
        refreshes its ``last_activity_at``. Both paths commit.

        NOTE(review): the lookup filters on the user only, so an active
        session started for a different learning path is reused and
        ``learning_path_id`` is applied only to newly created sessions --
        confirm this is intended.

        Args:
            user_id: User ID
            learning_path_id: Optional learning path ID (stored on a newly
                created session)

        Returns:
            ConversationSession object
        """
        # Read the clock once so the cutoff and the stored activity time agree.
        now = datetime.utcnow()
        cutoff_time = now - self.SESSION_TIMEOUT

        active_session = (
            ConversationSession.query.filter(
                ConversationSession.user_id == user_id,
                # "== True" builds a SQL expression here; it is not a Python
                # boolean comparison.
                ConversationSession.is_active == True,  # noqa: E712
                ConversationSession.last_activity_at >= cutoff_time,
            )
            .order_by(ConversationSession.last_activity_at.desc())
            .first()
        )

        if active_session:
            # Update last activity
            active_session.last_activity_at = now
            db.session.commit()
            return active_session

        # No recent session: create a new one
        new_session = ConversationSession(
            user_id=user_id,
            learning_path_id=learning_path_id,
            is_active=True,
        )
        db.session.add(new_session)
        db.session.commit()
        return new_session

    def add_message(
        self,
        user_id: int,
        message: str,
        role: str,
        learning_path_id: Optional[str] = None,
        intent: Optional[str] = None,
        entities: Optional[Dict] = None,
        tokens_used: int = 0,
        response_time_ms: Optional[int] = None
    ) -> "ChatMessage":
        """
        Add a message to conversation history.

        The message is attached to the session returned by
        ``get_or_create_session`` and that session's counters are updated in
        the same commit.

        Args:
            user_id: User ID
            message: Message content
            role: 'user' or 'assistant'
            learning_path_id: Optional learning path ID
            intent: Classified intent (from Phase 2)
            entities: Extracted entities (from Phase 2)
            tokens_used: Number of tokens used for this message; ``None`` is
                treated as 0
            response_time_ms: Response time in milliseconds

        Returns:
            ChatMessage object
        """
        # A None token count would otherwise break the "+=" on the session.
        tokens_used = tokens_used or 0

        # Get or create session
        session = self.get_or_create_session(user_id, learning_path_id)

        # Create message
        chat_message = ChatMessage(
            user_id=user_id,
            learning_path_id=learning_path_id,
            message=message,
            role=role,
            intent=intent,
            entities=entities,
            tokens_used=tokens_used,
            response_time_ms=response_time_ms,
            session_id=session.id,
        )
        db.session.add(chat_message)

        # Update session stats; a NULL counter is treated as zero.
        session.message_count = (session.message_count or 0) + 1
        session.total_tokens_used = (session.total_tokens_used or 0) + tokens_used
        session.last_activity_at = datetime.utcnow()

        db.session.commit()
        return chat_message

    def get_conversation_history(
        self,
        user_id: int,
        learning_path_id: Optional[str] = None,
        limit: Optional[int] = None,
        session_id: Optional[str] = None
    ) -> List["ChatMessage"]:
        """
        Get conversation history for a user.

        Args:
            user_id: User ID
            learning_path_id: Optional filter by learning path
            limit: Maximum number of messages to return; when given, the most
                recent ``limit`` messages are returned. ``None`` or a
                non-positive value means no cap.
            session_id: Optional filter by session

        Returns:
            List of ChatMessage objects (ordered by timestamp, oldest first)
        """
        query = ChatMessage.query.filter(ChatMessage.user_id == user_id)
        if learning_path_id:
            query = query.filter(ChatMessage.learning_path_id == learning_path_id)
        if session_id:
            query = query.filter(ChatMessage.session_id == session_id)

        if limit is not None and limit > 0:
            # Fetch the newest N in a single query, then flip them back into
            # chronological order. This avoids a separate COUNT whose result
            # could be stale by the time the rows are read.
            recent = query.order_by(ChatMessage.timestamp.desc()).limit(limit).all()
            recent.reverse()
            return recent

        return query.order_by(ChatMessage.timestamp.asc()).all()

    def get_context_window(
        self,
        user_id: int,
        learning_path_id: Optional[str] = None,
        window_size: Optional[int] = None
    ) -> List[Dict[str, str]]:
        """
        Get recent conversation context for AI.

        Returns messages in OpenAI chat format:
        [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]

        NOTE(review): history is not filtered by session, so the window can
        include messages from earlier sessions of the same user -- confirm
        this is intended.

        Args:
            user_id: User ID
            learning_path_id: Optional learning path ID
            window_size: Number of recent messages; ``None`` or 0 falls back
                to ``self.context_window_size``

        Returns:
            List of message dictionaries in OpenAI format
        """
        window_size = window_size or self.context_window_size

        # Get recent messages
        messages = self.get_conversation_history(
            user_id=user_id,
            learning_path_id=learning_path_id,
            limit=window_size,
        )

        # Convert to OpenAI format
        return [{"role": msg.role, "content": msg.message} for msg in messages]

    def get_session_summary(self, session_id: str) -> Optional[str]:
        """
        Get the stored summary of a session.

        Args:
            session_id: Session ID

        Returns:
            The session's ``summary`` value, or None when the session does
            not exist
        """
        session = ConversationSession.query.get(session_id)
        if not session:
            return None
        return session.summary

    def end_session(self, session_id: str, summary: Optional[str] = None) -> None:
        """
        End a conversation session.

        Marks the session inactive and stamps ``ended_at``. Does nothing when
        the session does not exist.

        Args:
            session_id: Session ID
            summary: Optional session summary; an empty value leaves the
                stored summary untouched
        """
        session = ConversationSession.query.get(session_id)
        if not session:
            return

        session.is_active = False
        session.ended_at = datetime.utcnow()
        if summary:
            session.summary = summary
        db.session.commit()

    def get_conversation_stats(self, user_id: int) -> Dict:
        """
        Get conversation statistics for a user.

        Args:
            user_id: User ID

        Returns:
            Dictionary with the keys 'total_messages', 'total_sessions',
            'total_tokens_used' and 'intent_distribution' (intent -> count,
            messages without an intent excluded)
        """
        total_messages = ChatMessage.query.filter(
            ChatMessage.user_id == user_id
        ).count()

        total_sessions = ConversationSession.query.filter(
            ConversationSession.user_id == user_id
        ).count()

        # SUM over zero rows yields NULL, hence the "or 0".
        total_tokens = db.session.query(
            db.func.sum(ChatMessage.tokens_used)
        ).filter(
            ChatMessage.user_id == user_id
        ).scalar() or 0

        # Get intent distribution
        intent_counts = db.session.query(
            ChatMessage.intent,
            db.func.count(ChatMessage.id)
        ).filter(
            ChatMessage.user_id == user_id,
            ChatMessage.intent.isnot(None)
        ).group_by(ChatMessage.intent).all()
        intent_distribution = {intent: count for intent, count in intent_counts}

        return {
            'total_messages': total_messages,
            'total_sessions': total_sessions,
            'total_tokens_used': total_tokens,
            'intent_distribution': intent_distribution
        }

    def clear_old_sessions(self, days: int = 30) -> int:
        """
        Close sessions that are still flagged active but have been idle.

        Matching sessions are marked inactive and given an ``ended_at``
        timestamp; nothing is deleted.

        Args:
            days: Number of days of inactivity after which a session is closed

        Returns:
            Number of sessions that were closed
        """
        now = datetime.utcnow()
        cutoff_date = now - timedelta(days=days)

        old_sessions = ConversationSession.query.filter(
            ConversationSession.last_activity_at < cutoff_date,
            ConversationSession.is_active == True,  # noqa: E712 (SQL expression)
        ).all()

        for session in old_sessions:
            session.is_active = False
            session.ended_at = now

        db.session.commit()
        return len(old_sessions)