Spaces:
Sleeping
Sleeping
| # conversation_manager.py | |
| import os | |
| import uuid | |
| import hashlib | |
| import logging | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| from supabase import create_client, Client | |
| from dotenv import load_dotenv | |
# Module-wide logging setup: configure the root logger at INFO level and
# create the logger used by everything in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load settings via python-dotenv before ConversationManager reads
# SUPABASE_URL / SUPABASE_KEY from the environment.
load_dotenv()
| class ConversationManager: | |
| """Enhanced ConversationManager with user authentication and session isolation""" | |
def __init__(self):
    """Create the Supabase client that every other method talks to.

    Reads ``SUPABASE_URL`` and ``SUPABASE_KEY`` from the environment.

    Raises:
        ValueError: If either variable is missing or empty.
        Exception: Any failure while building the client is logged and
            re-raised unchanged.
    """
    try:
        url, key = os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY")
        if not (url and key):
            raise ValueError("Missing Supabase credentials in environment variables")

        self.supabase: Client = create_client(url, key)
        logger.info("ConversationManager initialized successfully with Supabase")
    except Exception as e:
        # Log for operators, then let the caller decide how to react.
        logger.error(f"Failed to initialize ConversationManager: {e}")
        raise
def generate_chat_id(self, user_id: str) -> str:
    """Generate a unique chat ID for a specific user.

    The result has the form
    ``chat_<first 8 chars of user_id>_<unix seconds>_<8 random hex chars>``.

    Args:
        user_id: The owning user's ID; only its first 8 characters are embedded.

    Returns:
        A new chat ID string. Two calls never return the same value in
        practice because of the random suffix.
    """
    from datetime import timezone  # local import: only needed here

    # Use an aware "now". Calling .timestamp() on the naive value returned by
    # utcnow() treats it as *local* time and skews the epoch by the host's
    # UTC offset.
    epoch_seconds = int(datetime.now(timezone.utc).timestamp())
    random_part = uuid.uuid4().hex[:8]
    return f"chat_{user_id[:8]}_{epoch_seconds}_{random_part}"
def generate_user_id(self, device_info: Optional[str] = None, ip_address: Optional[str] = None) -> str:
    """Generate a user ID from device info and/or IP address.

    When at least one of the arguments is given, the ID is a deterministic
    hash of them, so the same device/IP combination always maps to the same
    ID. When neither is given, a fresh anonymous ID is produced on every call.

    Args:
        device_info: Optional device fingerprint string.
        ip_address: Optional client IP address string.

    Returns:
        ``"user_"`` followed by the first 16 hex characters of a SHA-256 digest.
    """
    # NOTE(review): the two parts are concatenated without a separator, so
    # ("ab", "c") and ("a", "bc") hash to the same ID. Left unchanged on
    # purpose: altering the hash input would change the ID of every
    # existing user.
    identifier = "".join(part for part in (device_info, ip_address) if part)

    if not identifier:
        from datetime import timezone  # local import: only the fallback needs it

        # Aware "now": .timestamp() on the naive utcnow() value would be read
        # as local time and give a skewed epoch.
        epoch_seconds = int(datetime.now(timezone.utc).timestamp())
        identifier = f"anonymous_{epoch_seconds}_{uuid.uuid4().hex[:8]}"

    digest = hashlib.sha256(identifier.encode()).hexdigest()
    return f"user_{digest[:16]}"
def generate_message_id(self, chat_id: str, user_id: str) -> int:
    """Return the next message ID for a chat (highest existing ID + 1).

    Args:
        chat_id: The chat whose history is inspected.
        user_id: The owning user's ID, passed through to ``get_history``.

    Returns:
        ``1`` for an empty or missing history, otherwise one more than the
        largest positive integer ``message_id`` found. Also ``1`` if an
        unexpected error occurs (the error is logged).
    """
    try:
        history = self.get_history(chat_id, user_id)

        # Skip malformed entries instead of letting one bad record raise and
        # reset the counter to 1, which would hand out duplicate IDs.
        candidate_ids = (
            entry.get('message_id')
            for entry in history or []
            if isinstance(entry, dict)
        )
        valid_ids = [
            value
            for value in candidate_ids
            # bool is a subclass of int; True/False are never real message IDs.
            if isinstance(value, int) and not isinstance(value, bool) and value > 0
        ]
        return max(valid_ids, default=0) + 1
    except Exception as e:
        logger.error(f"Error generating message ID for chat {chat_id}, user {user_id}: {e}")
        return 1
def get_history(self, chat_id: str, user_id: str) -> List[Dict[str, Any]]:
    """Fetch the stored message list for one chat owned by one user.

    The lookup filters on both ``session_id`` and ``user_id``, so a chat that
    belongs to a different user is reported as having no history.

    Args:
        chat_id: The chat ID to retrieve history for.
        user_id: The user ID the chat must belong to.

    Returns:
        The list of message dictionaries. An empty list if either ID is
        empty, no row matches, the stored value is not a list, or the query
        fails (failures are logged, not raised).
    """
    if not (chat_id and user_id):
        logger.warning("Cannot get history: chat_id or user_id is empty")
        return []

    try:
        query = (
            self.supabase.table('conversations')
            .select('history')
            .eq('session_id', chat_id)
            .eq('user_id', user_id)
        )
        rows = query.execute().data

        if not rows:
            logger.info(f"No history found for chat {chat_id}, user {user_id}")
            return []

        stored = rows[0].get('history', [])
        if not isinstance(stored, list):
            logger.warning(f"Invalid history format for chat {chat_id}")
            return []

        logger.info(f"Retrieved {len(stored)} messages for chat {chat_id}, user {user_id}")
        return stored
    except Exception as e:
        logger.error(f"Error retrieving history for chat {chat_id}, user {user_id}: {e}")
        return []
def save_history(self, chat_id: str, user_id: str, history: List[Dict[str, Any]]) -> bool:
    """Persist the full message list for a chat, creating the row if needed.

    Looks up the row matching ``chat_id`` and ``user_id``. If one exists, its
    ``history`` and ``updated_at`` are overwritten; otherwise a new row is
    inserted with ``created_at`` and ``updated_at`` set to the same instant.

    Args:
        chat_id: The chat ID to save history for.
        user_id: The user ID that owns the chat.
        history: The complete list of message dictionaries to store.

    Returns:
        True on success. False if an ID is empty, ``history`` is not a list,
        or the database call fails (failures are logged, not raised).
    """
    if not (chat_id and user_id):
        logger.error("Cannot save history: chat_id or user_id is empty")
        return False
    if not isinstance(history, list):
        logger.error("Cannot save history: history must be a list")
        return False

    try:
        # Existence probe for this (chat, user) pair.
        lookup = (
            self.supabase.table('conversations')
            .select('session_id')
            .eq('session_id', chat_id)
            .eq('user_id', user_id)
            .execute()
        )
        now_iso = datetime.utcnow().isoformat()

        if lookup.data:
            changes = {'history': history, 'updated_at': now_iso}
            (
                self.supabase.table('conversations')
                .update(changes)
                .eq('session_id', chat_id)
                .eq('user_id', user_id)
                .execute()
            )
            logger.info(f"Updated history for chat {chat_id}, user {user_id}")
        else:
            new_row = {
                'session_id': chat_id,
                'user_id': user_id,
                'history': history,
                'created_at': now_iso,
                'updated_at': now_iso,
            }
            self.supabase.table('conversations').insert(new_row).execute()
            logger.info(f"Created new conversation for chat {chat_id}, user {user_id}")

        return True
    except Exception as e:
        logger.error(f"Error saving history for chat {chat_id}, user {user_id}: {e}")
        return False
def add_message(self, chat_id: str, user_id: str, role: str, content: str, image_url: Optional[str] = None) -> Dict[str, Any]:
    """Append one message to a chat and return it with its assigned ID.

    The history is fetched once; the new ``message_id`` is computed from that
    same snapshot and the message is appended to it before saving, so the ID
    and the stored list are always consistent with each other.

    Args:
        chat_id: The chat ID to add the message to.
        user_id: The user ID that owns the chat.
        role: Message role ('user' or 'assistant').
        content: Message content.
        image_url: Optional image URL, stored under the ``imageUrl`` key.

    Returns:
        The stored message dictionary (``message_id``, ``role``, ``content``,
        ``timestamp`` and optionally ``imageUrl``), or an empty dict if an ID
        is empty or saving fails.
    """
    if not chat_id or not user_id:
        logger.error("Cannot add message: chat_id or user_id is empty")
        return {}

    try:
        # NOTE(review): this is a read-modify-write and is not atomic, so two
        # concurrent writers can lose a message. get_history also returns []
        # when the read itself fails, in which case the save below replaces
        # the stored history with just this message.
        current_history = self.get_history(chat_id, user_id)

        # Next ID = highest valid existing ID + 1, taken from the snapshot
        # just fetched rather than from a second database round-trip.
        existing_ids = [
            entry.get('message_id')
            for entry in current_history
            if isinstance(entry, dict)
        ]
        valid_ids = [
            value
            for value in existing_ids
            if isinstance(value, int) and not isinstance(value, bool)
        ]
        message_id = max([0, *valid_ids]) + 1

        message = {
            "message_id": message_id,
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
        }
        if image_url:
            message["imageUrl"] = image_url

        current_history.append(message)

        if self.save_history(chat_id, user_id, current_history):
            logger.info(f"Added message {message_id} to chat {chat_id}, user {user_id}")
            return message

        logger.error(f"Failed to save message to chat {chat_id}, user {user_id}")
        return {}
    except Exception as e:
        logger.error(f"Error adding message to chat {chat_id}, user {user_id}: {e}")
        return {}
def delete_history(self, chat_id: str, user_id: str) -> bool:
    """Delete the conversation row for a specific chat ID and user.

    Args:
        chat_id: The chat ID to delete.
        user_id: The user ID that owns the chat.

    Returns:
        True if the delete request completed, including when no row matched
        (nothing to delete counts as success). False if an ID is empty or
        the request raised.
    """
    if not chat_id or not user_id:
        logger.error("Cannot delete history: chat_id or user_id is empty")
        return False

    try:
        response = (
            self.supabase.table('conversations')
            .delete()
            .eq('session_id', chat_id)
            .eq('user_id', user_id)
            .execute()
        )

        # An empty list means the filter matched nothing. The previous
        # "is not None" check treated that as a successful deletion and
        # logged the wrong message.
        deleted_rows = getattr(response, 'data', None)
        if deleted_rows:
            logger.info(f"Deleted conversation history for chat {chat_id}, user {user_id}")
        else:
            logger.warning(f"No conversation found to delete for chat {chat_id}, user {user_id}")
        return True  # Nothing to delete is still considered success
    except Exception as e:
        logger.error(f"Error deleting history for chat {chat_id}, user {user_id}: {e}")
        return False
def get_all_chat_sessions(self, user_id: str) -> List[Dict[str, Any]]:
    """List summary information for every chat owned by a user.

    Args:
        user_id: The user ID to get sessions for.

    Returns:
        A list of dicts with ``session_id``, ``title``, ``message_count``,
        ``created_at`` and ``updated_at``, sorted by ``updated_at`` with the
        most recent first. The title is the first user message truncated to
        35 characters, "Image Query" if that message has only an image, or
        "New Chat" otherwise. Returns an empty list if ``user_id`` is empty
        or the query fails.
    """
    if not user_id:
        logger.error("Cannot get chat sessions: user_id is empty")
        return []

    try:
        response = (
            self.supabase.table('conversations')
            .select('session_id, history, created_at, updated_at')
            .eq('user_id', user_id)
            .execute()
        )

        sessions = []
        for conv in response.data or []:
            # A NULL column arrives as None (the .get default only covers a
            # missing key); treat anything that is not a list as empty so
            # one bad row cannot break the whole listing.
            history = conv.get('history')
            if not isinstance(history, list):
                history = []

            first_user_msg = next(
                (
                    msg
                    for msg in history
                    if isinstance(msg, dict) and msg.get('role') == 'user'
                ),
                None,
            )

            title = "New Chat"
            if first_user_msg:
                content = first_user_msg.get('content', '')
                if content:
                    text = str(content)
                    title = text[:35] + '...' if len(text) > 35 else text
                elif first_user_msg.get('imageUrl'):
                    title = "Image Query"

            sessions.append({
                "session_id": conv.get('session_id'),
                "title": title,
                "message_count": len(history),
                "created_at": conv.get('created_at'),
                "updated_at": conv.get('updated_at'),
            })

        # Most recent first. "or ''" keeps rows with a None timestamp
        # sortable (None cannot be compared with str).
        sessions.sort(key=lambda item: item.get('updated_at') or '', reverse=True)

        logger.info(f"Retrieved {len(sessions)} sessions for user {user_id}")
        return sessions
    except Exception as e:
        logger.error(f"Error fetching chat sessions for user {user_id}: {e}")
        return []
def get_chat_info(self, chat_id: str, user_id: str) -> Dict[str, Any]:
    """Return metadata about one chat owned by one user.

    Args:
        chat_id: The chat ID to get information for.
        user_id: The user ID the chat must belong to.

    Returns:
        An empty dict if either ID is empty. Otherwise a dict that always
        contains ``session_id``, ``user_id`` and ``exists``:

        * found: also ``message_count``, ``created_at``, ``updated_at``;
        * not found: ``message_count`` is 0 and ``exists`` is False;
        * query failed: ``error`` holds the message and ``exists`` is False.
    """
    if not chat_id or not user_id:
        return {}

    try:
        response = (
            self.supabase.table('conversations')
            .select('*')
            .eq('session_id', chat_id)
            .eq('user_id', user_id)
            .execute()
        )

        if not response.data:
            return {
                "session_id": chat_id,
                "user_id": user_id,
                "message_count": 0,
                "exists": False,
            }

        conv = response.data[0]
        # A NULL history column arrives as None; count it as zero messages
        # instead of failing and reporting an existing chat as missing.
        history = conv.get('history')
        message_count = len(history) if isinstance(history, list) else 0

        return {
            "session_id": conv.get('session_id'),
            "user_id": conv.get('user_id'),
            "message_count": message_count,
            "created_at": conv.get('created_at'),
            "updated_at": conv.get('updated_at'),
            "exists": True,
        }
    except Exception as e:
        logger.error(f"Error getting chat info for {chat_id}, user {user_id}: {e}")
        return {
            "session_id": chat_id,
            "user_id": user_id,
            "error": str(e),
            "exists": False,
        }
def chat_exists(self, chat_id: str, user_id: str) -> bool:
    """Report whether a chat row exists for this chat ID and user.

    Args:
        chat_id: The chat ID to look for.
        user_id: The user ID the chat must belong to.

    Returns:
        True only when a row matches both IDs. False when either ID is
        empty, nothing matches, or the query fails (the failure is logged).
    """
    if not (chat_id and user_id):
        return False

    try:
        rows = (
            self.supabase.table('conversations')
            .select('session_id')
            .eq('session_id', chat_id)
            .eq('user_id', user_id)
            .execute()
        ).data

        found = bool(rows)
        if found:
            logger.info(f"Chat {chat_id} exists for user {user_id}")
        return found
    except Exception as e:
        logger.error(f"Error checking if chat {chat_id} exists for user {user_id}: {e}")
        return False
def get_user_stats(self, user_id: str) -> Dict[str, Any]:
    """Compute usage statistics across all of a user's chat sessions.

    Args:
        user_id: The user ID to get stats for.

    Returns:
        An empty dict if ``user_id`` is empty. Otherwise a dict with
        ``user_id``, ``total_sessions``, ``total_messages``,
        ``recent_sessions_24h`` (sessions updated within the last 24 hours),
        ``average_messages_per_session`` (rounded to 2 decimals, 0 when there
        are no sessions), ``first_session`` and ``last_activity`` (earliest
        ``created_at`` / latest ``updated_at`` string, or None when no
        session carries one). On failure, a dict with ``user_id`` and
        ``error``.
    """
    if not user_id:
        return {}

    try:
        from datetime import datetime, timedelta, timezone

        sessions = self.get_all_chat_sessions(user_id)
        total_sessions = len(sessions)
        total_messages = sum(session.get('message_count', 0) for session in sessions)

        # Use an aware cutoff: timestamps carrying an offset ("+00:00" / "Z")
        # parse to aware datetimes, and comparing those with a naive value
        # raises TypeError. That error used to be swallowed, which left this
        # counter stuck at 0.
        cutoff = datetime.now(timezone.utc) - timedelta(days=1)
        recent_sessions = 0
        for session in sessions:
            updated_at = session.get('updated_at')
            if not updated_at:
                continue
            try:
                session_time = datetime.fromisoformat(updated_at.replace('Z', '+00:00'))
            except (ValueError, TypeError, AttributeError):
                # Unparseable timestamp: skip this session, keep the rest.
                logger.warning(f"Skipping unparseable updated_at {updated_at!r} for user {user_id}")
                continue
            if session_time.tzinfo is None:
                # Values without an offset are taken to be UTC, matching the
                # utcnow()-based strings written elsewhere in this class.
                session_time = session_time.replace(tzinfo=timezone.utc)
            if session_time > cutoff:
                recent_sessions += 1

        # Ignore missing timestamps so min()/max() never compare None with str.
        created_values = [s.get('created_at') for s in sessions if s.get('created_at')]
        updated_values = [s.get('updated_at') for s in sessions if s.get('updated_at')]

        return {
            "user_id": user_id,
            "total_sessions": total_sessions,
            "total_messages": total_messages,
            "recent_sessions_24h": recent_sessions,
            "average_messages_per_session": round(total_messages / total_sessions, 2) if total_sessions > 0 else 0,
            "first_session": min(created_values) if created_values else None,
            "last_activity": max(updated_values) if updated_values else None,
        }
    except Exception as e:
        logger.error(f"Error getting user stats for {user_id}: {e}")
        return {"user_id": user_id, "error": str(e)}
def delete_all_user_data(self, user_id: str) -> bool:
    """Remove every conversation row that belongs to a user.

    Args:
        user_id: The user ID whose conversations are deleted.

    Returns:
        True if the delete request completed, False if ``user_id`` is empty
        or the request raised (the failure is logged).
    """
    if not user_id:
        logger.error("Cannot delete user data: user_id is empty")
        return False

    try:
        conversations = self.supabase.table('conversations')
        conversations.delete().eq('user_id', user_id).execute()
        logger.info(f"Deleted all conversation data for user {user_id}")
        return True
    except Exception as e:
        logger.error(f"Error deleting all user data for {user_id}: {e}")
        return False
def migrate_anonymous_sessions(self, old_session_ids: List[str], new_user_id: str, old_user_id: Optional[str] = None) -> bool:
    """Reassign existing sessions to a different user ID.

    Useful when an anonymous user decides to create an account.

    Args:
        old_session_ids: List of session IDs to migrate.
        new_user_id: The user ID to assign these sessions to.
        old_user_id: Optional current owner. When given, only sessions
            currently owned by this user are reassigned. When omitted, any
            session with a matching ID is reassigned regardless of owner.

    Returns:
        True if at least one session row was actually updated, False
        otherwise (including empty arguments). Per-session failures are
        logged and do not stop the remaining sessions from being processed.
    """
    if not old_session_ids or not new_user_id:
        logger.error("Cannot migrate sessions: missing session_ids or user_id")
        return False

    # NOTE(review): without old_user_id there is no ownership check, so a
    # caller that forwards client-supplied session IDs could take over
    # someone else's chat. Callers should pass old_user_id where possible.
    try:
        migrated_count = 0
        for session_id in old_session_ids:
            try:
                query = self.supabase.table('conversations').update({
                    'user_id': new_user_id,
                    'updated_at': datetime.utcnow().isoformat()
                }).eq('session_id', session_id)
                if old_user_id:
                    query = query.eq('user_id', old_user_id)
                response = query.execute()
            except Exception as e:
                logger.error(f"Failed to migrate session {session_id}: {e}")
                continue

            # Count a session only when the update matched a row. A request
            # that succeeds but touches nothing is not a migration.
            if getattr(response, 'data', None):
                migrated_count += 1
            else:
                logger.warning(f"No matching conversation to migrate for session {session_id}")

        logger.info(f"Migrated {migrated_count} sessions to user {new_user_id}")
        return migrated_count > 0
    except Exception as e:
        logger.error(f"Error migrating sessions to user {new_user_id}: {e}")
        return False
# Manual smoke test: runs against the Supabase project configured in the
# environment and leaves the rows it creates in the table.
if __name__ == "__main__":
    print("=== ConversationManager with User Authentication Test ===")

    try:
        cm = ConversationManager()

        # Step 1: derive a user ID from fixed device/IP values.
        uid = cm.generate_user_id("test_device_info", "192.168.1.100")
        print(f"\n1. Generated user ID: {uid}")

        # Step 2: open a fresh chat for that user.
        cid = cm.generate_chat_id(uid)
        print(f"2. Generated chat ID: {cid}")

        # Steps 3-4: one user message, one assistant reply.
        user_msg = cm.add_message(cid, uid, "user", "Hello, I need help with farming")
        print(f"3. Added user message: {user_msg}")

        assistant_msg = cm.add_message(
            cid,
            uid,
            "assistant",
            "Hello! I'd be happy to help you with farming. What specific questions do you have?",
        )
        print(f"4. Added assistant message: {assistant_msg}")

        # Step 5: read the conversation back.
        print(f"\n5. Retrieved history for {cid}:")
        for entry in cm.get_history(cid, uid):
            print(f" Message {entry.get('message_id')}: {entry.get('role')} - {entry.get('content')[:50]}...")

        # Step 6: list every session the user owns.
        print(f"\n6. All sessions for user {uid}:")
        for item in cm.get_all_chat_sessions(uid):
            print(f" {item.get('session_id')}: {item.get('title')} ({item.get('message_count')} messages)")

        # Step 7: aggregate statistics.
        print("\n7. User statistics:")
        for stat_name, stat_value in cm.get_user_stats(uid).items():
            print(f" {stat_name}: {stat_value}")

        # Step 8: a different user ID must not see this chat.
        print("\n8. Security test - trying to access with wrong user ID:")
        foreign_view = cm.get_history(cid, "wrong_user_id")
        print(f" History with wrong user ID: {len(foreign_view)} messages (should be 0)")

        print("\n✅ All tests completed successfully!")
    except Exception as e:
        print(f"❌ Test failed: {e}")