# dummyQuantum / session_manager.py
# Apurva Tiwari
# feature: sessions, init
# ca961b4
"""
Session Manager
Handles all session CRUD operations with HF persistent storage.
Manages user isolation, alias resolution, and concurrent access.
"""
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from hf_storage import (
    delete_session_dir,
    ensure_session_dir,
    get_session_dir,
    get_user_dir,
    list_user_session_dirs,
    read_json_safe,
    write_json_safe,
)
from session_models import (
    AliasIndex,
    AliasIndexEntry,
    JobReference,
    SessionMetadata,
    SessionState,
)
class SessionManager:
    """
    Manages session lifecycle and persistence for a single user.

    Every path is obtained from the ``hf_storage`` helpers using the
    ``user_id`` given at construction. An in-memory alias index is loaded
    from ``aliases_index.json`` in the user's directory and written back
    whenever it changes.

    NOTE(review): this class takes no locks of its own; any protection
    against concurrent writers has to come from the ``*_json_safe`` helpers
    in ``hf_storage`` - confirm there.
    """

    # File names used inside the user directory / each session directory.
    _ALIAS_INDEX_FILE = "aliases_index.json"
    _METADATA_FILE = "metadata.json"
    _STATE_FILE = "state.json"

    def __init__(self, user_id: str):
        """
        Initialize session manager for a specific user.

        Args:
            user_id: Unique identifier for the user
        """
        self.user_id = user_id
        self.user_dir = get_user_dir(user_id)
        ensure_session_dir()
        self._alias_index = self._load_alias_index()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _alias_index_path(self) -> Path:
        """Return the path of this user's alias index file."""
        return self.user_dir / self._ALIAS_INDEX_FILE

    def _session_paths(self, session_id: str) -> tuple[Path, Path]:
        """Return ``(metadata_path, state_path)`` for a session."""
        session_dir = get_session_dir(self.user_id, session_id)
        return session_dir / self._METADATA_FILE, session_dir / self._STATE_FILE

    @staticmethod
    def _utc_now_iso() -> str:
        """
        Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated
        ``datetime.utcnow().isoformat()`` (no UTC offset suffix), so new
        values remain comparable with timestamps already stored on disk.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _index_entry_for(metadata: SessionMetadata) -> AliasIndexEntry:
        """Build the alias index entry describing ``metadata``."""
        return AliasIndexEntry(
            alias=metadata.alias,
            session_id=metadata.session_id,
            created_at=metadata.created_at,
            last_modified=metadata.last_modified,
        )

    def _read_session(self, session_id: str) -> tuple[SessionMetadata, SessionState]:
        """
        Read a session from disk without changing its access timestamp.

        Used by listing and deletion, which must not count as an "access".

        Raises:
            FileNotFoundError: If the metadata file is missing or unreadable
        """
        metadata_path, state_path = self._session_paths(session_id)
        if not metadata_path.exists():
            raise FileNotFoundError(f"Session {session_id} not found")

        metadata_data = read_json_safe(metadata_path)
        # Presumably read_json_safe returns a falsy value when the file cannot
        # be parsed (the state branch below already relies on that). Report it
        # the same way as a missing session so callers' existing handlers apply.
        if not metadata_data:
            raise FileNotFoundError(f"Session {session_id} not found")
        metadata = SessionMetadata.from_dict(metadata_data)

        state_data = read_json_safe(state_path)
        if state_data:
            state = SessionState.from_dict(state_data)
        else:
            # No usable state file: fall back to a fresh, empty state.
            state = SessionState(
                session_id=session_id,
                app_type=metadata.app_type,
            )
        return metadata, state

    def _load_alias_index(self) -> AliasIndex:
        """Load or create the alias index for this user."""
        data = read_json_safe(self._alias_index_path(), {})
        return AliasIndex.from_dict(data) if data else AliasIndex()

    def _save_alias_index(self) -> bool:
        """Save the alias index to disk and return the write result."""
        return write_json_safe(self._alias_index_path(), self._alias_index.to_dict())

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------
    def create_session(
        self,
        alias: str,
        app_type: str,
        description: str = ""
    ) -> tuple[str, SessionMetadata]:
        """
        Create a new session.

        A fresh UUID4 is generated as the session id, metadata and an empty
        state are written to the session directory, and the alias is
        registered in the alias index.

        Args:
            alias: User-friendly name for the session
            app_type: Type of app ("EM" or "QLBM")
            description: Optional description

        Returns:
            Tuple of (session_id, metadata)
        """
        session_id = str(uuid.uuid4())

        metadata = SessionMetadata(
            session_id=session_id,
            user_id=self.user_id,
            alias=alias,
            app_type=app_type,
            description=description,
        )
        state = SessionState(
            session_id=session_id,
            app_type=app_type,
        )

        metadata_path, state_path = self._session_paths(session_id)
        write_json_safe(metadata_path, metadata.to_dict())
        write_json_safe(state_path, state.to_dict())

        self._alias_index.add(alias, self._index_entry_for(metadata))
        self._save_alias_index()
        return session_id, metadata

    def load_session(self, session_id: str) -> tuple[SessionMetadata, SessionState]:
        """
        Load a session by ID.

        The returned metadata has ``last_accessed`` set to the current UTC
        time. That change is in memory only; it reaches disk when the
        metadata is next passed to ``save_session``.

        Args:
            session_id: Session to load

        Returns:
            Tuple of (metadata, state)

        Raises:
            FileNotFoundError: If session doesn't exist
        """
        metadata, state = self._read_session(session_id)
        metadata.last_accessed = self._utc_now_iso()
        return metadata, state

    def save_session(self, metadata: SessionMetadata, state: SessionState) -> bool:
        """
        Save a session's state and metadata.

        Both timestamps are refreshed first. Both files are always attempted,
        even if the first write fails. The alias index is updated only when
        both writes succeeded.

        Args:
            metadata: Session metadata
            state: Session state

        Returns:
            True if both files were written
        """
        metadata_path, state_path = self._session_paths(metadata.session_id)

        metadata.update_timestamp()
        state.update_timestamp()

        metadata_ok = write_json_safe(metadata_path, metadata.to_dict())
        state_ok = write_json_safe(state_path, state.to_dict())
        success = bool(metadata_ok and state_ok)

        if success:
            self._alias_index.add(metadata.alias, self._index_entry_for(metadata))
            self._save_alias_index()
        return success

    # ------------------------------------------------------------------
    # Lookup / listing
    # ------------------------------------------------------------------
    def get_by_alias(self, alias: str) -> List[tuple[SessionMetadata, str]]:
        """
        Get all sessions matching an alias.

        Results follow the order returned by the alias index (intended to be
        newest first - ordering is decided by ``AliasIndex.get_by_alias``).
        Index entries whose session files no longer exist are skipped.

        Args:
            alias: Session alias to search for

        Returns:
            List of (metadata, session_id) tuples
        """
        results = []
        for entry in self._alias_index.get_by_alias(alias):
            try:
                metadata, _ = self.load_session(entry.session_id)
            except FileNotFoundError:
                # Stale index entry: the session files were deleted.
                continue
            results.append((metadata, entry.session_id))
        return results

    def get_most_recent_by_alias(self, alias: str) -> Optional[tuple[SessionMetadata, str]]:
        """
        Get the first (most recent) session matching an alias.

        Args:
            alias: Session alias to search for

        Returns:
            (metadata, session_id) tuple or None if not found
        """
        matches = self.get_by_alias(alias)
        return matches[0] if matches else None

    def list_all_sessions(self) -> List[SessionMetadata]:
        """
        List all sessions for this user (unsorted).

        Sessions are read without touching ``last_accessed``, so the stored
        access times stay meaningful for sorting.

        Returns:
            List of metadata for all user's sessions
        """
        sessions = []
        for session_id in list_user_session_dirs(self.user_id):
            try:
                metadata, _ = self._read_session(session_id)
            except FileNotFoundError:
                # Directory without readable metadata: not a usable session.
                continue
            sessions.append(metadata)
        return sessions

    def list_sessions_by_app(self, app_type: str) -> List[SessionMetadata]:
        """
        List all sessions for a specific app type.

        Args:
            app_type: "EM" or "QLBM"

        Returns:
            List of metadata for sessions of this app type
        """
        return [s for s in self.list_all_sessions() if s.app_type == app_type]

    def list_sessions_sorted_recent(self, limit: Optional[int] = None) -> List[SessionMetadata]:
        """
        List sessions sorted by last accessed time (most recent first).

        Sessions without a ``last_accessed`` value sort last.

        Args:
            limit: Maximum number of sessions to return; None means no limit,
                0 returns an empty list

        Returns:
            Sorted list of session metadata
        """
        ordered = sorted(
            self.list_all_sessions(),
            key=lambda s: s.last_accessed or "",
            reverse=True,
        )
        # Compare against None explicitly so limit=0 is honoured.
        if limit is None:
            return ordered
        return ordered[:limit]

    # ------------------------------------------------------------------
    # Mutation
    # ------------------------------------------------------------------
    def delete_session(self, session_id: str) -> bool:
        """
        Delete a session.

        The directory is removed first; the alias index is only updated once
        that succeeded, so a failed delete never leaves an existing session
        missing from the index.

        Args:
            session_id: Session to delete

        Returns:
            True if successful, False if the session was not found or the
            directory could not be deleted
        """
        try:
            # Metadata is needed only to learn which alias to unregister.
            metadata, _ = self._read_session(session_id)
            deleted = delete_session_dir(self.user_id, session_id)
            if deleted:
                self._alias_index.remove(metadata.alias, session_id)
                self._save_alias_index()
            return deleted
        except FileNotFoundError:
            return False

    def rename_session(self, session_id: str, new_alias: str) -> bool:
        """
        Rename a session.

        The session is saved under the new alias first (which also registers
        the new alias in the index). The old alias is dropped only after that
        save succeeded, so a failed save leaves the index untouched.

        Args:
            session_id: Session to rename
            new_alias: New alias

        Returns:
            True if successful
        """
        try:
            metadata, state = self.load_session(session_id)
            old_alias = metadata.alias
            metadata.alias = new_alias

            saved = self.save_session(metadata, state)
            if saved and old_alias != new_alias:
                self._alias_index.remove(old_alias, session_id)
                self._save_alias_index()
            return saved
        except FileNotFoundError:
            return False

    def add_job_to_session(
        self,
        session_id: str,
        job_id: str,
        service_type: str
    ) -> bool:
        """
        Add a job reference to a session.

        Args:
            session_id: Session ID
            job_id: Cloud service job ID
            service_type: Service type (e.g., "qiskit_ibm", "ionq")

        Returns:
            True if successful, False if the session was not found or could
            not be saved
        """
        try:
            metadata, state = self.load_session(session_id)
            state.add_job(
                JobReference(
                    job_id=job_id,
                    service_type=service_type,
                )
            )
            return self.save_session(metadata, state)
        except FileNotFoundError:
            return False

    def update_job_status(
        self,
        session_id: str,
        job_id: str,
        status: str,
        result: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Update a job's status in a session.

        Args:
            session_id: Session ID
            job_id: Job ID
            status: New status
            result: Optional result data

        Returns:
            True if the job was found and the updated session was saved;
            False if the session or job was not found or the save failed
        """
        try:
            metadata, state = self.load_session(session_id)
            if not state.update_job_status(job_id, status, result):
                return False
            # An update that could not be persisted is not a success.
            return bool(self.save_session(metadata, state))
        except FileNotFoundError:
            return False