"""
Session manager for handling user sessions.
"""

import asyncio
import uuid
from typing import Dict, Optional

from config import get_settings
from src.session.state import UserSessionState, generate_session_id
from src.session.storage import create_storage, BaseStorage


class SessionManager:
    """
    Manages user sessions with isolation.

    This class ensures that each user has their own isolated session state,
    which is critical for multi-user environments like Hugging Face Spaces.

    All storage access made through the public methods is serialized by a
    single asyncio lock. A background task periodically removes expired
    sessions; it is started lazily on the first get_session() call.
    """

    def __init__(self, storage_type: str = "memory", **storage_kwargs):
        """
        Initialize the session manager.

        Args:
            storage_type: Type of storage ("memory" or "redis")
            **storage_kwargs: Additional arguments for storage initialization
        """
        self.storage: BaseStorage = create_storage(storage_type, **storage_kwargs)
        self._cleanup_task: Optional[asyncio.Task] = None
        # The lock is created lazily by the `_lock` property. On Python 3.9
        # and earlier an asyncio.Lock binds to the event loop that is current
        # when it is constructed, and this constructor may run before the
        # application's loop exists.
        self._lock_instance: Optional[asyncio.Lock] = None
        # Don't start cleanup task here — no event loop may be running yet.
        # It will be started lazily on first get_session() call.

    @property
    def _lock(self) -> asyncio.Lock:
        """Return the manager-wide lock, creating it on first use.

        First use happens inside a coroutine, so the lock is created while
        the event loop that will actually use it is running.
        """
        if self._lock_instance is None:
            self._lock_instance = asyncio.Lock()
        return self._lock_instance

    def _start_cleanup_task(self) -> None:
        """Start the background cleanup task (only if inside a running event loop)."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop — skip, will retry on next get_session()
            return

        # (Re)create the task if it was never started or has finished.
        if self._cleanup_task is None or self._cleanup_task.done():
            self._cleanup_task = loop.create_task(self._cleanup_loop())

    async def _cleanup_loop(self) -> None:
        """
        Background task to clean up expired sessions.

        Sleeps for the configured cleanup interval, then asks the storage to
        drop sessions older than the configured timeout. Any error raised
        during an iteration is printed and the loop retries after one minute.

        Raises:
            asyncio.CancelledError: Re-raised when the task is cancelled.
        """
        try:
            settings = get_settings()
            cleanup_interval = settings.session.cleanup_interval_minutes

            while True:
                try:
                    await asyncio.sleep(cleanup_interval * 60)  # Convert to seconds

                    timeout_minutes = settings.session.timeout_minutes
                    # NOTE: unlike cleanup_expired_sessions(), this call is
                    # made without holding self._lock.
                    cleaned_count = await self.storage.cleanup_expired(timeout_minutes)

                    if cleaned_count > 0:
                        print(f"Cleaned up {cleaned_count} expired sessions")

                except Exception as e:
                    print(f"Error in session cleanup: {e}")
                    await asyncio.sleep(60)  # Wait a minute before retrying

        except asyncio.CancelledError:
            print("Session cleanup task cancelled")
            raise

    async def get_session(self, session_id: Optional[str] = None) -> UserSessionState:
        """
        Get or create a user session.

        If no session is stored under the given ID, a new session is created
        with that ID. An existing session has its activity timestamp
        refreshed and is written back to storage.

        Args:
            session_id: Existing session ID, or None to create new

        Returns:
            UserSessionState instance
        """
        # Lazily start cleanup task now that we're inside a running event loop
        self._start_cleanup_task()

        async with self._lock:
            if session_id is None:
                session_id = generate_session_id()

            # Try to get existing session
            session = await self.storage.get(session_id)

            if session is None:
                # Create new session
                session = UserSessionState(session_id=session_id)
                await self.storage.set(session)
                print(f"Created new session: {session_id}")
            else:
                # Update activity timestamp
                session.update_activity()
                await self.storage.set(session)

            return session

    async def update_session(self, session: UserSessionState) -> None:
        """
        Update session data in storage.

        The session's activity timestamp is refreshed before it is stored.

        Args:
            session: Session to update
        """
        async with self._lock:
            session.update_activity()
            await self.storage.set(session)

    async def delete_session(self, session_id: str) -> None:
        """
        Delete a session.

        Args:
            session_id: ID of session to delete
        """
        async with self._lock:
            await self.storage.delete(session_id)
            print(f"Deleted session: {session_id}")

    async def cleanup_expired_sessions(self) -> int:
        """
        Manually trigger cleanup of expired sessions.

        Returns:
            Number of sessions cleaned up
        """
        async with self._lock:
            settings = get_settings()
            timeout_minutes = settings.session.timeout_minutes
            cleaned_count = await self.storage.cleanup_expired(timeout_minutes)

            if cleaned_count > 0:
                print(f"Manually cleaned up {cleaned_count} expired sessions")

            return cleaned_count

    async def get_all_sessions(self) -> Dict[str, UserSessionState]:
        """
        Get all active sessions (for monitoring/debugging).

        Returns:
            Dictionary of session_id -> UserSessionState
        """
        async with self._lock:
            return await self.storage.get_all_sessions()

    async def get_session_count(self) -> int:
        """
        Get the total number of active sessions.

        Returns:
            Number of active sessions
        """
        sessions = await self.get_all_sessions()
        return len(sessions)

    async def shutdown(self) -> None:
        """Shutdown the session manager and cleanup resources.

        Cancels the background cleanup task, if one is still running, and
        waits for it to finish.
        """
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                # Expected: the cleanup loop re-raises its cancellation.
                pass

        print("Session manager shutdown complete")

    def __str__(self) -> str:
        """String representation for debugging."""
        return f"SessionManager(storage_type={type(self.storage).__name__})"

    def __repr__(self) -> str:
        """Detailed string representation."""
        return self.__str__()


# Global session manager instance
_session_manager: Optional[SessionManager] = None


def get_session_manager() -> SessionManager:
    """Get or create the global session manager instance.

    On first call the manager is built from the application settings; Redis
    connection parameters are passed through only when the configured
    storage type is "redis". Later calls return the same instance.
    """
    global _session_manager

    if _session_manager is None:
        settings = get_settings()

        # Configure storage based on settings
        storage_kwargs = {}
        if settings.session.storage_type == "redis":
            storage_kwargs.update({
                "host": settings.redis.host,
                "port": settings.redis.port,
                "db": settings.redis.db,
                "password": settings.redis.password,
            })

        _session_manager = SessionManager(
            storage_type=settings.session.storage_type,
            **storage_kwargs
        )

    return _session_manager


async def create_user_session() -> UserSessionState:
    """
    Create a new user session.

    Returns:
        New UserSessionState instance
    """
    manager = get_session_manager()
    return await manager.get_session()


async def get_user_session(session_id: str) -> UserSessionState:
    """
    Get the user session with the given ID, creating it if it does not exist.

    Args:
        session_id: Session ID

    Returns:
        UserSessionState instance
    """
    manager = get_session_manager()
    return await manager.get_session(session_id)