# Source: ChatbotRAG / conversation_service.py - commit 91fe002 ("Upload 14 files") by minhvtt, 5.79 kB
"""
Conversation Service for Multi-turn Chat
Server-side session management
"""
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional

from pymongo.collection import Collection
class ConversationService:
    """
    Manage multi-turn conversation history with server-side sessions.

    Each session is a single MongoDB document keyed by ``session_id`` that
    stores a sliding window of the most recent messages.
    """

    # Lifetime, in seconds, used for the TTL index on ``updated_at`` (7 days).
    SESSION_TTL_SECONDS = 604800

    def __init__(self, mongo_collection: "Collection", max_history: int = 10):
        """
        Args:
            mongo_collection: MongoDB collection for storing conversations
            max_history: Maximum number of messages kept per session
                (sliding window); must be at least 1

        Raises:
            ValueError: If ``max_history`` is smaller than 1
        """
        if max_history < 1:
            # add_message() trims with "$slice": -max_history. A value of 0
            # would empty the array and a negative value would flip the sign
            # and keep the oldest messages instead of the newest ones.
            raise ValueError("max_history must be a positive integer")

        self.collection = mongo_collection
        self.max_history = max_history

        # Create indexes
        self._ensure_indexes()

    @staticmethod
    def _utcnow() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        Produces the same kind of value as ``datetime.utcnow()`` without
        calling that deprecated function, so stored dates and ISO timestamp
        strings keep their format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _ensure_indexes(self):
        """Create necessary indexes (failures are reported, not raised)"""
        try:
            self.collection.create_index("session_id", unique=True)

            # TTL index: auto-delete sessions not updated for 7 days
            self.collection.create_index(
                "updated_at",
                expireAfterSeconds=self.SESSION_TTL_SECONDS,
            )
            print("✓ Conversation indexes created")
        except Exception as e:
            # Any failure here is only printed, so the service can still be
            # constructed when index creation does not succeed.
            print(f"Conversation indexes already exist or error: {e}")

    def create_session(self, metadata: Optional[Dict] = None) -> str:
        """
        Create new conversation session

        Args:
            metadata: Optional session-level metadata stored with the session

        Returns:
            session_id (UUID string)
        """
        session_id = str(uuid.uuid4())
        # One clock reading so created_at and updated_at start out identical.
        now = self._utcnow()

        self.collection.insert_one({
            "session_id": session_id,
            "messages": [],
            "metadata": metadata or {},
            "created_at": now,
            "updated_at": now,
        })

        return session_id

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict] = None
    ):
        """
        Add message to conversation history

        If the session does not exist yet it is created by the upsert, with
        ``created_at`` and an empty ``metadata`` so that it has the same
        fields as a session made by ``create_session``.

        Args:
            session_id: Session identifier
            role: "user" or "assistant"
            content: Message text
            metadata: Additional info (rag_stats, tool_calls, etc.)
        """
        now = self._utcnow()
        message = {
            "role": role,
            "content": content,
            "timestamp": now.isoformat(),
            "metadata": metadata or {},
        }

        # Upsert: create the session if it does not exist yet
        self.collection.update_one(
            {"session_id": session_id},
            {
                "$push": {
                    "messages": {
                        "$each": [message],
                        "$slice": -self.max_history,  # Keep only last N messages
                    }
                },
                "$set": {"updated_at": now},
                # Only applied when the upsert inserts a new document.
                "$setOnInsert": {"created_at": now, "metadata": {}},
            },
            upsert=True,
        )

    def get_conversation_history(
        self,
        session_id: str,
        limit: Optional[int] = None,
        include_metadata: bool = False
    ) -> List[Dict]:
        """
        Get conversation messages for LLM context

        Args:
            session_id: Session identifier
            limit: Number of most recent messages to return; ``None`` or 0
                falls back to ``max_history``
            include_metadata: Return the stored message dicts unchanged
                instead of only role and content

        Returns:
            List of messages in format: [{"role": "user", "content": "..."}, ...]
            (empty list if the session does not exist)

        Raises:
            ValueError: If ``limit`` is negative
        """
        if limit is not None and limit < 0:
            # messages[-limit:] with a negative limit would drop the oldest
            # messages instead of keeping the newest ones.
            raise ValueError("limit must not be negative")

        session = self.collection.find_one({"session_id": session_id})
        if not session:
            return []

        # Limit to recent messages
        window = limit or self.max_history
        messages = (session.get("messages") or [])[-window:]

        if include_metadata:
            return messages

        # Format for LLM
        return [
            {"role": msg["role"], "content": msg["content"]}
            for msg in messages
        ]

    def get_session_info(self, session_id: str) -> Optional[Dict]:
        """
        Get session metadata

        Returns:
            Dict with session_id, created_at, updated_at and metadata (as far
            as they are stored), or None if the session does not exist
        """
        return self.collection.find_one(
            {"session_id": session_id},
            {"_id": 0, "session_id": 1, "created_at": 1, "updated_at": 1, "metadata": 1},
        )

    def clear_session(self, session_id: str) -> bool:
        """
        Delete the session document, including its conversation history

        Returns:
            True if a session was deleted, False if the session does not exist
        """
        result = self.collection.delete_one({"session_id": session_id})
        return result.deleted_count > 0

    def session_exists(self, session_id: str) -> bool:
        """
        Check if session exists
        """
        return self.collection.count_documents({"session_id": session_id}) > 0

    def get_last_user_message(self, session_id: str) -> Optional[str]:
        """
        Get the last user message in conversation
        Useful for context extraction

        Returns:
            Content of the most recent message whose role is "user", or None
            if the session does not exist or has no user message
        """
        session = self.collection.find_one({"session_id": session_id})
        if not session:
            return None

        # Walk backwards to find the most recent message from the user
        for msg in reversed(session.get("messages") or []):
            if msg["role"] == "user":
                return msg["content"]

        return None