Spaces:
Sleeping
Sleeping
| """ | |
| Session Manager for JOBObike Chatbot | |
| Handles chat history persistence using Firebase Firestore | |
| """ | |
| import uuid | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional, Any | |
| from tools.firebase_config import db | |
| class SessionManager: | |
| """Manages chat sessions and history using Firebase Firestore""" | |
| def __init__(self, collection_name: str = "chat_sessions"): | |
| """ | |
| Initialize the session manager | |
| Args: | |
| collection_name: Name of the Firestore collection to store sessions | |
| """ | |
| self.collection_name = collection_name | |
| self.sessions_collection = db.collection(collection_name) if db else None | |
| def create_session(self, user_id: Optional[str] = None) -> str: | |
| """ | |
| Create a new chat session | |
| Args: | |
| user_id: Optional user identifier | |
| Returns: | |
| Session ID | |
| """ | |
| if not self.sessions_collection: | |
| return str(uuid.uuid4()) | |
| session_id = str(uuid.uuid4()) | |
| session_data = { | |
| "session_id": session_id, | |
| "user_id": user_id or "anonymous", | |
| "created_at": datetime.utcnow(), | |
| "last_active": datetime.utcnow(), | |
| "history": [], | |
| "expired": False | |
| } | |
| try: | |
| self.sessions_collection.document(session_id).set(session_data) | |
| return session_id | |
| except Exception as e: | |
| print(f"Warning: Failed to create session in Firestore: {e}") | |
| return session_id | |
| def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Retrieve a session by ID | |
| Args: | |
| session_id: Session identifier | |
| Returns: | |
| Session data or None if not found | |
| """ | |
| if not self.sessions_collection: | |
| return None | |
| try: | |
| doc = self.sessions_collection.document(session_id).get() | |
| if doc.exists: | |
| session_data = doc.to_dict() | |
| # Convert timestamp strings back to datetime objects | |
| if "created_at" in session_data and isinstance(session_data["created_at"], str): | |
| session_data["created_at"] = datetime.fromisoformat(session_data["created_at"].replace("Z", "+00:00")) | |
| if "last_active" in session_data and isinstance(session_data["last_active"], str): | |
| session_data["last_active"] = datetime.fromisoformat(session_data["last_active"].replace("Z", "+00:00")) | |
| return session_data | |
| return None | |
| except Exception as e: | |
| print(f"Warning: Failed to retrieve session from Firestore: {e}") | |
| return None | |
| def add_message_to_history(self, session_id: str, role: str, content: str) -> bool: | |
| """ | |
| Add a message to the chat history | |
| Args: | |
| session_id: Session identifier | |
| role: Role of the message sender (user/assistant) | |
| content: Message content | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| if not self.sessions_collection: | |
| return False | |
| try: | |
| # Get current session data | |
| session_doc = self.sessions_collection.document(session_id) | |
| session_data = session_doc.get().to_dict() | |
| if not session_data: | |
| return False | |
| # Add new message to history | |
| message = { | |
| "role": role, | |
| "content": content, | |
| "timestamp": datetime.utcnow() | |
| } | |
| # Update session data | |
| session_data["history"].append(message) | |
| session_data["last_active"] = datetime.utcnow() | |
| # Keep only the last 20 messages to prevent document bloat | |
| if len(session_data["history"]) > 20: | |
| session_data["history"] = session_data["history"][-20:] | |
| # Update in Firestore | |
| session_doc.update({ | |
| "history": session_data["history"], | |
| "last_active": session_data["last_active"] | |
| }) | |
| return True | |
| except Exception as e: | |
| print(f"Warning: Failed to add message to session history: {e}") | |
| return False | |
| def get_session_history(self, session_id: str) -> List[Dict[str, str]]: | |
| """ | |
| Get the chat history for a session | |
| Args: | |
| session_id: Session identifier | |
| Returns: | |
| List of message dictionaries | |
| """ | |
| session_data = self.get_session(session_id) | |
| if session_data and "history" in session_data: | |
| # Return only role and content for each message | |
| return [{"role": msg["role"], "content": msg["content"]} | |
| for msg in session_data["history"]] | |
| return [] | |
| def cleanup_expired_sessions(self, expiry_hours: int = 24) -> int: | |
| """ | |
| Clean up expired sessions | |
| Args: | |
| expiry_hours: Number of hours after which sessions expire | |
| Returns: | |
| Number of sessions cleaned up | |
| """ | |
| if not self.sessions_collection: | |
| return 0 | |
| try: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=expiry_hours) | |
| expired_sessions = self.sessions_collection.where( | |
| "last_active", "<", cutoff_time | |
| ).where("expired", "==", False).stream() | |
| count = 0 | |
| for session in expired_sessions: | |
| self.sessions_collection.document(session.id).update({ | |
| "expired": True | |
| }) | |
| count += 1 | |
| return count | |
| except Exception as e: | |
| print(f"Warning: Failed to clean up expired sessions: {e}") | |
| return 0 | |
| # Global session manager instance | |
| session_manager = SessionManager() |