# services/session_manager.py
"""
Session Manager for MasterLLM V3 Architecture

REFACTORED for normalized MongoDB + S3 storage:
- Sessions collection: metadata only
- Messages collection: message metadata with S3 references
- Full message content in S3
- Auto chat naming using Bedrock LLM
- Sorted by last_activity
"""
import os
import uuid
import json
import boto3
from datetime import datetime
from typing import Optional, Dict, Any, List
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError, ConnectionFailure

from services.s3_manager import get_s3_manager
from services.schemas import (
    SessionSchema, MessageSchema, SessionState, MessageRole,
    SessionStats, MessageS3Content, schema_to_dict, datetime_to_iso
)


class SessionManager:
    """Manages user sessions with normalized MongoDB + S3 architecture"""

    def __init__(self):
        """
        Initialize Session Manager

        Reads MONGODB_URI (required), MONGODB_DB (default "masterllm") and
        AWS_REGION (default "us-east-1") from the environment, then creates
        the MongoDB client, the S3 manager and the Bedrock runtime client.

        Raises:
            RuntimeError: If MONGODB_URI is not set
        """
        # MongoDB connection
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")

        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")

        self.client = MongoClient(mongodb_uri, serverSelectionTimeoutMS=5000)
        self.db = self.client[mongodb_db]

        # Collections
        self.sessions_collection = self.db["sessions"]
        self.messages_collection = self.db["messages"]
        self.files_collection = self.db["files"]

        # S3 manager
        self.s3 = get_s3_manager()

        # Bedrock client for chat naming
        self.bedrock_runtime = boto3.client(
            "bedrock-runtime",
            region_name=os.getenv("AWS_REGION", "us-east-1")
        )

        print("✅ Session Manager initialized with normalized schema")

    def create_session(
        self,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Create a new session

        Args:
            user_id: Optional user identifier
            metadata: Additional metadata

        Returns:
            session_id: Unique session ID
        """
        session_id = str(uuid.uuid4())
        now = datetime.utcnow()

        session_record = SessionSchema(
            session_id=session_id,
            user_id=user_id,
            created_at=now,
            last_activity=now,
            state=SessionState.INITIAL,
            metadata=metadata or {}
        )

        self.sessions_collection.insert_one(schema_to_dict(session_record))
        return session_id

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        file_data: Optional[Dict[str, Any]] = None,
        result: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Add message to session

        Stores:
        - Metadata in messages collection (using MongoDB _id as message_id)
        - Full content in S3
        - Updates session last_activity

        Args:
            session_id: Session ID
            role: "user", "assistant", or "system"
            content: Message content
            file_data: Optional file metadata
            result: Optional pipeline result data (for pipeline completion messages)

        Returns:
            Message metadata dict with the MongoDB _id exposed as the string
            "message_id" (the raw ObjectId "_id" is not included)
        """
        now = datetime.utcnow()
        timestamp = now.isoformat() + "Z"

        # Create message metadata record WITHOUT message_id (MongoDB will generate _id)
        message_record = {
            "session_id": session_id,
            "role": role,
            "timestamp": timestamp,
            "content_preview": content[:200],
            "has_file": bool(file_data and file_data.get("file_id")),
            "file_id": file_data.get("file_id") if file_data else None,
            "metadata": {}
        }

        # Add result if provided (for pipeline completion messages)
        if result:
            message_record["result"] = result

        # Insert message metadata and get the MongoDB _id
        insert_result = self.messages_collection.insert_one(message_record)
        message_id = str(insert_result.inserted_id)

        # Now store full message content in S3 using the MongoDB _id
        s3_key = f"sessions/{session_id}/messages/{message_id}.json"
        s3_bucket = self.s3.bucket_name

        message_s3_content = {
            "message_id": message_id,
            "role": role,
            "content": content,
            "timestamp": timestamp,
            "file_data": file_data or {}
        }
        if result:
            message_s3_content["result"] = result

        self.s3.upload_json(s3_key, message_s3_content, add_prefix=False)

        # Update the message record with S3 info
        self.messages_collection.update_one(
            {"_id": insert_result.inserted_id},
            {"$set": {"s3_key": s3_key, "s3_bucket": s3_bucket}}
        )

        # Update session last_activity and message count
        self.sessions_collection.update_one(
            {"session_id": session_id},
            {
                "$set": {"last_activity": timestamp},
                "$inc": {"stats.message_count": 1}
            }
        )

        # Auto-generate chat name after first user message (only if not already generated)
        if role == "user" and not content.lower().startswith("uploaded file"):
            # Check if chat name already exists to avoid unnecessary generation attempts
            session = self.sessions_collection.find_one(
                {"session_id": session_id},
                {"chat_name": 1, "_id": 0}
            )
            if session and not session.get("chat_name"):
                self._maybe_generate_chat_name(session_id)

        # insert_one() adds the generated ObjectId to the dict it was given.
        # Drop it so the returned record matches get_messages(), which also
        # exposes only the string message_id.
        message_record.pop("_id", None)
        message_record["message_id"] = message_id
        message_record["s3_key"] = s3_key
        message_record["s3_bucket"] = s3_bucket
        return message_record

    def get_messages(
        self,
        session_id: str,
        limit: int = 50,
        include_content: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Get messages for a session

        Messages are sorted by timestamp ascending before the limit is
        applied, so the earliest `limit` messages are returned.

        Args:
            session_id: Session ID
            limit: Maximum number of messages
            include_content: Whether to fetch full content from S3. File
                fields ("file", "fileName", "fileUrl") are only added when
                this is True.

        Returns:
            List of message dicts with _id converted to message_id
        """
        # Get message metadata from MongoDB (include _id this time)
        messages = list(
            self.messages_collection.find({"session_id": session_id})
            .sort("timestamp", 1)
            .limit(limit)
        )

        # Convert MongoDB _id to message_id string
        for msg in messages:
            msg["message_id"] = str(msg.pop("_id"))

        if not include_content:
            return messages

        for msg in messages:
            # Fetch full content from S3 for each message
            try:
                full_content = self.s3.download_json(msg["s3_key"], add_prefix=False)
                msg["content"] = full_content.get("content", msg["content_preview"])
                msg["file_data"] = full_content.get("file_data", {})
                # Include result if present in S3
                if "result" in full_content:
                    msg["result"] = full_content["result"]
            except Exception:
                # If S3 fetch fails (or the record has no s3_key), use preview
                msg["content"] = msg["content_preview"]
                msg["file_data"] = {}

            # V3 API: Add file metadata fields
            msg["file"] = False
            if msg.get("file_id"):
                try:
                    file_record = self.files_collection.find_one(
                        {"file_id": msg["file_id"]}
                    )
                    if file_record:
                        msg["file"] = True
                        msg["fileName"] = file_record["file_name"]
                        msg["fileUrl"] = file_record["presigned_url"]
                except Exception as e:
                    print(f"⚠️ Failed to get file metadata: {e}")

        return messages

    def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session metadata by ID (read-only, does not update last_activity)"""
        return self.sessions_collection.find_one(
            {"session_id": session_id},
            {"_id": 0}
        )

    def get_all_sessions(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """
        Get all sessions sorted by last_activity (descending)

        Each session gets a boolean "file" field; when the session has a
        current_file_id that resolves to a file record, a "current_file"
        dict (file_id, file_name, file_url, uploaded_at) is attached too.

        Args:
            limit: Maximum number to return
            skip: Number to skip (pagination)

        Returns:
            List of session records
        """
        sessions = list(
            self.sessions_collection.find({}, {"_id": 0})
            .sort("last_activity", -1)
            .skip(skip)
            .limit(limit)
        )

        # Add current file info to each session
        for session in sessions:
            session["file"] = False
            if session.get("current_file_id"):
                try:
                    file_record = self.files_collection.find_one(
                        {"file_id": session["current_file_id"]}
                    )
                    if file_record:
                        session["current_file"] = {
                            "file_id": file_record["file_id"],
                            "file_name": file_record["file_name"],
                            "file_url": file_record["presigned_url"],
                            "uploaded_at": file_record["uploaded_at"]
                        }
                        session["file"] = True
                except Exception as e:
                    print(f"⚠️ Failed to get file info: {e}")

        return sessions

    def update_session(
        self,
        session_id: str,
        updates: Dict[str, Any],
        update_activity: bool = False
    ) -> bool:
        """
        Update session metadata

        Args:
            session_id: Session ID
            updates: Fields to update (not modified by this method)
            update_activity: If True, also update last_activity timestamp

        Returns:
            True if a session document was modified
        """
        # Work on a copy so the caller's dict is never mutated
        fields = dict(updates or {})

        # Only update last_activity if explicitly requested
        if update_activity and "last_activity" not in fields:
            fields["last_activity"] = datetime.utcnow().isoformat() + "Z"

        # Nothing to write; older MongoDB servers reject an empty $set
        if not fields:
            return False

        result = self.sessions_collection.update_one(
            {"session_id": session_id},
            {"$set": fields}
        )
        return result.modified_count > 0

    def rename_session(
        self,
        session_id: str,
        new_name: str
    ) -> bool:
        """
        Rename a session

        Args:
            session_id: Session ID
            new_name: New chat name

        Returns:
            True if a session document was modified, False otherwise
            (including when the update raised)
        """
        try:
            result = self.sessions_collection.update_one(
                {"session_id": session_id},
                {"$set": {"chat_name": new_name}}
            )
            return result.modified_count > 0
        except Exception as e:
            print(f"Error renaming session {session_id}: {e}")
            return False

    def delete_session(self, session_id: str) -> bool:
        """
        Delete a session's metadata from MongoDB.

        Only the sessions collection is touched here.

        Args:
            session_id: Session ID to delete

        Returns:
            True if a session document was deleted, False otherwise
            (including when the delete raised)
        """
        try:
            result = self.sessions_collection.delete_one({"session_id": session_id})
            return result.deleted_count > 0
        except Exception as e:
            print(f"Error deleting session {session_id}: {e}")
            return False

    def _maybe_generate_chat_name(self, session_id: str):
        """
        Auto-generate chat name after first user message

        Uses Bedrock LLM to generate a descriptive name. Does nothing when
        the session is missing, already named, or has no qualifying user
        message among its first 10 messages. Naming failures are printed
        and swallowed.
        """
        session = self.get_session(session_id)
        if not session or session.get("chat_name"):
            return  # Already has a name

        # Get first user message (not file upload message)
        messages = self.get_messages(session_id, limit=10, include_content=True)
        first_user_msg = next(
            (
                msg.get("content", "")
                for msg in messages
                if msg.get("role") == "user"
                and not msg.get("content", "").lower().startswith("uploaded file")
            ),
            None
        )

        if not first_user_msg:
            return

        # Generate name using Bedrock
        try:
            model_id = os.getenv("BEDROCK_MODEL_ID", "anthropic.claude-3-haiku-20240307-v1:0")
            prompt = (
                "Create a succinct, descriptive 3-6 word title for this chat session based on the first user message.\n"
                "Return only the title, without quotes.\n\n"
                f"First message: {first_user_msg}"
            )

            body = {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 48,
                "temperature": 0.2,
                "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
            }

            response = self.bedrock_runtime.invoke_model(
                modelId=model_id,
                accept="application/json",
                contentType="application/json",
                body=json.dumps(body)
            )

            response_body = json.loads(response["body"].read())

            # The title is taken from the "text" of the first content part
            title = ""
            if isinstance(response_body.get("content"), list) and response_body["content"]:
                part = response_body["content"][0]
                if isinstance(part, dict):
                    title = part.get("text", "")

            title = title.strip().strip('"').strip() or "New Chat"

            # Save to session
            self.sessions_collection.update_one(
                {"session_id": session_id},
                {
                    "$set": {
                        "chat_name": title[:100],
                        "chat_name_generated_at": datetime.utcnow().isoformat() + "Z",
                        "chat_name_model": model_id
                    }
                }
            )
        except Exception as e:
            # Don't block flow on naming errors
            print(f"⚠️ Failed to generate chat name: {e}")

    def get_all_session_ids(self) -> List[str]:
        """Get list of all session IDs"""
        sessions = self.sessions_collection.find({}, {"session_id": 1, "_id": 0})
        return [s["session_id"] for s in sessions]

    def get_session_history(
        self,
        session_id: str,
        limit: int = 50
    ) -> List[Dict[str, Any]]:
        """
        Get conversation history for a session

        Args:
            session_id: Session ID
            limit: Maximum messages to return

        Returns:
            List of messages with full content
        """
        return self.get_messages(session_id, limit=limit, include_content=True)

    def get_session_stats(self, session_id: str) -> Dict[str, Any]:
        """
        Get session statistics

        Returns:
            Empty dict when the session does not exist; otherwise selected
            session fields plus a live message count from the messages
            collection
        """
        session = self.get_session(session_id)
        if not session:
            return {}

        # Count messages
        message_count = self.messages_collection.count_documents({"session_id": session_id})

        return {
            "session_id": session_id,
            "created_at": session.get("created_at"),
            "last_activity": session.get("last_activity"),
            "chat_name": session.get("chat_name"),
            "state": session.get("state"),
            "message_count": message_count,
            "stats": session.get("stats", {})
        }

    def save_pipeline_execution(
        self,
        session_id: str,
        pipeline: Dict[str, Any],
        result: Dict[str, Any],
        file_path: Optional[str] = None,
        executor: str = "unknown"
    ) -> bool:
        """
        DEPRECATED: Use pipeline_manager instead

        This method is kept for backwards compatibility. It stores nothing
        and always returns True.
        """
        print("⚠️ save_pipeline_execution is deprecated. Use pipeline_manager instead.")
        return True

    def close(self):
        """Close MongoDB connection"""
        if self.client is not None:
            self.client.close()
            print("🔒 MongoDB connection closed")


# Global session manager instance
session_manager = SessionManager()