Spaces:
Sleeping
Sleeping
| # services/session_manager.py | |
| """ | |
| Session Manager for MasterLLM V3 Architecture | |
| REFACTORED for normalized MongoDB + S3 storage: | |
| - Sessions collection: metadata only | |
| - Messages collection: message metadata with S3 references | |
| - Full message content in S3 | |
| - Auto chat naming using Bedrock LLM | |
| - Sorted by last_activity | |
| """ | |
| import os | |
| import uuid | |
| import json | |
| import boto3 | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any, List | |
| from pymongo import MongoClient | |
| from pymongo.errors import DuplicateKeyError, ConnectionFailure | |
| from services.s3_manager import get_s3_manager | |
| from services.schemas import ( | |
| SessionSchema, MessageSchema, SessionState, MessageRole, | |
| SessionStats, MessageS3Content, schema_to_dict, datetime_to_iso | |
| ) | |
class SessionManager:
    """Manage chat sessions using MongoDB for metadata and S3 for message bodies.

    Storage layout used by the methods below:

    - ``sessions`` collection: one metadata document per session, keyed by
      ``session_id``.
    - ``messages`` collection: per-message metadata (preview, file reference,
      S3 location). The MongoDB ``_id`` doubles as the public ``message_id``.
    - S3: full message payload at
      ``sessions/{session_id}/messages/{message_id}.json``.
    - ``files`` collection: file records looked up by ``file_id``.
    """

    # Length of the plain-text preview kept in MongoDB for each message.
    CONTENT_PREVIEW_CHARS = 200
    # Max characters of the first user message sent to Bedrock for naming;
    # a title only needs the gist, and unbounded input wastes tokens.
    CHAT_NAME_INPUT_CHARS = 1000
    # Max length of a stored chat name.
    CHAT_NAME_MAX_CHARS = 100

    def __init__(self):
        """Connect to MongoDB, S3 and Bedrock.

        Reads ``MONGODB_URI`` (required), ``MONGODB_DB`` (default
        ``"masterllm"``) and ``AWS_REGION`` (default ``"us-east-1"``) from the
        environment.

        Raises:
            RuntimeError: if ``MONGODB_URI`` is not set.
        """
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        if not mongodb_uri:
            raise RuntimeError("MONGODB_URI environment variable not set")

        self.client = MongoClient(mongodb_uri, serverSelectionTimeoutMS=5000)
        self.db = self.client[mongodb_db]

        # Collections
        self.sessions_collection = self.db["sessions"]
        self.messages_collection = self.db["messages"]
        self.files_collection = self.db["files"]

        # S3 manager (holds full message payloads)
        self.s3 = get_s3_manager()

        # Bedrock client, used only for auto-generating chat names
        self.bedrock_runtime = boto3.client(
            "bedrock-runtime",
            region_name=os.getenv("AWS_REGION", "us-east-1"),
        )
        print("✅ Session Manager initialized with normalized schema")

    @staticmethod
    def _iso(dt: datetime) -> str:
        """Format a naive UTC datetime as ISO-8601 with a trailing ``Z``."""
        return dt.isoformat() + "Z"

    def create_session(
        self,
        user_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """Create and persist a new session.

        Args:
            user_id: Optional user identifier.
            metadata: Additional metadata stored on the session document.

        Returns:
            The new session's UUID4 string.
        """
        session_id = str(uuid.uuid4())
        now = datetime.utcnow()

        # NOTE(review): `now` is handed to SessionSchema as a datetime, while
        # add_message/update_session write last_activity as an ISO string.
        # If schema_to_dict keeps datetimes, sorting by last_activity mixes
        # BSON types — confirm against services.schemas.
        session_record = SessionSchema(
            session_id=session_id,
            user_id=user_id,
            created_at=now,
            last_activity=now,
            state=SessionState.INITIAL,
            metadata=metadata or {}
        )
        self.sessions_collection.insert_one(schema_to_dict(session_record))
        return session_id

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        file_data: Optional[Dict[str, Any]] = None,
        result: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Add a message to a session.

        Steps:
          1. Insert metadata (preview, file reference) into ``messages``; the
             generated MongoDB ``_id`` becomes the ``message_id``.
          2. Upload the full payload to S3 under that id, then record the S3
             location on the metadata document.
          3. Bump the session's ``last_activity`` and ``stats.message_count``.
          4. For a user message that is not a file-upload notice, try to
             auto-name the chat if it has no name yet.

        If the S3 upload raises, the exception propagates and the metadata
        document is left without ``s3_key``; get_messages then falls back to
        the stored preview for it.

        Args:
            session_id: Session ID.
            role: "user", "assistant", or "system".
            content: Message text.
            file_data: Optional file metadata; its ``file_id`` is indexed.
            result: Optional pipeline result (pipeline completion messages).

        Returns:
            The metadata record plus ``message_id``, ``s3_key`` and
            ``s3_bucket``. It contains no raw ``_id``/ObjectId.
        """
        now = datetime.utcnow()
        timestamp = self._iso(now)
        file_id = file_data.get("file_id") if file_data else None

        message_record = {
            "session_id": session_id,
            "role": role,
            "timestamp": timestamp,
            "content_preview": content[:self.CONTENT_PREVIEW_CHARS],
            "has_file": bool(file_id),
            "file_id": file_id,
            "metadata": {}
        }
        if result:
            message_record["result"] = result

        # Insert a copy: pymongo writes the generated ObjectId into the dict it
        # is given, and that value must not leak into the returned record.
        insert_result = self.messages_collection.insert_one(dict(message_record))
        message_id = str(insert_result.inserted_id)

        # Full content goes to S3, keyed by the MongoDB _id
        s3_key = f"sessions/{session_id}/messages/{message_id}.json"
        s3_bucket = self.s3.bucket_name
        message_s3_content = {
            "message_id": message_id,
            "role": role,
            "content": content,
            "timestamp": timestamp,
            "file_data": file_data or {}
        }
        if result:
            message_s3_content["result"] = result
        self.s3.upload_json(s3_key, message_s3_content, add_prefix=False)

        self.messages_collection.update_one(
            {"_id": insert_result.inserted_id},
            {"$set": {"s3_key": s3_key, "s3_bucket": s3_bucket}}
        )

        self.sessions_collection.update_one(
            {"session_id": session_id},
            {
                "$set": {"last_activity": timestamp},
                "$inc": {"stats.message_count": 1}
            }
        )

        # Auto-name the chat from the first real user message. Cheap projected
        # lookup first, so naming is only attempted while the name is missing.
        if role == "user" and not content.lower().startswith("uploaded file"):
            session = self.sessions_collection.find_one(
                {"session_id": session_id},
                {"chat_name": 1, "_id": 0}
            )
            if session and not session.get("chat_name"):
                self._maybe_generate_chat_name(session_id)

        message_record.update(
            message_id=message_id,
            s3_key=s3_key,
            s3_bucket=s3_bucket
        )
        return message_record

    def get_messages(
        self,
        session_id: str,
        limit: int = 50,
        include_content: bool = True
    ) -> List[Dict[str, Any]]:
        """Get messages for a session, oldest first.

        Sorting is ascending by ``timestamp`` before ``limit`` is applied, so
        this returns the *earliest* ``limit`` messages.
        _maybe_generate_chat_name relies on that ordering.

        Args:
            session_id: Session ID.
            limit: Maximum number of messages.
            include_content: If True, load full content from S3 and attach
                file fields (``file``, ``fileName``, ``fileUrl``).

        Returns:
            Message dicts with ``_id`` replaced by a string ``message_id``.
        """
        messages = list(
            self.messages_collection.find({"session_id": session_id})
            .sort("timestamp", 1)
            .limit(limit)
        )
        for msg in messages:
            msg["message_id"] = str(msg.pop("_id"))

        if not include_content:
            return messages

        for msg in messages:
            self._attach_s3_content(msg)

        # V3 API file fields, resolved with a single batched query
        files_by_id = self._find_files_by_id(
            {msg["file_id"] for msg in messages if msg.get("file_id")}
        )
        for msg in messages:
            file_record = files_by_id.get(msg.get("file_id"))
            msg["file"] = file_record is not None
            if file_record is not None:
                msg["fileName"] = file_record.get("file_name")
                msg["fileUrl"] = file_record.get("presigned_url")

        return messages

    def _attach_s3_content(self, msg: Dict[str, Any]) -> None:
        """Fill ``content``/``file_data`` (and ``result``) from S3; fall back to the preview."""
        preview = msg.get("content_preview", "")
        try:
            full_content = self.s3.download_json(msg["s3_key"], add_prefix=False)
            msg["content"] = full_content.get("content", preview)
            msg["file_data"] = full_content.get("file_data", {})
            if "result" in full_content:
                msg["result"] = full_content["result"]
        except Exception as e:
            # Missing s3_key or S3 failure: degrade to the stored preview
            print(f"⚠️ Failed to load message {msg.get('message_id')} from S3: {e}")
            msg["content"] = preview
            msg["file_data"] = {}

    def _find_files_by_id(self, file_ids) -> Dict[str, Dict[str, Any]]:
        """Map ``file_id`` -> file record with one ``$in`` query; ``{}`` if none or on error."""
        if not file_ids:
            return {}
        try:
            cursor = self.files_collection.find({"file_id": {"$in": list(file_ids)}})
            return {record["file_id"]: record for record in cursor}
        except Exception as e:
            print(f"⚠️ Failed to get file metadata: {e}")
            return {}

    def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session metadata by ID (read-only, does not update last_activity)."""
        return self.sessions_collection.find_one(
            {"session_id": session_id},
            {"_id": 0}
        )

    def get_all_sessions(
        self,
        limit: int = 100,
        skip: int = 0
    ) -> List[Dict[str, Any]]:
        """Get sessions sorted by last_activity (newest first), with file info.

        Each session gets ``file`` (bool). Sessions whose ``current_file_id``
        resolves to a file record also get ``current_file`` with
        ``file_id``, ``file_name``, ``file_url`` and ``uploaded_at``.

        Args:
            limit: Maximum number to return.
            skip: Number to skip (pagination).

        Returns:
            List of session records.
        """
        sessions = list(
            self.sessions_collection.find({}, {"_id": 0})
            .sort("last_activity", -1)
            .skip(skip)
            .limit(limit)
        )

        files_by_id = self._find_files_by_id(
            {s["current_file_id"] for s in sessions if s.get("current_file_id")}
        )
        for session in sessions:
            file_record = files_by_id.get(session.get("current_file_id"))
            session["file"] = file_record is not None
            if file_record is not None:
                session["current_file"] = {
                    "file_id": file_record.get("file_id"),
                    "file_name": file_record.get("file_name"),
                    "file_url": file_record.get("presigned_url"),
                    "uploaded_at": file_record.get("uploaded_at")
                }
        return sessions

    def update_session(
        self,
        session_id: str,
        updates: Dict[str, Any],
        update_activity: bool = False
    ) -> bool:
        """Update session metadata with ``$set``.

        The caller's ``updates`` dict is not modified.

        Args:
            session_id: Session ID.
            updates: Fields to set.
            update_activity: If True, also set last_activity to now (unless
                ``updates`` already provides it).

        Returns:
            True if a document was modified. False if nothing was modified
            or there was nothing to set.
        """
        fields = dict(updates)
        if update_activity and "last_activity" not in fields:
            fields["last_activity"] = self._iso(datetime.utcnow())
        if not fields:
            # MongoDB rejects an empty $set
            return False

        result = self.sessions_collection.update_one(
            {"session_id": session_id},
            {"$set": fields}
        )
        return result.modified_count > 0

    def rename_session(
        self,
        session_id: str,
        new_name: str
    ) -> bool:
        """Rename a session.

        Args:
            session_id: Session ID.
            new_name: New chat name.

        Returns:
            True if the stored name changed; False otherwise or on error.
        """
        try:
            result = self.sessions_collection.update_one(
                {"session_id": session_id},
                {"$set": {"chat_name": new_name}}
            )
            return result.modified_count > 0
        except Exception as e:
            print(f"Error renaming session {session_id}: {e}")
            return False

    def delete_session(self, session_id: str) -> bool:
        """Delete a session's metadata document from MongoDB.

        Only the ``sessions`` document is removed. Message metadata and the
        S3 payloads are not touched here.

        Args:
            session_id: Session ID to delete.

        Returns:
            True if a document was deleted; False otherwise or on error.
        """
        try:
            result = self.sessions_collection.delete_one({"session_id": session_id})
            return result.deleted_count > 0
        except Exception as e:
            print(f"Error deleting session {session_id}: {e}")
            return False

    def _maybe_generate_chat_name(self, session_id: str):
        """Name an unnamed session from its first non-upload user message via Bedrock.

        This is best-effort: any failure is printed and ignored, so it never
        blocks message handling.
        """
        session = self.get_session(session_id)
        if not session or session.get("chat_name"):
            return  # Unknown session or already named

        # First user message that is not a file-upload notice
        messages = self.get_messages(session_id, limit=10, include_content=True)
        first_user_msg = next(
            (
                msg.get("content", "")
                for msg in messages
                if msg.get("role") == "user"
                and not msg.get("content", "").lower().startswith("uploaded file")
            ),
            None
        )
        if not first_user_msg:
            return

        try:
            model_id = os.getenv("BEDROCK_MODEL_ID", "anthropic.claude-3-haiku-20240307-v1:0")
            prompt = (
                "Create a succinct, descriptive 3-6 word title for this chat session based on the first user message.\n"
                "Return only the title, without quotes.\n\n"
                f"First message: {first_user_msg[:self.CHAT_NAME_INPUT_CHARS]}"
            )
            body = {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 48,
                "temperature": 0.2,
                "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
            }
            response = self.bedrock_runtime.invoke_model(
                modelId=model_id,
                accept="application/json",
                contentType="application/json",
                body=json.dumps(body)
            )
            response_body = json.loads(response["body"].read())

            title = ""
            parts = response_body.get("content")
            if isinstance(parts, list) and parts and isinstance(parts[0], dict):
                title = parts[0].get("text", "") or ""

            # Models sometimes add commentary after the title: keep the first
            # non-empty line, then drop surrounding quotes.
            lines = [line.strip() for line in title.splitlines() if line.strip()]
            title = (lines[0] if lines else "").strip('"').strip() or "New Chat"

            self.sessions_collection.update_one(
                {"session_id": session_id},
                {
                    "$set": {
                        "chat_name": title[:self.CHAT_NAME_MAX_CHARS],
                        "chat_name_generated_at": self._iso(datetime.utcnow()),
                        "chat_name_model": model_id
                    }
                }
            )
        except Exception as e:
            # Don't block flow on naming errors
            print(f"⚠️ Failed to generate chat name: {e}")

    def get_all_session_ids(self) -> List[str]:
        """Get the IDs of all sessions."""
        sessions = self.sessions_collection.find({}, {"session_id": 1, "_id": 0})
        return [s["session_id"] for s in sessions]

    def get_session_history(
        self,
        session_id: str,
        limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Get conversation history (full content) for a session.

        Args:
            session_id: Session ID.
            limit: Maximum messages to return.

        Returns:
            Messages as returned by get_messages(include_content=True).
        """
        return self.get_messages(session_id, limit=limit, include_content=True)

    def get_session_stats(self, session_id: str) -> Dict[str, Any]:
        """Summarize a session; returns ``{}`` if it does not exist.

        ``message_count`` is a live count of message documents. ``stats`` is
        the stored counter document maintained by add_message.
        """
        session = self.get_session(session_id)
        if not session:
            return {}

        message_count = self.messages_collection.count_documents({"session_id": session_id})
        return {
            "session_id": session_id,
            "created_at": session.get("created_at"),
            "last_activity": session.get("last_activity"),
            "chat_name": session.get("chat_name"),
            "state": session.get("state"),
            "message_count": message_count,
            "stats": session.get("stats", {})
        }

    def save_pipeline_execution(
        self,
        session_id: str,
        pipeline: Dict[str, Any],
        result: Dict[str, Any],
        file_path: Optional[str] = None,
        executor: str = "unknown"
    ) -> bool:
        """DEPRECATED no-op kept for backwards compatibility; use pipeline_manager.

        Returns:
            Always True.
        """
        print("⚠️ save_pipeline_execution is deprecated. Use pipeline_manager instead.")
        return True

    def close(self):
        """Close the MongoDB client."""
        if self.client:
            self.client.close()
            print("🔒 MongoDB connection closed")
# Process-wide singleton shared by every importer of this module.
# Because it is built at import time, importing this module creates the
# MongoDB, S3 and Bedrock clients, and raises RuntimeError if MONGODB_URI
# is unset.
session_manager: SessionManager = SessionManager()