""" Session Manager Handles all session CRUD operations with HF persistent storage. Manages user isolation, alias resolution, and concurrent access. """ import uuid from pathlib import Path from typing import List, Optional, Dict, Any from datetime import datetime from hf_storage import ( write_json_safe, read_json_safe, get_user_dir, get_session_dir, list_user_session_dirs, delete_session_dir, ensure_session_dir, ) from session_models import ( SessionMetadata, SessionState, AliasIndex, AliasIndexEntry, JobReference, ) class SessionManager: """ Manages session lifecycle and persistence for multi-user environment. Ensures user isolation and safe concurrent access. """ def __init__(self, user_id: str): """ Initialize session manager for a specific user. Args: user_id: Unique identifier for the user """ self.user_id = user_id self.user_dir = get_user_dir(user_id) ensure_session_dir() self._alias_index = self._load_alias_index() def _load_alias_index(self) -> AliasIndex: """Load or create the alias index for this user.""" index_path = self.user_dir / "aliases_index.json" data = read_json_safe(index_path, {}) return AliasIndex.from_dict(data) if data else AliasIndex() def _save_alias_index(self) -> bool: """Save the alias index to disk.""" index_path = self.user_dir / "aliases_index.json" return write_json_safe(index_path, self._alias_index.to_dict()) def create_session( self, alias: str, app_type: str, description: str = "" ) -> tuple[str, SessionMetadata]: """ Create a new session. Args: alias: User-friendly name for the session app_type: Type of app ("EM" or "QLBM") description: Optional description Returns: Tuple of (session_id, metadata) """ session_id = str(uuid.uuid4()) # Create metadata metadata = SessionMetadata( session_id=session_id, user_id=self.user_id, alias=alias, app_type=app_type, description=description, ) # Create empty state state = SessionState( session_id=session_id, app_type=app_type, ) # Save to disk session_dir = get_session_dir(self.user_id, session_id) metadata_path = session_dir / "metadata.json" state_path = session_dir / "state.json" write_json_safe(metadata_path, metadata.to_dict()) write_json_safe(state_path, state.to_dict()) # Update alias index entry = AliasIndexEntry( alias=alias, session_id=session_id, created_at=metadata.created_at, last_modified=metadata.last_modified, ) self._alias_index.add(alias, entry) self._save_alias_index() return session_id, metadata def load_session(self, session_id: str) -> tuple[SessionMetadata, SessionState]: """ Load a session by ID. Args: session_id: Session to load Returns: Tuple of (metadata, state) Raises: FileNotFoundError: If session doesn't exist """ session_dir = get_session_dir(self.user_id, session_id) metadata_path = session_dir / "metadata.json" state_path = session_dir / "state.json" if not metadata_path.exists(): raise FileNotFoundError(f"Session {session_id} not found") metadata_data = read_json_safe(metadata_path) state_data = read_json_safe(state_path) metadata = SessionMetadata.from_dict(metadata_data) state = SessionState.from_dict(state_data) if state_data else SessionState( session_id=session_id, app_type=metadata.app_type, ) # Update access timestamp metadata.last_accessed = datetime.utcnow().isoformat() return metadata, state def save_session(self, metadata: SessionMetadata, state: SessionState) -> bool: """ Save a session's state and metadata. Args: metadata: Session metadata state: Session state Returns: True if successful """ session_dir = get_session_dir(self.user_id, metadata.session_id) metadata_path = session_dir / "metadata.json" state_path = session_dir / "state.json" # Update timestamps metadata.update_timestamp() state.update_timestamp() success = True success &= write_json_safe(metadata_path, metadata.to_dict()) success &= write_json_safe(state_path, state.to_dict()) if success: # Update alias index entry = AliasIndexEntry( alias=metadata.alias, session_id=metadata.session_id, created_at=metadata.created_at, last_modified=metadata.last_modified, ) self._alias_index.add(metadata.alias, entry) self._save_alias_index() return success def get_by_alias(self, alias: str) -> List[tuple[SessionMetadata, str]]: """ Get all sessions matching an alias (sorted by recency). Args: alias: Session alias to search for Returns: List of (metadata, session_id) tuples, newest first """ entries = self._alias_index.get_by_alias(alias) results = [] for entry in entries: try: metadata, _ = self.load_session(entry.session_id) results.append((metadata, entry.session_id)) except FileNotFoundError: # Session file was deleted, skip pass return results def get_most_recent_by_alias(self, alias: str) -> Optional[tuple[SessionMetadata, str]]: """ Get the most recent session matching an alias. Args: alias: Session alias to search for Returns: (metadata, session_id) tuple or None if not found """ results = self.get_by_alias(alias) return results[0] if results else None def list_all_sessions(self) -> List[SessionMetadata]: """ List all sessions for this user (unsorted). Returns: List of metadata for all user's sessions """ session_ids = list_user_session_dirs(self.user_id) sessions = [] for session_id in session_ids: try: metadata, _ = self.load_session(session_id) sessions.append(metadata) except FileNotFoundError: pass return sessions def list_sessions_by_app(self, app_type: str) -> List[SessionMetadata]: """ List all sessions for a specific app type. Args: app_type: "EM" or "QLBM" Returns: List of metadata for sessions of this app type """ return [s for s in self.list_all_sessions() if s.app_type == app_type] def list_sessions_sorted_recent(self, limit: Optional[int] = None) -> List[SessionMetadata]: """ List sessions sorted by last accessed time (most recent first). Args: limit: Maximum number of sessions to return Returns: Sorted list of session metadata """ sessions = self.list_all_sessions() sessions.sort(key=lambda s: s.last_accessed, reverse=True) return sessions[:limit] if limit else sessions def delete_session(self, session_id: str) -> bool: """ Delete a session. Args: session_id: Session to delete Returns: True if successful """ try: # Load metadata to get alias metadata, _ = self.load_session(session_id) # Remove from alias index self._alias_index.remove(metadata.alias, session_id) self._save_alias_index() # Delete directory return delete_session_dir(self.user_id, session_id) except FileNotFoundError: return False def rename_session(self, session_id: str, new_alias: str) -> bool: """ Rename a session. Args: session_id: Session to rename new_alias: New alias Returns: True if successful """ try: metadata, state = self.load_session(session_id) # Update alias index old_alias = metadata.alias self._alias_index.remove(old_alias, session_id) # Update metadata metadata.alias = new_alias # Save return self.save_session(metadata, state) except FileNotFoundError: return False def add_job_to_session( self, session_id: str, job_id: str, service_type: str ) -> bool: """ Add a job reference to a session. Args: session_id: Session ID job_id: Cloud service job ID service_type: Service type (e.g., "qiskit_ibm", "ionq") Returns: True if successful """ try: metadata, state = self.load_session(session_id) job = JobReference( job_id=job_id, service_type=service_type, ) state.add_job(job) return self.save_session(metadata, state) except FileNotFoundError: return False def update_job_status( self, session_id: str, job_id: str, status: str, result: Optional[Dict[str, Any]] = None ) -> bool: """ Update a job's status in a session. Args: session_id: Session ID job_id: Job ID status: New status result: Optional result data Returns: True if job was found and updated """ try: metadata, state = self.load_session(session_id) found = state.update_job_status(job_id, status, result) if found: self.save_session(metadata, state) return found except FileNotFoundError: return False