# dummyQuantum / auto_save.py
# Apurva Tiwari
# feature: sessions, init
# ca961b4
"""
Auto-Save Utilities
Provides decorators and context managers for automatic session saving
after long-running operations like job submissions.
"""
import functools
import threading
import time
from typing import Callable, Optional, Any, Dict
from datetime import datetime, timedelta
from session_integration import SessionIntegration
class AutoSaveManager:
    """
    Manages periodic auto-saving of session state.

    A daemon thread wakes up every ``interval`` seconds and, when the
    session integration has auto-save enabled, captures the Trame state
    and saves the current session. The wait between saves is
    interruptible, so ``stop()`` takes effect immediately instead of
    only after the current interval has elapsed.
    """

    def __init__(
        self,
        session_integration: SessionIntegration,
        trame_state: Any,
        interval_seconds: int = 30
    ):
        """
        Initialize auto-save manager.

        Args:
            session_integration: SessionIntegration instance
            trame_state: Trame server state
            interval_seconds: Interval between auto-saves
        """
        self.session_integration = session_integration
        self.trame_state = trame_state
        self.interval = interval_seconds
        self.thread: Optional[threading.Thread] = None
        self.running = False
        # Naive UTC timestamp of the most recent save (or of construction).
        self.last_save = datetime.utcnow()
        # Set by stop() to wake the worker out of its wait.
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Start the auto-save thread. Does nothing if already running."""
        if self.running:
            return
        # Use a fresh event for every run: a worker left over from an
        # earlier run keeps its own, already-set event and therefore
        # cannot be revived by this restart.
        self._stop_event = threading.Event()
        self.running = True
        self.thread = threading.Thread(target=self._auto_save_loop, daemon=True)
        self.thread.name = "SessionAutoSaveThread"
        self.thread.start()

    def stop(self) -> None:
        """Stop the auto-save thread and wait up to 5 seconds for it to exit."""
        self.running = False
        # Wake the worker now rather than letting it sleep out the interval.
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)

    def _auto_save_loop(self) -> None:
        """Main loop for auto-save thread."""
        # Bind the event belonging to this run (see start()).
        stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            try:
                # wait() returns True as soon as stop() sets the event;
                # in that case exit without performing another save.
                if stop_event.wait(self.interval):
                    break
                if self.session_integration.auto_save_enabled:
                    self.session_integration._capture_trame_state(self.trame_state)
                    self.session_integration.save_current_session()
                    self.last_save = datetime.utcnow()
            except Exception as e:
                # Keep the background thread alive: report the failure
                # and try again on the next interval.
                print(f"Auto-save error: {e}")

    def manual_save(self) -> bool:
        """
        Manually trigger a save.

        Returns:
            False when auto-save is disabled; otherwise the value returned
            by ``save_current_session()``. ``last_save`` is refreshed only
            when that value is truthy.
        """
        if not self.session_integration.auto_save_enabled:
            return False
        self.session_integration._capture_trame_state(self.trame_state)
        result = self.session_integration.save_current_session()
        if result:
            self.last_save = datetime.utcnow()
        return result
def auto_save_after_operation(
    session_integration: SessionIntegration,
    trame_state: Any
) -> Callable:
    """
    Decorator that automatically saves session after a function completes.

    The save is attempted whether the wrapped function returns or raises,
    and only when ``session_integration.auto_save_enabled`` is truthy.
    If the wrapped function raises, its exception is always the one the
    caller sees: a failure of the follow-up save is printed and does not
    replace it. If the wrapped function succeeds and the save raises,
    the save's exception propagates.

    Usage:
        @auto_save_after_operation(session_integration, state)
        def run_long_job():
            # Do work
            pass
    """
    def _save_if_enabled() -> None:
        # Capture the Trame state and save the session when auto-save is on.
        if session_integration.auto_save_enabled:
            session_integration._capture_trame_state(trame_state)
            session_integration.save_current_session()

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except BaseException:
                # The operation itself failed. Still try to save, but do
                # not let a save error mask the operation's own exception.
                try:
                    _save_if_enabled()
                except Exception as save_error:
                    print(f"Auto-save error: {save_error}")
                raise
            _save_if_enabled()
            return result
        return wrapper
    return decorator
def save_on_job_submit(
    session_integration: SessionIntegration,
    trame_state: Any,
    job_id_key: str = "job_id",
    service_type: str = "unknown"
) -> Callable:
    """
    Decorator that registers a submitted job in the session and saves it.

    The wrapped function's return value is inspected for a job ID: a
    string is used as the ID directly, and a dict is looked up under
    ``job_id_key``. A truthy ID is passed to ``add_job`` together with
    ``service_type``. Afterwards the Trame state is captured and the
    session saved if auto-save is enabled. The return value is handed
    back to the caller unchanged.

    Usage:
        @save_on_job_submit(session_integration, state, job_id_key="job_id", service_type="qiskit_ibm")
        def submit_to_ibm():
            job_id = "..."
            return job_id
    """
    def _job_id_from(outcome: Any) -> Any:
        # Pull the job ID out of the wrapped function's return value;
        # None when the value has no recognisable ID.
        if isinstance(outcome, str):
            return outcome
        if isinstance(outcome, dict) and job_id_key in outcome:
            return outcome[job_id_key]
        return None

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            outcome = func(*args, **kwargs)

            # Record the job only when a non-empty ID was found.
            job_id = _job_id_from(outcome)
            if job_id:
                session_integration.add_job(job_id, service_type)
                print(f"Job {job_id} tracked in session {session_integration.current_session_id}")

            # Nothing more to do when auto-save is switched off.
            if not session_integration.auto_save_enabled:
                return outcome

            session_integration._capture_trame_state(trame_state)
            session_integration.save_current_session()
            return outcome
        return wrapper
    return decorator
class JobProgressTracker:
    """
    Follows one long-running job and persists the session as it advances.

    The job is registered with the session integration on construction.
    Progress updates are throttled to at most one save per
    ``save_interval`` seconds; completion and failure are saved
    immediately.
    """

    def __init__(
        self,
        session_integration: SessionIntegration,
        trame_state: Any,
        job_id: str,
        service_type: str = "unknown",
        save_interval: int = 10
    ):
        """
        Set up the tracker and register the job in the session.

        Args:
            session_integration: SessionIntegration instance
            trame_state: Trame server state
            job_id: Job ID to track
            service_type: Service type (e.g., "qiskit_ibm")
            save_interval: Seconds between progress saves
        """
        self.session_integration = session_integration
        self.trame_state = trame_state
        self.job_id = job_id
        self.service_type = service_type
        self.save_interval = save_interval
        # Naive UTC time of the last throttled save; starts at construction.
        self.last_save = datetime.utcnow()

        session_integration.add_job(job_id, service_type)

    def _persist_session(self) -> None:
        """Capture the Trame state into the session and save it."""
        integration = self.session_integration
        integration._capture_trame_state(self.trame_state)
        integration.save_current_session()

    def update_progress(self, progress: float, status: str = "running") -> None:
        """
        Report progress; saves only if the save interval has elapsed.

        Args:
            progress: Progress 0.0-1.0
            status: Job status string
        """
        # NOTE(review): ``progress`` is accepted but not used; only
        # ``status`` is forwarded to the session.
        now = datetime.utcnow()
        throttle = timedelta(seconds=self.save_interval)
        if now - self.last_save <= throttle:
            return
        if not self.session_integration.auto_save_enabled:
            return

        self.session_integration.update_job_status(self.job_id, status)
        self._persist_session()
        self.last_save = now

    def complete(self, result: Optional[Dict[str, Any]] = None) -> None:
        """
        Record the job as "completed" and save the session.

        Args:
            result: Optional result data
        """
        self.session_integration.update_job_status(self.job_id, "completed", result)
        self._persist_session()

    def fail(self, error: str = "Unknown error") -> None:
        """
        Record the job as "failed" and save the session.

        Args:
            error: Error message
        """
        failure_details = {"error": error}
        self.session_integration.update_job_status(self.job_id, "failed", failure_details)
        self._persist_session()