Spaces:
Runtime error
Runtime error
| """ | |
| Auto-Save Utilities | |
| Provides decorators and context managers for automatic session saving | |
| after long-running operations like job submissions. | |
| """ | |
| import functools | |
| import threading | |
| import time | |
| from typing import Callable, Optional, Any, Dict | |
| from datetime import datetime, timedelta | |
| from session_integration import SessionIntegration | |
class AutoSaveManager:
    """
    Manages periodic auto-saving of session state.

    A daemon thread wakes every ``interval`` seconds and, when
    ``session_integration.auto_save_enabled`` is true, captures the trame
    state and saves the current session.

    Attributes:
        session_integration: Object providing ``auto_save_enabled``,
            ``_capture_trame_state()`` and ``save_current_session()``.
        trame_state: State object handed to ``_capture_trame_state()``.
        interval: Seconds between auto-save attempts.
        thread: The background worker, or ``None`` before ``start()``.
        running: ``True`` between ``start()`` and ``stop()``.
        last_save: Naive UTC timestamp of the most recent save recorded by
            this manager (initially the construction time).
    """

    def __init__(
        self,
        session_integration: "SessionIntegration",
        trame_state: Any,
        interval_seconds: int = 30
    ):
        """
        Initialize auto-save manager.

        Args:
            session_integration: SessionIntegration instance
            trame_state: Trame server state
            interval_seconds: Interval between auto-saves
        """
        self.session_integration = session_integration
        self.trame_state = trame_state
        self.interval = interval_seconds
        self.thread: Optional[threading.Thread] = None
        self.running = False
        self.last_save = datetime.utcnow()
        # Signalled by stop() so the worker wakes immediately instead of
        # sleeping out the remainder of the interval.
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Start the auto-save thread (no-op if it is already running)."""
        if self.running:
            return
        # Each worker gets its own event: a previous worker that is still
        # winding down keeps seeing *its* event set and cannot be revived
        # by this restart.
        self._stop_event = threading.Event()
        self.running = True
        self.thread = threading.Thread(
            target=self._auto_save_loop,
            name="SessionAutoSaveThread",
            daemon=True,
        )
        self.thread.start()

    def stop(self) -> None:
        """Stop the auto-save thread and wait up to 5 seconds for it to exit."""
        self.running = False
        self._stop_event.set()
        worker = self.thread
        # Joining the current thread raises RuntimeError, so skip the join
        # when stop() is invoked from the worker itself.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=5)

    def _auto_save_loop(self) -> None:
        """Main loop for auto-save thread."""
        stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            try:
                # Interruptible replacement for time.sleep(): returns True
                # as soon as stop() sets the event.
                if stop_event.wait(self.interval):
                    break
                # Honour callers that clear ``running`` directly.
                if not self.running:
                    break
                if self.session_integration.auto_save_enabled:
                    self.session_integration._capture_trame_state(self.trame_state)
                    self.session_integration.save_current_session()
                    self.last_save = datetime.utcnow()
            except Exception as e:
                # Thread boundary: report the failure and keep the loop
                # alive so one bad save does not end auto-saving.
                print(f"Auto-save error: {e}")

    def manual_save(self) -> bool:
        """
        Manually trigger a save.

        Returns:
            ``False`` when auto-save is disabled; otherwise whatever
            ``save_current_session()`` returns. ``last_save`` is refreshed
            only when that result is truthy.
        """
        if not self.session_integration.auto_save_enabled:
            return False
        self.session_integration._capture_trame_state(self.trame_state)
        result = self.session_integration.save_current_session()
        if result:
            self.last_save = datetime.utcnow()
        return result
def auto_save_after_operation(
    session_integration: "SessionIntegration",
    trame_state: Any
) -> Callable:
    """
    Decorator that automatically saves session after a function completes.

    The save runs whether the wrapped function returns or raises, and is
    skipped when ``session_integration.auto_save_enabled`` is false.

    Error handling:
        * Wrapped function succeeds, save fails: the save error propagates.
        * Wrapped function raises: its exception always propagates; a
          failure of the follow-up save is printed instead of replacing it.

    Usage:
        @auto_save_after_operation(session_integration, state)
        def run_long_job():
            # Do work
            pass

    Args:
        session_integration: SessionIntegration instance
        trame_state: Trame server state passed to ``_capture_trame_state``

    Returns:
        A decorator that preserves the wrapped function's metadata.
    """
    def _save_session() -> None:
        # Capture the latest state and persist it, if auto-save is enabled.
        if session_integration.auto_save_enabled:
            session_integration._capture_trame_state(trame_state)
            session_integration.save_current_session()

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except BaseException:
                # Still save on failure, but never let a secondary save
                # error mask the exception the caller needs to see.
                try:
                    _save_session()
                except Exception as save_error:
                    print(f"Auto-save error: {save_error}")
                raise
            _save_session()
            return result
        return wrapper
    return decorator
def save_on_job_submit(
    session_integration: "SessionIntegration",
    trame_state: Any,
    job_id_key: str = "job_id",
    service_type: str = "unknown"
) -> Callable:
    """
    Decorator that tracks job submission and saves session.

    The job ID is taken from the wrapped function's return value: the value
    itself when it is a string, or ``result[job_id_key]`` when it is a dict
    containing that key. A missing or falsy ID is not tracked. The session
    is then saved when ``session_integration.auto_save_enabled`` is true,
    and the wrapped function's result is returned unchanged.

    Usage:
        @save_on_job_submit(session_integration, state, job_id_key="job_id", service_type="qiskit_ibm")
        def submit_to_ibm():
            job_id = "..."
            return job_id

    Args:
        session_integration: SessionIntegration instance
        trame_state: Trame server state passed to ``_capture_trame_state``
        job_id_key: Key holding the job ID when the result is a dict
        service_type: Service type recorded with the job

    Returns:
        A decorator that preserves the wrapped function's metadata.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)

            # Extract job ID from result
            job_id = None
            if isinstance(result, str):
                job_id = result
            elif isinstance(result, dict):
                job_id = result.get(job_id_key)

            # Track job in session
            if job_id:
                session_integration.add_job(job_id, service_type)
                print(f"Job {job_id} tracked in session {session_integration.current_session_id}")

            # Save session
            if session_integration.auto_save_enabled:
                session_integration._capture_trame_state(trame_state)
                session_integration.save_current_session()
            return result
        return wrapper
    return decorator
class JobProgressTracker:
    """
    Tracks progress of long-running jobs and periodically saves session.

    Attributes:
        session_integration: Object providing ``add_job()``,
            ``update_job_status()``, ``_capture_trame_state()``,
            ``save_current_session()`` and ``auto_save_enabled``.
        trame_state: State object handed to ``_capture_trame_state()``.
        job_id: ID of the tracked job.
        service_type: Service type the job was registered with.
        save_interval: Minimum seconds between progress saves.
        progress: Most recent value passed to ``update_progress()``
            (0.0 until the first update, 1.0 after ``complete()``).
        status: Most recent status seen by this tracker, or ``None`` before
            the first update.
        last_save: Naive UTC timestamp of the last progress save (initially
            the construction time).
    """

    def __init__(
        self,
        session_integration: "SessionIntegration",
        trame_state: Any,
        job_id: str,
        service_type: str = "unknown",
        save_interval: int = 10
    ):
        """
        Initialize job progress tracker.

        Args:
            session_integration: SessionIntegration instance
            trame_state: Trame server state
            job_id: Job ID to track
            service_type: Service type (e.g., "qiskit_ibm")
            save_interval: Seconds between progress saves
        """
        self.session_integration = session_integration
        self.trame_state = trame_state
        self.job_id = job_id
        self.service_type = service_type
        self.save_interval = save_interval
        self.progress: float = 0.0
        self.status: Optional[str] = None
        self.last_save = datetime.utcnow()

        # Register job in session
        session_integration.add_job(job_id, service_type)

    def _save_session(self) -> None:
        # Capture the latest trame state and persist the session.
        self.session_integration._capture_trame_state(self.trame_state)
        self.session_integration.save_current_session()

    def update_progress(self, progress: float, status: str = "running") -> None:
        """
        Update job progress.

        The latest ``progress`` and ``status`` are always kept on the
        tracker. The session itself is only updated and saved when more
        than ``save_interval`` seconds have passed since the last save and
        auto-save is enabled; only ``status`` is forwarded to
        ``update_job_status()``.

        Args:
            progress: Progress 0.0-1.0
            status: Job status string
        """
        # Previously the progress value was dropped; keep it so callers can
        # read the current state between throttled saves.
        self.progress = progress
        self.status = status

        # Check if it's time to save
        now = datetime.utcnow()
        if now - self.last_save <= timedelta(seconds=self.save_interval):
            return
        if not self.session_integration.auto_save_enabled:
            return
        self.session_integration.update_job_status(self.job_id, status)
        self._save_session()
        self.last_save = now

    def complete(self, result: Optional[Dict[str, Any]] = None) -> None:
        """
        Mark job as complete and save the session.

        The save is not throttled and does not check ``auto_save_enabled``.

        Args:
            result: Optional result data
        """
        self.progress = 1.0
        self.status = "completed"
        self.session_integration.update_job_status(
            self.job_id,
            "completed",
            result
        )
        self._save_session()

    def fail(self, error: str = "Unknown error") -> None:
        """
        Mark job as failed and save the session.

        The save is not throttled and does not check ``auto_save_enabled``.

        Args:
            error: Error message
        """
        self.status = "failed"
        self.session_integration.update_job_status(
            self.job_id,
            "failed",
            {"error": error}
        )
        self._save_session()