LP_2-test / src /session /manager.py
DocUA's picture
Clean deployment without large index files
461adca
raw
history blame
7.62 kB
"""
Session manager for handling user sessions.
"""
import asyncio
import uuid
from typing import Dict, Optional
from config import get_settings
from src.session.state import UserSessionState, generate_session_id
from src.session.storage import create_storage, BaseStorage
class SessionManager:
"""
Manages user sessions with isolation.
This class ensures that each user has their own isolated session state,
which is critical for multi-user environments like Hugging Face Spaces.
"""
def __init__(self, storage_type: str = "memory", **storage_kwargs):
"""
Initialize the session manager.
Args:
storage_type: Type of storage ("memory" or "redis")
**storage_kwargs: Additional arguments for storage initialization
"""
self.storage: BaseStorage = create_storage(storage_type, **storage_kwargs)
self._cleanup_task: Optional[asyncio.Task] = None
self._lock = asyncio.Lock()
# Don't start cleanup task here — no event loop may be running yet.
# It will be started lazily on first get_session() call.
def _start_cleanup_task(self) -> None:
"""Start the background cleanup task (only if inside a running event loop)."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running event loop — skip, will retry on next get_session()
return
if self._cleanup_task is None or self._cleanup_task.done():
self._cleanup_task = loop.create_task(self._cleanup_loop())
async def _cleanup_loop(self) -> None:
"""Background task to clean up expired sessions."""
try:
settings = get_settings()
cleanup_interval = settings.session.cleanup_interval_minutes
while True:
try:
await asyncio.sleep(cleanup_interval * 60) # Convert to seconds
timeout_minutes = settings.session.timeout_minutes
cleaned_count = await self.storage.cleanup_expired(timeout_minutes)
if cleaned_count > 0:
print(f"Cleaned up {cleaned_count} expired sessions")
except Exception as e:
print(f"Error in session cleanup: {e}")
await asyncio.sleep(60) # Wait a minute before retrying
except asyncio.CancelledError:
print("Session cleanup task cancelled")
raise
async def get_session(self, session_id: Optional[str] = None) -> UserSessionState:
"""
Get or create a user session.
Args:
session_id: Existing session ID, or None to create new
Returns:
UserSessionState instance
"""
# Lazily start cleanup task now that we're inside a running event loop
self._start_cleanup_task()
async with self._lock:
if session_id is None:
session_id = generate_session_id()
# Try to get existing session
session = await self.storage.get(session_id)
if session is None:
# Create new session
session = UserSessionState(session_id=session_id)
await self.storage.set(session)
print(f"Created new session: {session_id}")
else:
# Update activity timestamp
session.update_activity()
await self.storage.set(session)
return session
async def update_session(self, session: UserSessionState) -> None:
"""
Update session data in storage.
Args:
session: Session to update
"""
async with self._lock:
session.update_activity()
await self.storage.set(session)
async def delete_session(self, session_id: str) -> None:
"""
Delete a session.
Args:
session_id: ID of session to delete
"""
async with self._lock:
await self.storage.delete(session_id)
print(f"Deleted session: {session_id}")
async def cleanup_expired_sessions(self) -> int:
"""
Manually trigger cleanup of expired sessions.
Returns:
Number of sessions cleaned up
"""
async with self._lock:
settings = get_settings()
timeout_minutes = settings.session.timeout_minutes
cleaned_count = await self.storage.cleanup_expired(timeout_minutes)
if cleaned_count > 0:
print(f"Manually cleaned up {cleaned_count} expired sessions")
return cleaned_count
async def get_all_sessions(self) -> Dict[str, UserSessionState]:
"""
Get all active sessions (for monitoring/debugging).
Returns:
Dictionary of session_id -> UserSessionState
"""
async with self._lock:
return await self.storage.get_all_sessions()
async def get_session_count(self) -> int:
"""
Get the total number of active sessions.
Returns:
Number of active sessions
"""
sessions = await self.get_all_sessions()
return len(sessions)
async def shutdown(self) -> None:
"""Shutdown the session manager and cleanup resources."""
if self._cleanup_task and not self._cleanup_task.done():
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
print("Session manager shutdown complete")
def __str__(self) -> str:
"""String representation for debugging."""
return f"SessionManager(storage_type={type(self.storage).__name__})"
def __repr__(self) -> str:
"""Detailed string representation."""
return self.__str__()
# Global session manager instance
_session_manager: Optional[SessionManager] = None
def get_session_manager() -> SessionManager:
"""Get or create the global session manager instance."""
global _session_manager
if _session_manager is None:
settings = get_settings()
# Configure storage based on settings
storage_kwargs = {}
if settings.session.storage_type == "redis":
storage_kwargs.update({
"host": settings.redis.host,
"port": settings.redis.port,
"db": settings.redis.db,
"password": settings.redis.password,
})
_session_manager = SessionManager(
storage_type=settings.session.storage_type,
**storage_kwargs
)
return _session_manager
async def create_user_session() -> UserSessionState:
"""
Create a new user session.
Returns:
New UserSessionState instance
"""
manager = get_session_manager()
return await manager.get_session()
async def get_user_session(session_id: str) -> UserSessionState:
"""
Get an existing user session.
Args:
session_id: Session ID
Returns:
UserSessionState instance
"""
manager = get_session_manager()
return await manager.get_session(session_id)