# Source: EasyBot / conversation_manager.py (uploaded by NitinBot001)
# Commit: "Update conversation_manager.py" (5ed8087, verified)
# conversation_manager.py
import hashlib
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from supabase import Client, create_client
# Configure logging
# NOTE(review): basicConfig runs at import time, so merely importing this
# module configures the root logger at INFO level — confirm that is intended
# for applications that set up logging themselves.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every method of ConversationManager.
logger = logging.getLogger(__name__)
# Called before any ConversationManager is created; presumably this loads a
# local .env file so that SUPABASE_URL / SUPABASE_KEY (read via os.getenv in
# ConversationManager.__init__) are available — verify against deployment.
load_dotenv()
class ConversationManager:
    """Persist per-user chat histories in the Supabase ``conversations`` table.

    Each row holds one chat: ``session_id`` (the chat ID), ``user_id``,
    ``history`` (a list of message dicts), ``created_at`` and ``updated_at``.
    Methods that take a ``user_id`` filter on both ``session_id`` and
    ``user_id``, so a chat is only returned for the user stored on its row.

    Apart from ``__init__``, database failures are logged and reported through
    the return value (``[]``, ``{}``, ``False`` or a dict with an ``"error"``
    key) instead of being raised.
    """

    _TABLE = "conversations"
    # Longest title shown for a session before it is cut and "..." is appended.
    _TITLE_MAX_CHARS = 35

    def __init__(self):
        """Create the Supabase client from ``SUPABASE_URL`` / ``SUPABASE_KEY``.

        Raises:
            ValueError: If either environment variable is missing or empty.
            Exception: Any error raised while creating the client is logged
                and re-raised unchanged.
        """
        try:
            supabase_url = os.getenv("SUPABASE_URL")
            supabase_key = os.getenv("SUPABASE_KEY")
            if not supabase_url or not supabase_key:
                raise ValueError("Missing Supabase credentials in environment variables")
            self.supabase: Client = create_client(supabase_url, supabase_key)
            logger.info("ConversationManager initialized successfully with Supabase")
        except Exception as e:
            logger.error("Failed to initialize ConversationManager: %s", e)
            raise

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _epoch_seconds() -> int:
        """Return the current Unix time in whole seconds."""
        # An aware datetime is required here: .timestamp() on the naive value
        # returned by utcnow() is interpreted as local time, which skews the
        # result on hosts that do not run in UTC.
        return int(datetime.now(timezone.utc).timestamp())

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        The offset is stripped on purpose so the stored text keeps the shape
        previously produced by ``datetime.utcnow().isoformat()``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 string into an aware UTC-comparable datetime.

        Naive values are assumed to be UTC (that is what this class writes).
        Returns ``None`` for anything that cannot be parsed.
        """
        if not isinstance(value, str) or not value:
            return None
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _rows(response: Any) -> List[Dict[str, Any]]:
        """Return the row list of a query response, or ``[]`` if it has none."""
        return getattr(response, "data", None) or []

    @staticmethod
    def _as_history(value: Any) -> List[Dict[str, Any]]:
        """Return *value* if it is a list, else ``[]`` (NULL / malformed column)."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _next_message_id(history: List[Dict[str, Any]]) -> int:
        """Return one more than the largest integer ``message_id`` in *history*."""
        known_ids = (
            msg.get("message_id")
            for msg in history
            if isinstance(msg, dict) and isinstance(msg.get("message_id"), int)
        )
        return max(known_ids, default=0) + 1

    @classmethod
    def _session_title(cls, history: List[Dict[str, Any]]) -> str:
        """Derive a display title from the first user message of *history*."""
        first_user_msg = next(
            (m for m in history if isinstance(m, dict) and m.get("role") == "user"),
            None,
        )
        if first_user_msg is None:
            return "New Chat"
        content = first_user_msg.get("content", "")
        if isinstance(content, str) and content:
            limit = cls._TITLE_MAX_CHARS
            return content[:limit] + "..." if len(content) > limit else content
        if first_user_msg.get("imageUrl"):
            return "Image Query"
        return "New Chat"

    def _table(self):
        """Return a query builder for the conversations table."""
        return self.supabase.table(self._TABLE)

    def _owned_rows(self, chat_id: str, user_id: str, columns: str) -> List[Dict[str, Any]]:
        """Select *columns* of the row matching both chat and user. May raise."""
        response = (
            self._table()
            .select(columns)
            .eq("session_id", chat_id)
            .eq("user_id", user_id)
            .execute()
        )
        return self._rows(response)

    def _fetch_history(self, chat_id: str, user_id: str) -> List[Dict[str, Any]]:
        """Read the stored history, letting database errors propagate.

        Unlike ``get_history`` this does not swallow exceptions, so callers
        that are about to write can tell "no history" apart from "read failed".
        """
        rows = self._owned_rows(chat_id, user_id, "history")
        if not rows:
            logger.info("No history found for chat %s, user %s", chat_id, user_id)
            return []
        history = rows[0].get("history", [])
        if not isinstance(history, list):
            logger.warning("Invalid history format for chat %s", chat_id)
            return []
        logger.info("Retrieved %d messages for chat %s, user %s", len(history), chat_id, user_id)
        return history

    # ------------------------------------------------------------------
    # ID generation
    # ------------------------------------------------------------------
    def generate_chat_id(self, user_id: str) -> str:
        """Generate a unique chat ID for a specific user.

        The ID has the form ``chat_<first 8 chars of user_id>_<epoch>_<8 hex>``.
        """
        random_part = uuid.uuid4().hex[:8]
        return f"chat_{user_id[:8]}_{self._epoch_seconds()}_{random_part}"

    def generate_user_id(self, device_info: Optional[str] = None, ip_address: Optional[str] = None) -> str:
        """Generate a user ID from device info and/or IP address.

        The same device/IP combination always yields the same ID. When neither
        value is supplied a random, non-repeatable ID is produced instead.

        Args:
            device_info: Optional device description string.
            ip_address: Optional client IP address.

        Returns:
            ``"user_"`` followed by the first 16 hex digits of a SHA-256 hash.
        """
        # The parts are joined without a separator; this is kept as-is because
        # changing it would change the IDs already issued to existing users.
        identifier_string = (device_info or "") + (ip_address or "")
        if not identifier_string:
            identifier_string = f"anonymous_{self._epoch_seconds()}_{uuid.uuid4().hex[:8]}"
        user_hash = hashlib.sha256(identifier_string.encode()).hexdigest()[:16]
        return f"user_{user_hash}"

    def generate_message_id(self, chat_id: str, user_id: str) -> int:
        """Return the next message ID for a chat (1 for an empty or unreadable chat)."""
        try:
            return self._next_message_id(self.get_history(chat_id, user_id))
        except Exception as e:
            logger.error("Error generating message ID for chat %s, user %s: %s", chat_id, user_id, e)
            return 1

    # ------------------------------------------------------------------
    # History read / write
    # ------------------------------------------------------------------
    def get_history(self, chat_id: str, user_id: str) -> List[Dict[str, Any]]:
        """Retrieve the conversation history for a chat owned by a user.

        Args:
            chat_id: The chat ID to retrieve history for.
            user_id: The user ID the chat must belong to.

        Returns:
            The list of message dicts; an empty list if an ID is empty, the
            chat does not exist for this user, the stored value is not a
            list, or the query fails.
        """
        if not chat_id or not user_id:
            logger.warning("Cannot get history: chat_id or user_id is empty")
            return []
        try:
            return self._fetch_history(chat_id, user_id)
        except Exception as e:
            logger.error("Error retrieving history for chat %s, user %s: %s", chat_id, user_id, e)
            return []

    def save_history(self, chat_id: str, user_id: str, history: List[Dict[str, Any]]) -> bool:
        """Replace the stored history of a chat, creating the row if needed.

        Args:
            chat_id: The chat ID to save history for.
            user_id: The user ID the chat belongs to.
            history: Complete list of message dicts to store.

        Returns:
            True if the row was updated or inserted, False otherwise.
        """
        if not chat_id or not user_id:
            logger.error("Cannot save history: chat_id or user_id is empty")
            return False
        if not isinstance(history, list):
            logger.error("Cannot save history: history must be a list")
            return False
        try:
            already_stored = bool(self._owned_rows(chat_id, user_id, "session_id"))
            current_time = self._utc_timestamp()
            if already_stored:
                self._table().update({
                    "history": history,
                    "updated_at": current_time,
                }).eq("session_id", chat_id).eq("user_id", user_id).execute()
                logger.info("Updated history for chat %s, user %s", chat_id, user_id)
            else:
                self._table().insert({
                    "session_id": chat_id,
                    "user_id": user_id,
                    "history": history,
                    "created_at": current_time,
                    "updated_at": current_time,
                }).execute()
                logger.info("Created new conversation for chat %s, user %s", chat_id, user_id)
            return True
        except Exception as e:
            logger.error("Error saving history for chat %s, user %s: %s", chat_id, user_id, e)
            return False

    def add_message(self, chat_id: str, user_id: str, role: str, content: str, image_url: Optional[str] = None) -> Dict[str, Any]:
        """Append one message to a chat and return it with its assigned ID.

        The history is read once; if that read fails the call aborts instead
        of saving, so a transient database error cannot overwrite an existing
        conversation with a single message. The read-modify-write sequence is
        not atomic: concurrent calls for the same chat can still race.

        Args:
            chat_id: The chat ID to add the message to.
            user_id: The user ID the chat belongs to.
            role: Message role ('user' or 'assistant').
            content: Message content.
            image_url: Optional image URL, stored under the ``imageUrl`` key.

        Returns:
            The stored message dict, or ``{}`` if it could not be saved.
        """
        if not chat_id or not user_id:
            logger.error("Cannot add message: chat_id or user_id is empty")
            return {}
        try:
            current_history = self._fetch_history(chat_id, user_id)
            message_id = self._next_message_id(current_history)
            message = {
                "message_id": message_id,
                "role": role,
                "content": content,
                "timestamp": self._utc_timestamp(),
            }
            if image_url:
                message["imageUrl"] = image_url
            current_history.append(message)
            if self.save_history(chat_id, user_id, current_history):
                logger.info("Added message %s to chat %s, user %s", message_id, chat_id, user_id)
                return message
            logger.error("Failed to save message to chat %s, user %s", chat_id, user_id)
            return {}
        except Exception as e:
            logger.error("Error adding message to chat %s, user %s: %s", chat_id, user_id, e)
            return {}

    def delete_history(self, chat_id: str, user_id: str) -> bool:
        """Delete the conversation row of a chat owned by a user.

        Args:
            chat_id: The chat ID to delete.
            user_id: The user ID the chat must belong to.

        Returns:
            True if the delete ran (even when no row matched), False if an
            ID is empty or the query failed.
        """
        if not chat_id or not user_id:
            logger.error("Cannot delete history: chat_id or user_id is empty")
            return False
        try:
            response = self._table().delete().eq("session_id", chat_id).eq("user_id", user_id).execute()
            if self._rows(response):
                logger.info("Deleted conversation history for chat %s, user %s", chat_id, user_id)
            else:
                # Nothing matched; still a success because the end state is the same.
                logger.warning("No conversation found to delete for chat %s, user %s", chat_id, user_id)
            return True
        except Exception as e:
            logger.error("Error deleting history for chat %s, user %s: %s", chat_id, user_id, e)
            return False

    # ------------------------------------------------------------------
    # Session listing and metadata
    # ------------------------------------------------------------------
    def get_all_chat_sessions(self, user_id: str) -> List[Dict[str, Any]]:
        """List all chat sessions of a user, most recently updated first.

        Args:
            user_id: The user ID to get sessions for.

        Returns:
            A list of dicts with ``session_id``, ``title``, ``message_count``,
            ``created_at`` and ``updated_at``; empty on error or empty ID.
        """
        if not user_id:
            logger.error("Cannot get chat sessions: user_id is empty")
            return []
        try:
            response = (
                self._table()
                .select("session_id, history, created_at, updated_at")
                .eq("user_id", user_id)
                .execute()
            )
            sessions = []
            for conv in self._rows(response):
                history = self._as_history(conv.get("history"))
                sessions.append({
                    "session_id": conv.get("session_id"),
                    "title": self._session_title(history),
                    "message_count": len(history),
                    "created_at": conv.get("created_at"),
                    "updated_at": conv.get("updated_at"),
                })
            # "or ''" keeps rows with a NULL updated_at sortable (they go last).
            sessions.sort(key=lambda s: s.get("updated_at") or "", reverse=True)
            logger.info("Retrieved %d sessions for user %s", len(sessions), user_id)
            return sessions
        except Exception as e:
            logger.error("Error fetching chat sessions for user %s: %s", user_id, e)
            return []

    def get_chat_info(self, chat_id: str, user_id: str) -> Dict[str, Any]:
        """Return metadata about one chat of a user.

        Args:
            chat_id: The chat ID to get information for.
            user_id: The user ID the chat must belong to.

        Returns:
            ``{}`` if an ID is empty; otherwise a dict with ``session_id``,
            ``user_id``, ``message_count`` and ``exists`` (plus timestamps when
            the chat exists, or ``error`` when the query failed).
        """
        if not chat_id or not user_id:
            return {}
        try:
            rows = self._owned_rows(chat_id, user_id, "*")
            if not rows:
                return {
                    "session_id": chat_id,
                    "user_id": user_id,
                    "message_count": 0,
                    "exists": False,
                }
            conv = rows[0]
            return {
                "session_id": conv.get("session_id"),
                "user_id": conv.get("user_id"),
                "message_count": len(self._as_history(conv.get("history"))),
                "created_at": conv.get("created_at"),
                "updated_at": conv.get("updated_at"),
                "exists": True,
            }
        except Exception as e:
            logger.error("Error getting chat info for %s, user %s: %s", chat_id, user_id, e)
            return {
                "session_id": chat_id,
                "user_id": user_id,
                "message_count": 0,
                "error": str(e),
                "exists": False,
            }

    def chat_exists(self, chat_id: str, user_id: str) -> bool:
        """Return True if the chat exists and belongs to the user.

        Empty IDs and query failures both yield False.
        """
        if not chat_id or not user_id:
            return False
        try:
            exists = bool(self._owned_rows(chat_id, user_id, "session_id"))
            if exists:
                logger.info("Chat %s exists for user %s", chat_id, user_id)
            return exists
        except Exception as e:
            logger.error("Error checking if chat %s exists for user %s: %s", chat_id, user_id, e)
            return False

    def get_user_stats(self, user_id: str) -> Dict[str, Any]:
        """Compute usage statistics for a user.

        Args:
            user_id: The user ID to get stats for.

        Returns:
            ``{}`` for an empty ID; otherwise a dict with session and message
            totals, the number of sessions updated in the last 24 hours, the
            average messages per session and the first/last timestamps
            (``None`` when unavailable). On failure the dict holds ``user_id``
            and ``error`` only.
        """
        if not user_id:
            return {}
        try:
            sessions = self.get_all_chat_sessions(user_id)
            total_sessions = len(sessions)
            total_messages = sum(s.get("message_count", 0) for s in sessions)

            # Both sides of the comparison are timezone-aware; stored values
            # without an offset are treated as UTC by _parse_timestamp.
            cutoff = datetime.now(timezone.utc) - timedelta(days=1)
            recent_sessions = 0
            for session in sessions:
                session_time = self._parse_timestamp(session.get("updated_at"))
                if session_time is not None and session_time > cutoff:
                    recent_sessions += 1

            created_values = [s["created_at"] for s in sessions if s.get("created_at")]
            updated_values = [s["updated_at"] for s in sessions if s.get("updated_at")]
            return {
                "user_id": user_id,
                "total_sessions": total_sessions,
                "total_messages": total_messages,
                "recent_sessions_24h": recent_sessions,
                "average_messages_per_session": round(total_messages / total_sessions, 2) if total_sessions > 0 else 0,
                "first_session": min(created_values) if created_values else None,
                "last_activity": max(updated_values) if updated_values else None,
            }
        except Exception as e:
            logger.error("Error getting user stats for %s: %s", user_id, e)
            return {"user_id": user_id, "error": str(e)}

    # ------------------------------------------------------------------
    # Bulk operations
    # ------------------------------------------------------------------
    def delete_all_user_data(self, user_id: str) -> bool:
        """Delete every conversation row of a user.

        Returns:
            True if the delete ran, False for an empty ID or a failed query.
        """
        if not user_id:
            logger.error("Cannot delete user data: user_id is empty")
            return False
        try:
            self._table().delete().eq("user_id", user_id).execute()
            logger.info("Deleted all conversation data for user %s", user_id)
            return True
        except Exception as e:
            logger.error("Error deleting all user data for %s: %s", user_id, e)
            return False

    def migrate_anonymous_sessions(self, old_session_ids: List[str], new_user_id: str, old_user_id: Optional[str] = None) -> bool:
        """Reassign sessions to a new user ID (e.g. after account creation).

        Without ``old_user_id`` a session is matched by its ID alone, so the
        caller must already have verified that the sessions belong to the
        person being migrated.

        Args:
            old_session_ids: Session IDs to migrate.
            new_user_id: The user ID to assign these sessions to.
            old_user_id: Optional current owner; when given, only rows
                currently owned by this user are reassigned.

        Returns:
            True if at least one existing session was reassigned.
        """
        if not old_session_ids or not new_user_id:
            logger.error("Cannot migrate sessions: missing session_ids or user_id")
            return False
        try:
            migrated_count = 0
            for session_id in old_session_ids:
                try:
                    query = self._table().update({
                        "user_id": new_user_id,
                        "updated_at": self._utc_timestamp(),
                    }).eq("session_id", session_id)
                    if old_user_id:
                        query = query.eq("user_id", old_user_id)
                    # Count only updates that actually matched a row.
                    if self._rows(query.execute()):
                        migrated_count += 1
                    else:
                        logger.warning("No matching session to migrate for %s", session_id)
                except Exception as e:
                    logger.error("Failed to migrate session %s: %s", session_id, e)
                    continue
            logger.info("Migrated %d sessions to user %s", migrated_count, new_user_id)
            return migrated_count > 0
        except Exception as e:
            logger.error("Error migrating sessions to user %s: %s", new_user_id, e)
            return False
# Example usage and testing
# Manual smoke test: it talks to the real Supabase project configured through
# the environment, so the chat it creates is removed again in the `finally`
# clause, and failed writes are reported instead of being printed as success.
if __name__ == "__main__":
    print("=== ConversationManager with User Authentication Test ===")
    manager = None
    test_user_id = None
    test_chat_id = None
    try:
        # Initialize manager
        manager = ConversationManager()

        # Test user ID generation
        test_user_id = manager.generate_user_id("test_device_info", "192.168.1.100")
        print(f"\n1. Generated user ID: {test_user_id}")

        # Create a new chat for this user
        test_chat_id = manager.generate_chat_id(test_user_id)
        print(f"2. Generated chat ID: {test_chat_id}")

        # Add some messages; add_message returns {} when the write failed
        msg1 = manager.add_message(test_chat_id, test_user_id, "user", "Hello, I need help with farming")
        if not msg1:
            raise RuntimeError("add_message failed for the user message")
        print(f"3. Added user message: {msg1}")
        msg2 = manager.add_message(test_chat_id, test_user_id, "assistant", "Hello! I'd be happy to help you with farming. What specific questions do you have?")
        if not msg2:
            raise RuntimeError("add_message failed for the assistant message")
        print(f"4. Added assistant message: {msg2}")

        # Test retrieving history
        print(f"\n5. Retrieved history for {test_chat_id}:")
        history = manager.get_history(test_chat_id, test_user_id)
        for msg in history:
            preview = (msg.get('content') or '')[:50]
            print(f" Message {msg.get('message_id')}: {msg.get('role')} - {preview}...")

        # Test getting all sessions for this user
        print(f"\n6. All sessions for user {test_user_id}:")
        sessions = manager.get_all_chat_sessions(test_user_id)
        for session in sessions:
            print(f" {session.get('session_id')}: {session.get('title')} ({session.get('message_count')} messages)")

        # Test user stats
        print("\n7. User statistics:")
        stats = manager.get_user_stats(test_user_id)
        for key, value in stats.items():
            print(f" {key}: {value}")

        # Test security - try to access with wrong user ID
        print("\n8. Security test - trying to access with wrong user ID:")
        wrong_user_history = manager.get_history(test_chat_id, "wrong_user_id")
        print(f" History with wrong user ID: {len(wrong_user_history)} messages (should be 0)")
        if wrong_user_history:
            raise RuntimeError("history was returned for a user that does not own the chat")

        print("\n✅ All tests completed successfully!")
    except Exception as e:
        print(f"❌ Test failed: {e}")
    finally:
        # Remove the rows created above so repeated runs do not pile up
        # test conversations in the shared table.
        if manager is not None and test_chat_id and test_user_id:
            manager.delete_history(test_chat_id, test_user_id)