# conversation_manager.py import os import uuid import hashlib import logging from typing import Dict, List, Any, Optional from datetime import datetime from supabase import create_client, Client from dotenv import load_dotenv # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) load_dotenv() class ConversationManager: """Enhanced ConversationManager with user authentication and session isolation""" def __init__(self): """Initialize the conversation manager with Supabase client""" try: # Initialize Supabase client supabase_url = os.getenv("SUPABASE_URL") supabase_key = os.getenv("SUPABASE_KEY") if not supabase_url or not supabase_key: raise ValueError("Missing Supabase credentials in environment variables") self.supabase: Client = create_client(supabase_url, supabase_key) logger.info("ConversationManager initialized successfully with Supabase") except Exception as e: logger.error(f"Failed to initialize ConversationManager: {e}") raise def generate_chat_id(self, user_id: str) -> str: """Generate a unique chat ID for a specific user""" timestamp = str(int(datetime.utcnow().timestamp())) random_part = uuid.uuid4().hex[:8] return f"chat_{user_id[:8]}_{timestamp}_{random_part}" def generate_user_id(self, device_info: Optional[str] = None, ip_address: Optional[str] = None) -> str: """ Generate a unique user ID based on device info and/or IP This creates a consistent ID for the same device/IP combination """ # Create a hash based on available info identifier_string = "" if device_info: identifier_string += device_info if ip_address: identifier_string += ip_address # Fallback to timestamp + random if no device info if not identifier_string: identifier_string = f"anonymous_{int(datetime.utcnow().timestamp())}_{uuid.uuid4().hex[:8]}" # Create consistent hash user_hash = hashlib.sha256(identifier_string.encode()).hexdigest()[:16] return f"user_{user_hash}" def generate_message_id(self, chat_id: str, user_id: str) -> int: """Generate the next message ID for a specific chat""" try: # Get current conversation to find the highest message ID history = self.get_history(chat_id, user_id) if not history: return 1 # Find the highest message_id max_id = 0 for message in history: msg_id = message.get('message_id', 0) if isinstance(msg_id, int) and msg_id > max_id: max_id = msg_id return max_id + 1 except Exception as e: logger.error(f"Error generating message ID for chat {chat_id}, user {user_id}: {e}") return 1 def get_history(self, chat_id: str, user_id: str) -> List[Dict[str, Any]]: """ Retrieve conversation history for a specific chat ID and user Args: chat_id: The chat ID to retrieve history for user_id: The user ID (for security/isolation) Returns: List of message dictionaries, empty list if chat doesn't exist or unauthorized """ if not chat_id or not user_id: logger.warning("Cannot get history: chat_id or user_id is empty") return [] try: response = self.supabase.table('conversations').select('history').eq('session_id', chat_id).eq('user_id', user_id).execute() if response.data and len(response.data) > 0: history = response.data[0].get('history', []) if isinstance(history, list): logger.info(f"Retrieved {len(history)} messages for chat {chat_id}, user {user_id}") return history else: logger.warning(f"Invalid history format for chat {chat_id}") return [] else: logger.info(f"No history found for chat {chat_id}, user {user_id}") return [] except Exception as e: logger.error(f"Error retrieving history for chat {chat_id}, user {user_id}: {e}") return [] def save_history(self, chat_id: str, user_id: str, history: List[Dict[str, Any]]) -> bool: """ Save conversation history for a specific chat ID and user Args: chat_id: The chat ID to save history for user_id: The user ID (for security/isolation) history: List of message dictionaries Returns: True if successful, False otherwise """ if not chat_id or not user_id: logger.error("Cannot save history: chat_id or user_id is empty") return False if not isinstance(history, list): logger.error("Cannot save history: history must be a list") return False try: # Check if conversation exists for this user existing = self.supabase.table('conversations').select('session_id').eq('session_id', chat_id).eq('user_id', user_id).execute() current_time = datetime.utcnow().isoformat() if existing.data and len(existing.data) > 0: # Update existing conversation response = self.supabase.table('conversations').update({ 'history': history, 'updated_at': current_time }).eq('session_id', chat_id).eq('user_id', user_id).execute() logger.info(f"Updated history for chat {chat_id}, user {user_id}") else: # Create new conversation response = self.supabase.table('conversations').insert({ 'session_id': chat_id, 'user_id': user_id, 'history': history, 'created_at': current_time, 'updated_at': current_time }).execute() logger.info(f"Created new conversation for chat {chat_id}, user {user_id}") return True except Exception as e: logger.error(f"Error saving history for chat {chat_id}, user {user_id}: {e}") return False def add_message(self, chat_id: str, user_id: str, role: str, content: str, image_url: Optional[str] = None) -> Dict[str, Any]: """ Add a single message to a chat and return the message with its ID Args: chat_id: The chat ID to add the message to user_id: The user ID (for security/isolation) role: Message role ('user' or 'assistant') content: Message content image_url: Optional image URL Returns: Dictionary containing the message with its assigned ID """ if not chat_id or not user_id: logger.error("Cannot add message: chat_id or user_id is empty") return {} try: # Generate message ID message_id = self.generate_message_id(chat_id, user_id) # Create message object message = { "message_id": message_id, "role": role, "content": content, "timestamp": datetime.utcnow().isoformat() } # Add image URL if provided if image_url: message["imageUrl"] = image_url # Get current history and add the new message current_history = self.get_history(chat_id, user_id) current_history.append(message) # Save updated history if self.save_history(chat_id, user_id, current_history): logger.info(f"Added message {message_id} to chat {chat_id}, user {user_id}") return message else: logger.error(f"Failed to save message to chat {chat_id}, user {user_id}") return {} except Exception as e: logger.error(f"Error adding message to chat {chat_id}, user {user_id}: {e}") return {} def delete_history(self, chat_id: str, user_id: str) -> bool: """ Delete conversation history for a specific chat ID and user Args: chat_id: The chat ID to delete user_id: The user ID (for security/isolation) Returns: True if successful, False otherwise """ if not chat_id or not user_id: logger.error("Cannot delete history: chat_id or user_id is empty") return False try: response = self.supabase.table('conversations').delete().eq('session_id', chat_id).eq('user_id', user_id).execute() # Check if any rows were affected if hasattr(response, 'data') and response.data is not None: logger.info(f"Deleted conversation history for chat {chat_id}, user {user_id}") return True else: logger.warning(f"No conversation found to delete for chat {chat_id}, user {user_id}") return True # Consider it successful if nothing to delete except Exception as e: logger.error(f"Error deleting history for chat {chat_id}, user {user_id}: {e}") return False def get_all_chat_sessions(self, user_id: str) -> List[Dict[str, Any]]: """ Get all chat sessions for a specific user Args: user_id: The user ID to get sessions for Returns: List of dictionaries with chat session information """ if not user_id: logger.error("Cannot get chat sessions: user_id is empty") return [] try: response = self.supabase.table('conversations').select('session_id, history, created_at, updated_at').eq('user_id', user_id).execute() sessions = [] for conv in response.data: session_id = conv.get('session_id') history = conv.get('history', []) created_at = conv.get('created_at') updated_at = conv.get('updated_at') # Generate title from first user message in the history title = "New Chat" if history and len(history) > 0: first_user_msg = None for msg in history: if msg.get('role') == 'user': first_user_msg = msg break if first_user_msg: content = first_user_msg.get('content', '') if content: title = content[:35] + '...' if len(content) > 35 else content elif first_user_msg.get('imageUrl'): title = "Image Query" sessions.append({ "session_id": session_id, "title": title, "message_count": len(history), "created_at": created_at, "updated_at": updated_at }) # Sort by updated_at (most recent first) sessions.sort(key=lambda x: x.get('updated_at', ''), reverse=True) logger.info(f"Retrieved {len(sessions)} sessions for user {user_id}") return sessions except Exception as e: logger.error(f"Error fetching chat sessions for user {user_id}: {e}") return [] def get_chat_info(self, chat_id: str, user_id: str) -> Dict[str, Any]: """ Get information about a specific chat for a specific user Args: chat_id: The chat ID to get information for user_id: The user ID (for security/isolation) Returns: Dictionary with chat information """ if not chat_id or not user_id: return {} try: response = self.supabase.table('conversations').select('*').eq('session_id', chat_id).eq('user_id', user_id).execute() if response.data and len(response.data) > 0: conv = response.data[0] history = conv.get('history', []) return { "session_id": conv.get('session_id'), "user_id": conv.get('user_id'), "message_count": len(history), "created_at": conv.get('created_at'), "updated_at": conv.get('updated_at'), "exists": True } else: return { "session_id": chat_id, "user_id": user_id, "message_count": 0, "exists": False } except Exception as e: logger.error(f"Error getting chat info for {chat_id}, user {user_id}: {e}") return { "session_id": chat_id, "user_id": user_id, "error": str(e), "exists": False } def chat_exists(self, chat_id: str, user_id: str) -> bool: """ Check if a chat ID exists for a specific user Args: chat_id: The chat ID to check user_id: The user ID (for security/isolation) Returns: True if chat exists and belongs to user, False otherwise """ if not chat_id or not user_id: return False try: response = self.supabase.table('conversations').select('session_id').eq('session_id', chat_id).eq('user_id', user_id).execute() exists = bool(response.data and len(response.data) > 0) if exists: logger.info(f"Chat {chat_id} exists for user {user_id}") return exists except Exception as e: logger.error(f"Error checking if chat {chat_id} exists for user {user_id}: {e}") return False def get_user_stats(self, user_id: str) -> Dict[str, Any]: """ Get statistics for a specific user Args: user_id: The user ID to get stats for Returns: Dictionary with user statistics """ if not user_id: return {} try: sessions = self.get_all_chat_sessions(user_id) total_sessions = len(sessions) total_messages = sum(session.get('message_count', 0) for session in sessions) # Calculate recent activity (last 24 hours) from datetime import datetime, timedelta now = datetime.utcnow() yesterday = now - timedelta(days=1) recent_sessions = 0 for session in sessions: updated_at = session.get('updated_at') if updated_at: try: session_time = datetime.fromisoformat(updated_at.replace('Z', '+00:00')) if session_time > yesterday: recent_sessions += 1 except: pass return { "user_id": user_id, "total_sessions": total_sessions, "total_messages": total_messages, "recent_sessions_24h": recent_sessions, "average_messages_per_session": round(total_messages / total_sessions, 2) if total_sessions > 0 else 0, "first_session": min(session.get('created_at', '') for session in sessions) if sessions else None, "last_activity": max(session.get('updated_at', '') for session in sessions) if sessions else None } except Exception as e: logger.error(f"Error getting user stats for {user_id}: {e}") return {"user_id": user_id, "error": str(e)} def delete_all_user_data(self, user_id: str) -> bool: """ Delete all conversation data for a specific user Args: user_id: The user ID to delete all data for Returns: True if successful, False otherwise """ if not user_id: logger.error("Cannot delete user data: user_id is empty") return False try: response = self.supabase.table('conversations').delete().eq('user_id', user_id).execute() logger.info(f"Deleted all conversation data for user {user_id}") return True except Exception as e: logger.error(f"Error deleting all user data for {user_id}: {e}") return False def migrate_anonymous_sessions(self, old_session_ids: List[str], new_user_id: str) -> bool: """ Migrate anonymous sessions to a specific user ID Useful when user decides to create an account Args: old_session_ids: List of session IDs to migrate new_user_id: The new user ID to assign these sessions to Returns: True if successful, False otherwise """ if not old_session_ids or not new_user_id: logger.error("Cannot migrate sessions: missing session_ids or user_id") return False try: migrated_count = 0 for session_id in old_session_ids: try: # Update the user_id for this session response = self.supabase.table('conversations').update({ 'user_id': new_user_id, 'updated_at': datetime.utcnow().isoformat() }).eq('session_id', session_id).execute() migrated_count += 1 except Exception as e: logger.error(f"Failed to migrate session {session_id}: {e}") continue logger.info(f"Migrated {migrated_count} sessions to user {new_user_id}") return migrated_count > 0 except Exception as e: logger.error(f"Error migrating sessions to user {new_user_id}: {e}") return False # Example usage and testing if __name__ == "__main__": print("=== ConversationManager with User Authentication Test ===") try: # Initialize manager manager = ConversationManager() # Test user ID generation test_user_id = manager.generate_user_id("test_device_info", "192.168.1.100") print(f"\n1. Generated user ID: {test_user_id}") # Create a new chat for this user test_chat_id = manager.generate_chat_id(test_user_id) print(f"2. Generated chat ID: {test_chat_id}") # Add some messages msg1 = manager.add_message(test_chat_id, test_user_id, "user", "Hello, I need help with farming") print(f"3. Added user message: {msg1}") msg2 = manager.add_message(test_chat_id, test_user_id, "assistant", "Hello! I'd be happy to help you with farming. What specific questions do you have?") print(f"4. Added assistant message: {msg2}") # Test retrieving history print(f"\n5. Retrieved history for {test_chat_id}:") history = manager.get_history(test_chat_id, test_user_id) for msg in history: print(f" Message {msg.get('message_id')}: {msg.get('role')} - {msg.get('content')[:50]}...") # Test getting all sessions for this user print(f"\n6. All sessions for user {test_user_id}:") sessions = manager.get_all_chat_sessions(test_user_id) for session in sessions: print(f" {session.get('session_id')}: {session.get('title')} ({session.get('message_count')} messages)") # Test user stats print(f"\n7. User statistics:") stats = manager.get_user_stats(test_user_id) for key, value in stats.items(): print(f" {key}: {value}") # Test security - try to access with wrong user ID print(f"\n8. Security test - trying to access with wrong user ID:") wrong_user_history = manager.get_history(test_chat_id, "wrong_user_id") print(f" History with wrong user ID: {len(wrong_user_history)} messages (should be 0)") print("\nāœ… All tests completed successfully!") except Exception as e: print(f"āŒ Test failed: {e}")