"""
Auto-Save Utilities

Provides decorators and helper classes for automatic session saving
after long-running operations like job submissions.
"""

import functools
import threading
import time
from typing import Callable, Optional, Any, Dict
from datetime import datetime, timedelta

from session_integration import SessionIntegration


def _capture_and_save(session_integration: SessionIntegration, trame_state: Any) -> Any:
    """
    Capture the trame state into the session, then save the session.

    Args:
        session_integration: SessionIntegration instance
        trame_state: Trame server state

    Returns:
        Whatever ``save_current_session()`` returns (callers in this module
        treat a truthy value as "saved").
    """
    session_integration._capture_trame_state(trame_state)
    return session_integration.save_current_session()


class AutoSaveManager:
    """
    Manages periodic auto-saving of session state.

    A daemon thread wakes up every ``interval`` seconds and, when
    ``session_integration.auto_save_enabled`` is true, captures the trame
    state and saves the current session.
    """

    def __init__(
        self,
        session_integration: SessionIntegration,
        trame_state: Any,
        interval_seconds: int = 30
    ):
        """
        Initialize auto-save manager.

        Args:
            session_integration: SessionIntegration instance
            trame_state: Trame server state
            interval_seconds: Interval between auto-saves
        """
        self.session_integration = session_integration
        self.trame_state = trame_state
        self.interval = interval_seconds
        self.thread: Optional[threading.Thread] = None
        self.running = False
        # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated
        # since Python 3.12, but switching to an aware datetime would change
        # the type callers see on this public attribute.
        self.last_save = datetime.utcnow()
        # Set by stop() to wake the worker immediately instead of letting it
        # sleep out the remainder of the interval.
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Start the auto-save thread (no-op if already running)."""
        if self.running:
            return

        self.running = True
        # A fresh event per run: a previous worker that is still finishing a
        # slow save keeps its own (already set) event and cannot be revived
        # by this restart.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.thread = threading.Thread(
            target=self._auto_save_loop,
            args=(stop_event,),
            daemon=True,
        )
        self.thread.name = "SessionAutoSaveThread"
        self.thread.start()

    def stop(self) -> None:
        """Stop the auto-save thread, waiting up to 5 seconds for it to exit."""
        self.running = False
        self._stop_event.set()
        worker = self.thread
        # Joining the current thread raises RuntimeError, so skip the join
        # when stop() is invoked from the auto-save thread itself.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=5)

    def _auto_save_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """
        Main loop for auto-save thread.

        Args:
            stop_event: Event that ends the loop when set; defaults to the
                manager's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        # Event.wait() returns True as soon as stop() sets the event, so no
        # save is performed after a stop request and stop() returns promptly.
        while self.running and not stop_event.wait(self.interval):
            try:
                if self.session_integration.auto_save_enabled:
                    saved = _capture_and_save(
                        self.session_integration, self.trame_state
                    )
                    # Same rule as manual_save(): only record a save that
                    # reported success.
                    if saved:
                        self.last_save = datetime.utcnow()
            except Exception as e:
                # Best effort: a failed save must not kill the worker thread.
                print(f"Auto-save error: {e}")

    def manual_save(self) -> bool:
        """
        Manually trigger a save.

        Returns:
            False when auto-save is disabled; otherwise the result of
            ``save_current_session()``.
        """
        if not self.session_integration.auto_save_enabled:
            return False

        result = _capture_and_save(self.session_integration, self.trame_state)
        if result:
            self.last_save = datetime.utcnow()
        return result


def auto_save_after_operation(
    session_integration: SessionIntegration,
    trame_state: Any
) -> Callable:
    """
    Decorator that automatically saves session after a function completes.

    The save runs in a ``finally`` clause, so it also happens when the
    wrapped function raises; the exception then propagates to the caller.

    Usage:
        @auto_save_after_operation(session_integration, state)
        def run_long_job():
            # Do work
            pass
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                if session_integration.auto_save_enabled:
                    _capture_and_save(session_integration, trame_state)
        return wrapper
    return decorator


def save_on_job_submit(
    session_integration: SessionIntegration,
    trame_state: Any,
    job_id_key: str = "job_id",
    service_type: str = "unknown"
) -> Callable:
    """
    Decorator that tracks job submission and saves session.

    The job ID is taken from the wrapped function's return value: the value
    itself when it is a string, or ``result[job_id_key]`` when it is a dict
    containing that key. Any other return value is passed through without
    tracking or saving.

    Usage:
        @save_on_job_submit(session_integration, state, job_id_key="job_id", service_type="qiskit_ibm")
        def submit_to_ibm():
            job_id = "..."
            return job_id
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)

            # Extract job ID from result
            job_id = None
            if isinstance(result, str):
                job_id = result
            elif isinstance(result, dict) and job_id_key in result:
                job_id = result[job_id_key]

            # Track job in session
            if job_id:
                session_integration.add_job(job_id, service_type)
                print(f"Job {job_id} tracked in session {session_integration.current_session_id}")

                # Save session
                if session_integration.auto_save_enabled:
                    _capture_and_save(session_integration, trame_state)

            return result
        return wrapper
    return decorator


class JobProgressTracker:
    """
    Tracks progress of long-running jobs and periodically saves session.
    """

    def __init__(
        self,
        session_integration: SessionIntegration,
        trame_state: Any,
        job_id: str,
        service_type: str = "unknown",
        save_interval: int = 10
    ):
        """
        Initialize job progress tracker.

        Args:
            session_integration: SessionIntegration instance
            trame_state: Trame server state
            job_id: Job ID to track
            service_type: Service type (e.g., "qiskit_ibm")
            save_interval: Seconds between progress saves
        """
        self.session_integration = session_integration
        self.trame_state = trame_state
        self.job_id = job_id
        self.service_type = service_type
        self.save_interval = save_interval
        self.last_save = datetime.utcnow()

        # Register job in session
        session_integration.add_job(job_id, service_type)

    def update_progress(self, progress: float, status: str = "running") -> None:
        """
        Update job progress.

        The status is written and the session saved only when more than
        ``save_interval`` seconds have passed since the last save and
        auto-save is enabled; calls in between are dropped.

        Args:
            progress: Progress 0.0-1.0 (accepted for API compatibility; it is
                not currently stored anywhere)
            status: Job status string
        """
        # Check if it's time to save
        now = datetime.utcnow()
        if now - self.last_save <= timedelta(seconds=self.save_interval):
            return

        if self.session_integration.auto_save_enabled:
            self.session_integration.update_job_status(self.job_id, status)
            _capture_and_save(self.session_integration, self.trame_state)
            # Advance the throttle only when a save was actually attempted.
            self.last_save = now

    def complete(self, result: Optional[Dict[str, Any]] = None) -> None:
        """
        Mark job as complete and save the session.

        The save is performed regardless of ``auto_save_enabled``.

        Args:
            result: Optional result data
        """
        self.session_integration.update_job_status(
            self.job_id,
            "completed",
            result
        )
        _capture_and_save(self.session_integration, self.trame_state)

    def fail(self, error: str = "Unknown error") -> None:
        """
        Mark job as failed and save the session.

        The save is performed regardless of ``auto_save_enabled``.

        Args:
            error: Error message
        """
        self.session_integration.update_job_status(
            self.job_id,
            "failed",
            {"error": error}
        )
        _capture_and_save(self.session_integration, self.trame_state)