Spaces:
Runtime error
Runtime error
| """ | |
| Session Manager | |
| Handles all session CRUD operations with HF persistent storage. | |
| Manages user isolation, alias resolution, and concurrent access. | |
| """ | |
| import uuid | |
| from pathlib import Path | |
| from typing import List, Optional, Dict, Any | |
| from datetime import datetime | |
| from hf_storage import ( | |
| write_json_safe, | |
| read_json_safe, | |
| get_user_dir, | |
| get_session_dir, | |
| list_user_session_dirs, | |
| delete_session_dir, | |
| ensure_session_dir, | |
| ) | |
| from session_models import ( | |
| SessionMetadata, | |
| SessionState, | |
| AliasIndex, | |
| AliasIndexEntry, | |
| JobReference, | |
| ) | |
class SessionManager:
    """
    Manage session lifecycle and persistence for a single user.

    All paths are derived from ``user_id`` through the ``hf_storage`` helpers,
    and every read/write goes through ``read_json_safe`` / ``write_json_safe``.
    An in-memory alias index is loaded once at construction and written back
    whenever it changes.

    NOTE(review): any concurrency guarantees come from the ``hf_storage``
    helpers; this class adds no locking of its own -- confirm against
    ``hf_storage`` before relying on concurrent access.
    """

    def __init__(self, user_id: str):
        """
        Initialize session manager for a specific user.

        Args:
            user_id: Unique identifier for the user
        """
        self.user_id = user_id
        self.user_dir = get_user_dir(user_id)
        ensure_session_dir()
        self._alias_index = self._load_alias_index()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _load_alias_index(self) -> AliasIndex:
        """Load the alias index for this user, or create an empty one."""
        index_path = self.user_dir / "aliases_index.json"
        data = read_json_safe(index_path, {})
        return AliasIndex.from_dict(data) if data else AliasIndex()

    def _save_alias_index(self) -> bool:
        """Write the in-memory alias index to disk; return the write result."""
        index_path = self.user_dir / "aliases_index.json"
        return write_json_safe(index_path, self._alias_index.to_dict())

    def _session_paths(self, session_id: str) -> tuple[Path, Path]:
        """Return ``(metadata_path, state_path)`` for a session of this user."""
        session_dir = get_session_dir(self.user_id, session_id)
        return session_dir / "metadata.json", session_dir / "state.json"

    def _read_metadata(self, session_id: str) -> SessionMetadata:
        """
        Read a session's metadata without touching its access timestamp.

        Raises:
            FileNotFoundError: If the metadata file is missing or yields no data
        """
        metadata_path, _ = self._session_paths(session_id)
        if not metadata_path.exists():
            raise FileNotFoundError(f"Session {session_id} not found")
        metadata_data = read_json_safe(metadata_path)
        if not metadata_data:
            # Mirror the falsy-data handling used for state.json: an empty or
            # unreadable metadata file is reported as a missing session so the
            # callers' existing FileNotFoundError handling applies.
            raise FileNotFoundError(
                f"Session {session_id} metadata is missing or unreadable"
            )
        return SessionMetadata.from_dict(metadata_data)

    def _index_session(self, metadata: SessionMetadata) -> bool:
        """Add the session to the alias index and persist the index."""
        entry = AliasIndexEntry(
            alias=metadata.alias,
            session_id=metadata.session_id,
            created_at=metadata.created_at,
            last_modified=metadata.last_modified,
        )
        self._alias_index.add(metadata.alias, entry)
        return self._save_alias_index()

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------

    def create_session(
        self,
        alias: str,
        app_type: str,
        description: str = ""
    ) -> tuple[str, SessionMetadata]:
        """
        Create a new session.

        Args:
            alias: User-friendly name for the session
            app_type: Type of app ("EM" or "QLBM")
            description: Optional description

        Returns:
            Tuple of (session_id, metadata). The tuple is returned even when
            writing to storage fails; in that case the session is not added
            to the alias index.
        """
        session_id = str(uuid.uuid4())

        metadata = SessionMetadata(
            session_id=session_id,
            user_id=self.user_id,
            alias=alias,
            app_type=app_type,
            description=description,
        )
        state = SessionState(
            session_id=session_id,
            app_type=app_type,
        )

        metadata_path, state_path = self._session_paths(session_id)
        metadata_written = write_json_safe(metadata_path, metadata.to_dict())
        write_json_safe(state_path, state.to_dict())

        # Only index sessions whose metadata reached storage; load_session
        # needs metadata.json (a missing state.json is rebuilt on load), so
        # indexing anything else would leave a dangling alias entry.
        if metadata_written:
            self._index_session(metadata)

        return session_id, metadata

    def load_session(self, session_id: str) -> tuple[SessionMetadata, SessionState]:
        """
        Load a session by ID.

        Args:
            session_id: Session to load

        Returns:
            Tuple of (metadata, state). If no state data can be read, a fresh
            empty state for the session's app type is returned instead.

        Raises:
            FileNotFoundError: If session doesn't exist or its metadata
                cannot be read
        """
        metadata = self._read_metadata(session_id)

        _, state_path = self._session_paths(session_id)
        state_data = read_json_safe(state_path)
        if state_data:
            state = SessionState.from_dict(state_data)
        else:
            state = SessionState(
                session_id=session_id,
                app_type=metadata.app_type,
            )

        # Stamp the access time in memory only; it reaches storage the next
        # time this metadata object is passed to save_session.
        metadata.last_accessed = datetime.utcnow().isoformat()
        return metadata, state

    def save_session(self, metadata: SessionMetadata, state: SessionState) -> bool:
        """
        Save a session's state and metadata.

        Args:
            metadata: Session metadata
            state: Session state

        Returns:
            True if both files were written successfully. The alias index is
            refreshed only in that case.
        """
        metadata_path, state_path = self._session_paths(metadata.session_id)

        metadata.update_timestamp()
        state.update_timestamp()

        success = True
        success &= write_json_safe(metadata_path, metadata.to_dict())
        success &= write_json_safe(state_path, state.to_dict())

        if success:
            self._index_session(metadata)
        return success

    # ------------------------------------------------------------------
    # Lookup and listing
    # ------------------------------------------------------------------

    def get_by_alias(self, alias: str) -> List[tuple[SessionMetadata, str]]:
        """
        Get all sessions matching an alias.

        Args:
            alias: Session alias to search for

        Returns:
            List of (metadata, session_id) tuples, in the order the alias
            index returns them (presumably newest first -- verify against
            ``AliasIndex.get_by_alias``).
        """
        results = []
        for entry in self._alias_index.get_by_alias(alias):
            try:
                metadata, _ = self.load_session(entry.session_id)
            except FileNotFoundError:
                # Index entry outlived its session files; skip it.
                continue
            results.append((metadata, entry.session_id))
        return results

    def get_most_recent_by_alias(self, alias: str) -> Optional[tuple[SessionMetadata, str]]:
        """
        Get the first session returned for an alias.

        Args:
            alias: Session alias to search for

        Returns:
            (metadata, session_id) tuple or None if not found
        """
        results = self.get_by_alias(alias)
        return results[0] if results else None

    def list_all_sessions(self) -> List[SessionMetadata]:
        """
        List all sessions for this user (unsorted).

        Metadata is read without updating ``last_accessed``, so listing
        sessions does not count as accessing them and the stored access
        times stay meaningful for sorting.

        Returns:
            List of metadata for all user's sessions
        """
        sessions = []
        for session_id in list_user_session_dirs(self.user_id):
            try:
                sessions.append(self._read_metadata(session_id))
            except FileNotFoundError:
                # Directory exists but has no readable metadata; skip it.
                continue
        return sessions

    def list_sessions_by_app(self, app_type: str) -> List[SessionMetadata]:
        """
        List all sessions for a specific app type.

        Args:
            app_type: "EM" or "QLBM"

        Returns:
            List of metadata for sessions of this app type
        """
        return [s for s in self.list_all_sessions() if s.app_type == app_type]

    def list_sessions_sorted_recent(self, limit: Optional[int] = None) -> List[SessionMetadata]:
        """
        List sessions sorted by last accessed time (most recent first).

        Args:
            limit: Maximum number of sessions to return; ``None`` means no
                limit and ``0`` returns an empty list

        Returns:
            Sorted list of session metadata
        """
        sessions = self.list_all_sessions()
        # A missing timestamp sorts as oldest instead of raising on a
        # None-vs-str comparison.
        sessions.sort(key=lambda s: s.last_accessed or "", reverse=True)
        return sessions if limit is None else sessions[:limit]

    # ------------------------------------------------------------------
    # Mutation
    # ------------------------------------------------------------------

    def delete_session(self, session_id: str) -> bool:
        """
        Delete a session.

        The session directory is removed first and the alias index is only
        updated once that succeeded, so a failed delete never leaves an
        existing session missing from the index.

        Args:
            session_id: Session to delete

        Returns:
            True if successful
        """
        try:
            # Metadata is needed only to learn which alias to un-index.
            metadata = self._read_metadata(session_id)
            deleted = delete_session_dir(self.user_id, session_id)
        except FileNotFoundError:
            return False

        if deleted:
            self._alias_index.remove(metadata.alias, session_id)
            self._save_alias_index()
        return deleted

    def rename_session(self, session_id: str, new_alias: str) -> bool:
        """
        Rename a session.

        Args:
            session_id: Session to rename
            new_alias: New alias

        Returns:
            True if successful
        """
        try:
            metadata, state = self.load_session(session_id)
        except FileNotFoundError:
            return False

        # NOTE(review): the old alias is dropped from the in-memory index
        # before the save; if the save fails the in-memory index and the
        # stored index disagree until the next successful save.
        self._alias_index.remove(metadata.alias, session_id)
        metadata.alias = new_alias

        # save_session re-adds the session under its new alias on success.
        return self.save_session(metadata, state)

    def add_job_to_session(
        self,
        session_id: str,
        job_id: str,
        service_type: str
    ) -> bool:
        """
        Add a job reference to a session.

        Args:
            session_id: Session ID
            job_id: Cloud service job ID
            service_type: Service type (e.g., "qiskit_ibm", "ionq")

        Returns:
            True if successful
        """
        try:
            metadata, state = self.load_session(session_id)
        except FileNotFoundError:
            return False

        state.add_job(
            JobReference(
                job_id=job_id,
                service_type=service_type,
            )
        )
        return self.save_session(metadata, state)

    def update_job_status(
        self,
        session_id: str,
        job_id: str,
        status: str,
        result: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Update a job's status in a session.

        Args:
            session_id: Session ID
            job_id: Job ID
            status: New status
            result: Optional result data

        Returns:
            True if the job was found and the updated session was saved
        """
        try:
            metadata, state = self.load_session(session_id)
        except FileNotFoundError:
            return False

        if not state.update_job_status(job_id, status, result):
            return False
        # Report success only if the change actually reached storage.
        return self.save_session(metadata, state)